1.创建生产者
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])
future = producer.send("pic_collect", b'I am rito yan')
try:
record_metadata = future.get(timeout=10)
print(record_metadata)
except KafkaError as e:
print(e)
2.创建消费者:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"pic_collect",
group_id="pic_consumer",
bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有