Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka 生产与消费

Kafka 生产与消费

原创
作者头像
大鹅
发布于 2019-09-17 02:32:58
发布于 2019-09-17 02:32:58
1.2K0
举报

@toc

1. 概述

接着上一篇博客,本篇主要介绍Kafka的生产与消费的过程。Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息。

图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。

2. 生产

创建一条记录,记录中一个要指定对应的topicvaluekeypartition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。

如果partition没填,那么情况会是这样的:

  • key有填 按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
  • key没填 round-robin来选partition

这些要发往同一个partition的请求按照配置,攒一批然后由一个单独的线程一次性发过去。

2.1 partition分配与Leader选举

当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)

然后这里就涉及两个细节:怎么分配partition,怎么选leader。

关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。

2.1.1 partition分配
  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
  3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
2.1.2 Leader选举

controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk 的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来做leader。

选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker。这里不是使用zk通知,而是直接给broker发送rpc请求。

如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。

2.2 多副本同步

这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。

生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。

acks

what happen

0

which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。

1

which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。

-1

which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。

在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。

这里ISR列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。

从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;

从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。

也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。

3. 消费

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。

3.1 offset保存

一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。

0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:

代码语言:txt
AI代码解释
复制
__consumers_offsets partition =
	           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   
	//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
3.2 分配partition--reblance

生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。

下面从顶向下,分别阐述一下

  1. 怎么选coordinator
  2. 交互流程
  3. reblance的流程
3.2.1 选coordinator
  1. 看offset保存在那个partition
  2. 该partition leader所在的broker就是被选定的coordinator

这里我们可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。

3.2.2 交互流程

把coordinator选出来之后,就是要分配了

整个流程是这样的:

  1. consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。
  2. consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。
3.2.3 reblance流程
  1. consumer给coordinator发送JoinGroupRequest请求。
  2. 这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
  3. 其他consumer发送JoinGroupRequest请求。
  4. 所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
  5. consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
  6. coordinator回包,把分配的情况告诉consumer,包括leader。 当partition或者消费者的数量发生变化时,都得进行reblance。

列举一下会reblance的情况:

  1. 增加partition
  2. 增加消费者
  3. 消费者主动关闭
  4. 消费者宕机
  5. coordinator自己也宕机

4. 消息投递语义

kafka支持3种消息投递语义

  • At most once:最多一次,消息可能会丢失,但不会重复
  • At least once:最少一次,消息不会丢失,可能会重复
  • Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)

在业务中,常常都是使用At least once的模型,如果需要可重入的话,往往是业务自己实现。

4.1 At least once

先获取数据,再进行业务处理,业务处理成功后commit offset。

  1. 生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息
  2. 消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费
4.2 At most once

先获取数据,再commit offset,最后进行业务处理。

  1. 生产者生产消息异常,不管,生产下一个消息,消息就丢了
  2. 消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了
4.3 Exactly once

思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 首先想出来的:

生产者重做导致重复写入消息----生产保证幂等性

消费者重复消费---消灭重复消费,或者业务接口保证幂等性重复消费也没问题

由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。

生产者幂等性好做,没啥问题。

解决重复消费有两个方法:

  1. 下游系统保证幂等性,重复消费也不会导致多条记录。
  2. 把commit offset和业务处理绑定成一个事务。

本来exactly once实现第1点就ok了。

但是在一些使用场景下,我们的数据源可能是多个topic,处理后输出到多个topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其他topic绑定成一个事务。

