首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何在kafka中创建新的消费群体

如何在kafka中创建新的消费群体
EN

Stack Overflow用户
提问于 2020-05-13 17:30:03
回答 1查看 16.2K关注 0票数 5

我正在按照快速入门指南here上的说明在本地运行kafka。

然后,我在config/consumer.properties中定义了我的使用者组配置,以便我的使用者可以从定义的group.id中挑选消息

运行以下命令,

代码语言:javascript
代码运行次数:0
运行
复制
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

结果是,

代码语言:javascript
代码运行次数:0
运行
复制
test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh

我能够通过一个基于python的消费者连接到kafka,该消费者被配置为使用provide group.id,即test-consumer-group

首先,我不能理解kafka是如何/何时创建消费者群体的。它似乎会在某个时间点加载conf/consumer.properties,并且在通过kafka-console-consumer.sh连接时还会隐式地创建消费者组(在我的例子中是console-consumer-67807)。

我如何才能显式地创建自己的消费者组,比如说my-created-consumer-group

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-13 17:37:49

您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论哪种技术(Spark,Spring,Flink,...)您正在使用,每个Kafka消费者都将有一个消费者组。消费者组可针对每个单独的消费者进行配置。

,它似乎在某个时刻加载了conf/consumer.properties,并且在通过kafka-console-consumer.sh连接时,它还隐式地创建了使用者组(在我的例子中是console-consumer-67807)

如果您不告诉控制台使用者实际使用该文件,则不会考虑该文件。

提供消费者组名称的方法有以下几种:

带有属性文件的控制台使用者(--consumer.config)

文件config/consumer.properties应该是这样的

代码语言:javascript
代码运行次数:0
运行
复制
# consumer group id
group.id=my-created-consumer-group

这就是您如何确保控制台消费者将此group.id考虑在内:

代码语言:javascript
代码运行次数:0
运行
复制
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties

带有--group的控制台使用者

对于控制台使用者,除非您通过添加--group来提供自己的使用者组,否则会自动创建带有前缀" console - consumer“和类似PID的后缀的使用者组

代码语言:javascript
代码运行次数:0
运行
复制
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group

标准的基于代码的消费者API

当使用标准的JAVA/Scala/...Consumer API您可以通过属性为Consumer Group提供以下属性:

代码语言:javascript
代码运行次数:0
运行
复制
Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61770993

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档