前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 消费者原理(4)

Kafka 消费者原理(4)

作者头像
兜兜毛毛
发布2021-04-02 07:52:15
1.5K0
发布2021-04-02 07:52:15
举报
文章被收录于专栏:兜兜毛毛

本篇文章主要讲解消费者和Partition的关系以及消费的一些相关问题。

Offset维护

通过前几篇文章我们知道在Partition中,消息是不会删除的,所以才可以追加写入,写入的消息是连续并且有序的。

这种特性决定了kafka可以消费历史消息,而且按照消息的顺序消费指定消息,而不是只能消费队头的消息。

正常情况下我们希望消费没有被消费过的数据,而且是从最先发送(序号最小的)的开始消费(这样才是有序和公平的)。

对于一个Partition,消费者组(Consumer Gup)怎么才能做到接着上次消费的位置(offset)继续消费呢?

肯定需要将这个对应关系保存起来,下次消费的时候查找一下。(还有一种方式是根据时间戳消费)

首先对应关系确实是可以查看到的。比如消费者组:test-group-1 和 test-topic(5个分区)的partition的偏移量关系,可以使用如下命令查看

代码语言:javascript
复制
./kafka-consumer-groups.sh --bootstrap-server localhost --describe --group test-group-1

PARTITION

CURRENT-OFFSET

LOG-END-OFFSET

LAG

CONSUMER-ID

0

5

5

0

consumer-1

1

5

5

0

consumer-1

2

5

5

0

consumer-1

3

5

5

0

consumer-2

4

5

5

0

consumer-2

CURRENT-OFFSET:指的是下一个未使用的offset。

LEO(LOG-END-OFFSET):下一条等待写入的消息的offset(最新的offset + 1)

LAG:延迟量

注意:消费者与topic的关系是一个consumer group 和 topic 中的一个partition的关系,不是一个消费者和一个topic的关系。

offset的对应关系到底是保存在哪里的呢?

首先可以排除不会在消费者本地的,因为所有消费者都可以使用这个consumer group id,放在本地是做不到统一维护的,肯定要放到服务端。

kafka早期的版本把消费者组和partition的offset直接维护在ZK中,但是读写的性能消耗太大了。后来就放在一个特殊的topic中,名字叫_consumer_offsets,默认有50个分区(offset.tipic.num.partitions=50),每个分区默认一个replication。

代码语言:javascript
复制
./kafka-topics.sh --topic __connsumer_offsets --describe --zookeeper localhost:2181

看起来这些分区副本在3个Broker上非常均匀和轮流地分布。

这样一个特殊的Topic怎么存储消费者组test-group-1对于分区的偏移量呢?

Topic里面是可以存放对象类型的value的(经过序列化和反序列化)。这个Topic里面主要存储的两种对象:

GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者都有编号)。

OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据。

代码语言:javascript
复制
./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

如果找不到offset

以上都是Broker有记录的offset的情况,如果说增加了一个新的消费者组去消费一个topic的某个partition,没有offset记录,这个时候应该从哪里开始消费呢?

什么情况下找不到offset?

就是你从来没有消费过,没有把当前的offset上报给Broker。

消费者代码中有一个参数,用来控制如果找不到偏移量的时候从哪里开始消费。

代码语言:javascript
复制
auto.offset.rest=latest

参数

说明

latest(默认)

从最新的消息开始消费(最后发送的)。历史消息是不能消费的

earliest

从最早的消息开始消费(最先发送的)。可以消费到历史消息

none

consumer group 在服务端找不到offset会报错。

anything else

向消费者抛出异常。

Offset更新

上边讲了消费者组的offset是保存在Broker的,但是,是由消费者上报给Broker的。并不是消费者组消费了消息,offset就会更新,消费者必须要有一个commit的动作。就跟RabbitMQ中消费者的ACK一样。

同样的,消费者可以自动提交或手动提交。由消费端的这个参数控制:

代码语言:javascript
复制
ennable.auto.commit=true

默认是true。true代表消费者消费消息以后自动提交此时的Broker会更新消费者组的offset。

还有一个参数可以控制自动提交频率:

代码语言:javascript
复制
auto.commit.interval.ms=5

如果我们要在消费完消息做完业务逻辑处理之后才commit,就要把这个值改成false。如果是false,消费者就必须要调用一个方法让Broker更新offset。

有两种方式:

代码语言:javascript
复制
consumer.commitSync() 的手动同步提交。
consumer.commitAsync()手动异步提交。

如果不提交或者提交失败,Broker的offset不会更新,消费者组下次消费的时候会消费到重复的消息。

消费者策略

多个consumer group和partition的关系?

重复消费。任何一个消费者组,都会把一个topic的所有partition分配完。

上边我们说过,一个消费者组里面的一个消费者,只能消费Topic的一个分区。

如果分区数量跟消费者数量一样,那就一人消费一个。如果是消费者比分区多,或者消费者比分区少,这时消费者跟分区的关系是怎样的呢?

如果消费者比分区多,肯定有一些消费者消费不到(空闲)。

如2个消费者消费5个分区,如果分配呢?

创建一个5个分区的topic。

代码语言:javascript
复制
./kafka-topic.sh --create --zookeeper localhost:2181 --partition 5 --replication-factor 1 --topic test-5-part

实际上是采用了默认策略:RangeAssignor

分策略图:

StickyAssignor:这种策略复杂一点,但相对来说均匀一点(每次的结果都可能不一样)原则:

  1. 分区的分配尽可能的均匀
  2. 分区的分配尽可能和上次分配保持相同
rebalance 分区重分配

有两种情况需要重新分配分区和消费者的关系:

  • 消费者组的消费者数量发生biannhua,比如新增加了消费者,消费者关闭连接。
  • Topic的分区数发生变化,新增或者减少。

为了让分区分配尽量地均匀,这个时候会触发rebalance机制。

分区重新分配可以分成以下几步:

  1. 找一个话事人,它起到一个监督和保证公平的作用。每个Broker上都有一个用来管理offset、消费者组的实例,叫做GroupCoordinator。第一步就是要从所有GroupCoordinator中找一个话事人出来。
  2. 清点人数。所有的消费者连接到GroupCoordinator报数,这个叫join group请求。
  3. 选组长,GroupCoordinator从所有消费者里面选一个leader。这个消费者会根据消费者的情况和设置的策略,确定一个方案。leader把方案上报给GroupCoordinator,GroupCoordinator会通知所有消费者。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Offset维护
  • 如果找不到offset
  • Offset更新
  • 消费者策略
    • rebalance 分区重分配
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档