我正试图使用凝聚式Kafka Python从bluemix上的消息中心上获取来自主题的消息。我的代码在下面找到,但有些东西不起作用。主题和消息中心已经启动并运行,所以代码中可能有一些东西。
from confluent_kafka import Producer, KafkaError, Consumer
consumer_settings = {
'bootstrap.servers': 'broker-url-here',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'ssl',
'sasl.username': 'username-here',
'sasl.password': 'password-here',
}
c = Consumer(**consumer_settings)
c.subscribe(['topic-here'])
running = True
while running:
msg = c.poll()
if msg.error():
print("Error while retrieving message")
c.close()
sys.exit(10)
elif (msg is not None):
for x in msg:
print(x)
else:
sys.exit(10)当我运行代码时,它似乎被msg = c.poll()卡住了。所以我想这要么是连接失败,要么是无法检索消息。证书本身是正确的。
发布于 2018-03-19 21:05:24
消费逻辑看起来很好,但是使用者的配置不正确。
security.protocol设置为sasl_sslssl.ca.location需要指向包含受信任证书的PEM文件。该文件的位置因操作系统而异,但对于最常见的操作系统:/etc/ssl/certs/etc/pki/tls/cert.pem/etc/ssl/certs.pem
我们还有一个使用这个客户机的示例应用程序,它可以很容易地启动或部署到Bluemix:https://github.com/ibm-messaging/message-hub-samples/tree/master/kafka-python-console-sample
https://stackoverflow.com/questions/49322289
复制相似问题