连接到别人的公共Kafka主题需要遵循以下步骤:
broker1:9092,broker2:9092
)。confluent-kafka-python
库;对于Java,可以使用org.apache.kafka:kafka-clients
库。bootstrap.servers
:Kafka集群的地址。group.id
:消费者组ID,用于标识消费者所属的组。key.deserializer
和value.deserializer
:用于反序列化消息键和值的类。sasl.mechanism
、security.protocol
、sasl.jaas.config
等参数。subscribe
方法订阅公共主题。poll
方法轮询并处理消息。以下是一个使用Python的confluent-kafka-python
库连接到公共Kafka主题的示例:
from confluent_kafka import Consumer, KafkaException
# Kafka集群的连接信息
bootstrap_servers = 'broker1:9092,broker2:9092'
group_id = 'my-consumer-group'
# 创建Kafka消费者实例
conf = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer'
}
if security_enabled:
conf.update({
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your_username" password="your_password";'
})
consumer = Consumer(conf)
# 订阅公共主题
topic = 'public_topic'
consumer.subscribe([topic])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
请根据您的实际情况修改上述代码中的Kafka集群连接信息、消费者组ID、主题名称等参数。如果Kafka集群启用了安全认证,请确保正确配置了安全相关的参数。
领取专属 10元无门槛券
手把手带您无忧上云