Ref

  1. https://www.jianshu.com/p/d3e963ff8b70

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
震惊了,原来这才是Kafka的“真面目”?!
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/08/31
2350
震惊了,原来这才是Kafka的“真面目”?!
【万字长文】Kafka最全知识点整理(建议收藏)
Kafka是一个开源的高吞吐量的分布式消息中间件,对比于其他 1) 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
857技术社区
2022/05/17
27.3K2
【万字长文】Kafka最全知识点整理(建议收藏)
震惊了,原来这才是Kafka的“真面目”!
Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
Java帮帮
2019/05/22
1.4K0
震惊了,原来这才是Kafka的“真面目”!
都说Kafka牛3万字带你全面掌握kafka
系统间的耦合高怎么办,我们如何不让一个服务过于庞大,一个好的方式就是依据具体的功能模块拆分服务,降低服务的耦合度,服务间的交互可以通过消息传递数据来实现,除此之外Kafka非常适合在线日志收集等高吞吐场景,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息,所以kafka被各大公司广泛运用于消息队列的构建:
大数据老哥
2021/03/05
1.3K0
都说Kafka牛3万字带你全面掌握kafka
Kafka进阶面试题分享
1) 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
857技术社区
2022/05/17
1.2K0
Kafka进阶面试题分享
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
全栈程序员站长
2022/08/24
16.8K4
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
首页 归档 分类 标签 作者 kafka原理总结
分区策略决定 producer 将消息怎么分发到 partition 中, 分区策略不合适可能导致数据倾斜, 有些时候我们需要实现顺序消息, 也需要将同一业务的消息都发送到同一个 partition 上。生产端将消息发送给 broker 之前主要经过拦截、序列化、分区(Partitioner)几个步骤。分区器主要读取 partition 配置(生产端配置partitioner.class, 默认值是 DefaultPartitioner)
leobhao
2023/03/08
4640
首页  归档  分类  标签  作者     kafka原理总结
Kafka 面试真题及答案,建议收藏
Kafka可以说是必知必会的了,首先面试大数据岗位的时候必问kafka,甚至现在java开发岗位也会问到kafka一些消息队列相关的知识点。先来看看有哪些最新的Kafka相关面试点:
大数据技术架构
2020/06/05
3.2K0
Kafka 面试真题及答案,建议收藏
Kafka是如何保证高性能和高吞吐量的?
最早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控。
IT大咖说
2019/11/14
2.9K0
你可能需要的Kafka面试题与部分答案整理
场景:数据比较集中且实时要求不是太高,如果同步处理,假如业务高峰需要4台服务支撑,那么在业务高峰过了之后,就会出现资源闲置,如果引入消息队列的话,将数据放到消息队列后直接返回成功,提升了响应时间,真正的业务在消息队列后面消费处理,可能2台服务就能够支撑的住,而且流量更加均匀。
大数据真好玩
2020/02/13
9010
kafka架构原理最全解释
答:Kafka是一个发布 - 订阅的消息队列中间件。这个消息传递应用程序是用“scala”编码的。 kafka 支持的协议是防AMQP协议,支持集群,负载均衡和动态扩容(zk), 不支持事务;
Tim在路上
2020/08/04
2.9K0
Kafka学习笔记之概述、入门、架构深入
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
大数据真好玩
2021/03/15
7560
kafka学习笔记:知识点整理
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
Java架构师必看
2021/08/05
3870
第一天:Kafka理论学习
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
sowhat1412
2020/11/05
5190
第一天:Kafka理论学习
Java基础面试题【分布式】Kafka
producer发送消息完,只等待lead写入成功就返回了,leader crash了,这时follower没来及同步,消 息丢失。
@派大星
2023/10/25
3130
Java基础面试题【分布式】Kafka
Kafka系列之高频面试题
ISR是由Leader维护,Follower从Leader同步数据有一些延迟,超过配置的阈值会把Follower剔除出ISR,存入OSR列表,新加入的Follower也会先存放在OSR中。AR=ISR+OSR。
johnny666
2024/09/21
1470
Kafka原理和实践
本文从Kafka的基本概念、特点、部署和配置、监控和管理等方面阐述 Kafka 的实践过程。
杨振涛
2019/08/08
1.4K0
一篇文章把RabbitMQ、RocketMQ、Kafka三元归一
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/07/12
7660
一篇文章把RabbitMQ、RocketMQ、Kafka三元归一
一文快速了解Kafka
初学Kafka,肯定会被各种概念搞得很头疼,所以整理下Kafka进阶学习必须要了解的概念。
全菜工程师小辉
2021/05/17
1.1K0
一文快速了解Kafka
Kafka基础(一):基本概念及生产者、消费者示例
Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 Zookeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性被广泛使用。目前越来越多的开源式分布处理系统如:Storm、Spark、Flink 等都支持与 Kafka 集成。
create17
2019/06/19
9370
相关推荐
震惊了,原来这才是Kafka的“真面目”?!
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档