我正在按照快速入门指南here上的说明在本地运行kafka。
然后,我在config/consumer.properties
中定义了我的使用者组配置,以便我的使用者可以从定义的group.id
中挑选消息
运行以下命令,
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
结果是,
test-consumer-group <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
我能够通过一个基于python的消费者连接到kafka,该消费者被配置为使用provide group.id
,即test-consumer-group
首先,我不能理解kafka是如何/何时创建消费者群体的。它似乎会在某个时间点加载conf/consumer.properties
,并且在通过kafka-console-consumer.sh
连接时还会隐式地创建消费者组(在我的例子中是console-consumer-67807
)。
我如何才能显式地创建自己的消费者组,比如说my-created-consumer-group
?
发布于 2020-05-13 09:37:49
您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论哪种技术(Spark,Spring,Flink,...)您正在使用,每个Kafka消费者都将有一个消费者组。消费者组可针对每个单独的消费者进行配置。
,它似乎在某个时刻加载了conf/consumer.properties,并且在通过kafka-console-consumer.sh连接时,它还隐式地创建了使用者组(在我的例子中是console-consumer-67807)
如果您不告诉控制台使用者实际使用该文件,则不会考虑该文件。
提供消费者组名称的方法有以下几种:
带有属性文件的控制台使用者(--consumer.config)
文件config/consumer.properties
应该是这样的
# consumer group id
group.id=my-created-consumer-group
这就是您如何确保控制台消费者将此group.id
考虑在内:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
带有--group的控制台使用者
对于控制台使用者,除非您通过添加--group
来提供自己的使用者组,否则会自动创建带有前缀" console - consumer“和类似PID的后缀的使用者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
标准的基于代码的消费者API
当使用标准的JAVA/Scala/...Consumer API您可以通过属性为Consumer Group提供以下属性:
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")
https://stackoverflow.com/questions/61770993
复制