我有如下的kafka制作人:
import json
from kafka import KafkaProducer
def send(self):
producer = KafkaProducer(bootstrap_servers=self.bootstrap_server)
message = prepare_message()
producer.send(self.topic, message)
sleep(2)
def prepare_message(self):
message = {"Topic": "Test",
"Message": "Test"}
return json.dumps(message)
当我这样做时,我得到以下错误:
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\json\__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\json\encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\json\encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\json\encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'bytes' is not JSON serializable
如何向kafka发送json消息?如果我对消息进行编码并发送该消息,则可以正常工作。但这不是我想做的。我想向kafka发布一个json对象。我该怎么做呢?
任何关于这方面的帮助都将不胜感激!
发布于 2021-06-27 01:26:51
我想通了。更正只是将prepare_message
方法的最后一行更改为:json.dumps(message).encode('utf-8')
。特别是,它现在作为python kafka生成器工作,生成json消息:
import json
from kafka import KafkaProducer
def send(self):
producer = KafkaProducer(bootstrap_servers=self.bootstrap_server)
message = prepare_message()
producer.send(self.topic, message)
sleep(2)
def prepare_message(self):
message = {"Topic": "Test",
"Message": "Test"}
return json.dumps(message).encode('utf-8')
https://stackoverflow.com/questions/68144687
复制相似问题