1、 kafka 是什么,有什么作用
2、Kafka为什么这么快
3、Kafka架构及名词解释
4、Kafka中的AR、ISR、OSR代表什么
5、HW、LEO代表什么
6、ISR收缩性
7、kafka follower如何与leader同步数据
8、Zookeeper 在 Kafka 中的作用(早期)
9、Kafka如何快速读取指定offset的消息
10、生产者发送消息有哪些模式
11、发送消息的分区策略有哪些
12、Kafka可靠性保证(不丢消息)
13、Kafka 是怎么去实现负载均衡的
14、简述Kafka的Rebalance机制
15、Kafka 负载均衡会导致什么问题
16、如何增强消费者的消费能力
17、消费者与Topic的分区策略
18、如何保证消息不被重复消费(消费者幂等性)
19、为什么Kafka不支持读写分离
20、Kafka选举机制
21、脑裂问题
22、如何为Kafka集群选择合适的Topics/Partitions数量
23、Kafka 分区数可以增加或减少吗?为什么
24、谈谈你对Kafka生产者幂等性的了解
25、谈谈你对 Kafka事务的了解?
26、Kafka消息是采用Pull模式,还是Push模式?
27、Kafka缺点
28、Kafka什么时候会丢数据
29、Kafka分区数越多越好吗?
30、Kafka如何保证消息的有序性
31、Kafka精确一次性(Exactly-once)如何保证
Kafka是一个开源的高吞吐量的分布式消息中间件,对比于其他 1) 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
1) 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
1) 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
1) 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
1) 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
简易架构图如下:
详细架构图如下
ISR的伸缩:1)Leader跟踪维护ISR中follower滞后状态,落后太多或失效时,leade把他们从ISR剔除。2)OSR中follower“追上”Leader,在ISR中才有资格选举leader。
启动 Kafka时候自动开启的两个定时任务,“isr-expiration"和”isr-change-propagation"。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
1. Broker注册:Broker是分布式部署并且互相独立,此时需要有一个注册系统能够将整个集群中的Broker管理起来,此时就用到的Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokes/ids
2.Topic注册:在kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息以及与Broker的对应关系也都是由Zookeeper维护,由专门的节点记录:/brokers/topics
3.消费者注册:消费者服务器在初始化启动时加入消费者分组的步骤如下:注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumer/[groupid]/ids/[consumerid],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
4.生产者负载均衡:由于同一个Topic消息会被分区并将其分布在多个Broker上,因此生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
5.消费者负载均衡:与生产者相似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
6.分区与消费者的关系:消费组consumer group下有多个Consumer(消费者)。对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的Group ID,Group内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group) 同时,kafka为每个消费者分配一个Consumer ID,通常采用“Hostname:UUID”形式表示。在kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在zookeeper上记录消息分区与Consumer之间的关系,每个消费者一旦确定了对一个消费分区的消费权利,需要将其Consumer ID写入到平Zookeeper对应消息分区的临时节点上,例如:/consumers/[groupid]/owners/topic/[brokerid-partitionid] 其中,[brokerid-partition_id]就是一个消息分区的表示,节点内容就是该消息分区上消费者的Consumer ID。
7.补充:早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offse t的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖
Kafka本地日志存储根据segement分段存储,默认1G,其中segement包括index稀疏索引文件和log数据文件。其中index文件索引通过offset与posttion来定位数据文件中指定message的消息。其中index和log的文件名都为当前segement的起始offset。
读取offset=170418的消息,首先通过offset根据二分法定位到index索引文件,然后根据索引文件中的[offset,position](position为物理偏移地址)去log中获取指定offset的message数据。
对于生产者的异步发送来说就是,我发送完当前消息后,并不需要你将当前消息的发送结果立马告诉我,而是可以随即进行下一条消息的发送。但是我会允许添加一个回调函数,接收你后续返回的发送结果。异步发送这块我们直接调用kafkaProducer的send方法即可实现异步发送。
如果生产者需要使用同步发送的方式,只需要拿到 send 方法返回的future对象后,调用其 get() 方法即可。此时如果消息还未发送到broker中,get方法会被阻塞,等到 broker 返回消息发送结果后会跳出当前方法并将结果返回。
所谓分区写入策略,即是生产者将数据写入到kafka主题后,kafka如何将数据分配到不同分区中的策略。
常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略,而随机策略则是较老版本的分区策略,不过由于其分配的均衡性不如轮询策略,故而后来改成了轮询策略为默认策略。
所谓轮询策略,即按顺序轮流将每条数据分配到每个分区中。
举个例子,假设主题test有三个分区,分别是分区A,分区B和分区C。那么主题对接收到的第一条消息写入A分区,第二条消息写入B分区,第三条消息写入C分区,第四条消息则又写入A分区,依此类推。
轮询策略是默认的策略,故而也是使用最频繁的策略,它能最大限度保证所有消息都平均分配到每一个分区。除非有特殊的业务需求,否则使用这种方式即可。
随机策略,也就是每次都随机地将消息分配到每个分区。其实大概就是先得出分区的数量,然后每次获取一个随机数,用该随机数确定消息发送到哪个分区。
在比较早的版本,默认的分区策略就是随机策略,但其实使用随机策略也是为了更好得将消息均衡写入每个分区。但后来发现对这一需求而言,轮询策略的表现更优,所以社区后来的默认策略就是轮询策略了。
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储。
至于要如何实现,那也简单,只要让生产者发送的时候指定key就行。欸刚刚不是说默认的是轮询策略吗?其实啊,kafka默认是实现了两个策略,没指定key的时候就是轮询策略,有的话那激素按键保存策略了。
上面有说到一个场景,那就是要顺序发送消息到kafka。前面提到的方案是让所有数据存储到一个分区中,但其实更好的做法,就是使用这种按键保存策略。
让需要顺序存储的数据都指定相同的键,而不需要顺序存储的数据指定不同的键,这样一来,即实现了顺序存储的需求,又能够享受到kafka多分区的优势,岂不美哉。
所以如果使用默认的轮询partition策略,可能会造成一个大的batch被轮询成多个小的batch的情况。鉴于此,kafka2.4的时候推出一种新的分区策略,即StickyPartitioning Strategy,StickyPartitioning Strategy会随机地选择另一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
鉴于小batch可能导致延时增加,之前对于无Key消息的分区策略效率很低。社区于2.4版本引入了黏性分区策略(StickyPartitioning Strategy)。该策略是一种全新的策略,能够显著地降低给消息指定分区过程中的延时。使用StickyPartitioner有助于改进消息批处理,减少延迟,并减少broker的负载。
实现partitioner接口
切记分区是实现负载均衡以及高吞吐量的关键,所以一定要在生产者这一端就要考虑好合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,从而导致下游数据消费的性能下降的问题。
Kafka精确一次性(Exactly-once)保障之一
Kafka可靠性主要从三个方面来看,Broker、Producer、Consumer。1. Brokerbroker写数据时首先写到PageCache中,pageCache的数据通过linux的flusher程序异步批量存储至磁盘中,此过程称为刷盘。而pageCache位于内存。这部分数据会在断电后丢失。刷盘触发条件有三:
kafka没有提供同步刷盘的方式,也就是说理论上要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况,比如:
减少刷盘间隔log.flush.interval.ms(在刷新到磁盘之前,任何topic中的消息保留在内存中的最长时间) 减少刷盘数据量大小log.flush.interval.messages(在将消息刷新到磁盘之前,在日志分区上累积的消息数量)。
时间越短,数据量越小,性能越差,但是丢失的数据会变少,可靠性越好。这是一个选择题。
同时,Kafka通过producer和broker协同处理消息丢失的情况,一旦producer发现broker消息丢失,即可自动进行retry。retry次数可根据参数retries进行配置,超过指定次数会,此条消息才会被判断丢失。producer和broker之间,通过ack机制来判断消息是否丢失。
Topic 分区副本
在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。
Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
2. Producer
producer在发送数据时可以将多个请求进行合并后异步发送,合并后的请求首先缓存在本地buffer中,正常情况下,producer客户端的异步调用可以通过callback回调函数来处理消息发送失败或者超时的情况,但是当出现以下情况,将会出现数据丢失
针对以上情况,可以有以下解决思路。
3.Consumer
Consumer消费消息有以下几个步骤:
消费方式主要分为两种
Consumer自动提交机制是根据一定的时间间隔,将收到的消息进行commit,具体配置为:auto.commit.interval.ms。commit和消费的过程是异步的,也就是说可能存在消费过程未成功,commit消息就已经提交,此时就会出现消息丢失。我们可将提交类型改为手动提交,在消费完成后再进行提交,这样可以保证消息“至少被消费一次”(at least once),但如果消费完成后在提交过程中出现故障,则会出现重复消费的情况,本章不讨论,下章讲解。
分区器是生产者层面的负载均衡。Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进行分区分配的:
如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计算出一个分区号;拥有相同 Key 值的消息被写入同一个分区;如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner 接口,自行实现分区方法。
注:在笔者的理解中,分区器的负载均衡与顺序性有着一定程度上的矛盾。
主要根据消费者的Rebalance机制实现,内容详见下章
什么是 Rebalance
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。 例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机
Rebalance 的触发条件有3个。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
Rebalance 过程
Rebalance 过程分为两步:JoinGroup 请求和 SyncGroup 请求。JoinGroup :JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。SyncGroup:SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
在消费者组Rebalance期间,一直等到rebalance结束前,消费者会出现无法读取消息,造成整个消费者组一段时间内不可用。
1、如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数==分区数。两者缺一不可。
2、如果是下游的数据处理不及时:则提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
3、优化消费者的处理逻辑,提高处理效率
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。使用RoundRobin策略有两个前提条件必须满足:
无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。
Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动(上一次的结果是有粘性的),其目标有两点:
StickyAssignor的模式比其他两种提供更加均衡的分配结果,在发生Consumer或者Partition变更的情况下,也能减少不必要的分区调整。
Kafka精确一次性(Exactly-once)保障之一
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
出现原因:
解决思路:
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
Kafka选举主要分为以下三种:
控制器选举
控制器是Kafka的核心组件,它的主要作用是在Zookeeper的帮助下管理和协调整个Kafka集群包括所有分区与副本的状态。集群中任意一个Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器。集群中第一个启动的Broker会通过在Zookeeper中创建临时节点/controller来让自己成为控制器,其他Broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在Zookeeper中创建watch对象,便于它们收到控制器变更的通知。如果控制器与Zookeeper断开连接或异常退出,其他broker通过watch收到控制器变更的通知,就会尝试创建临时节点/controller,如果有一个Broker创建成功,那么其他broker就会收到创建异常通知,代表控制器已经选举成功,其他Broker只需创建watch对象即可。
控制器作用
分区副本选举机制
发生副本选举的情况:
分区leader副本的选举由Kafka控制器负责具体实施。主要过程如下:
分区副本分为ISR(同步副本)和OSR(非同步副本),当leader发生故障时,只有“同步副本”才可以被选举为leader。选举时按照集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。同时kafka支持OSR(非同步副本)也参加选举,Kafka broker端提供了一个参数unclean.leader.election.enable,用于控制是否允许非同步副本参与leader选举;如果开启,则当 ISR为空时就会从这些副本中选举新的leader,这个过程称为 Unclean leader选举。可以根据实际的业务场景选择是否开启Unclean leader选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。一般建议是关闭Unclean leader选举,因为通常数据的一致性要比可用性重要。
消费组(Consumer Group)选主
在Kafka的消费端,会有一个消费者协调器以及消费组,组协调器(Group Coordinator)需要为消费组内的消费者选举出一个消费组的leader。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,选举源码如下:
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
在组协调器中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。而leader的取值为HashMap中的第一个键值对的key(这种选举方式等同于随机)。
消费组的Leader和Coordinator没有关联。消费组的leader负责Rebalance过程中消费分配方案的制定。
controller挂掉后,Kafka集群会重新选举一个新的controller。这里面存在一个问题,很难确定之前的controller节点是挂掉还是只是短暂性的故障。如果之前挂掉的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller。
其实这种情况是很容易发生。比如,某个controller由于GC而被认为已经挂掉,并选择了一个新的controller。在GC的情况下,在最初的controller眼中,并没有改变任何东西,该Broker甚至不知道它已经暂停了。因此,它将继续充当当前controller,这是分布式系统中的常见情况,称为脑裂。
假如,处于活跃状态的controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller节点被删除。集群中其他Broker会收到zookeeper的这一通知。
由于集群中必须存在一个controller Broker,所以现在每个Broker都试图尝试成为新的controller。假设Broker 2速度比较快,成为了最新的controller Broker。此时,每个Broker会收到Broker2成为新的controller的通知,由于Broker3正在进行"stop the world"的GC,可能不会收到Broker2成为最新的controller的通知。
等到Broker3的GC完成之后,仍会认为自己是集群的controller,在Broker3的眼中好像什么都没有发生一样。
现在,集群中出现了两个controller,它们可能一起发出具有冲突的命令,就会出现脑裂的现象。如果对这种情况不加以处理,可能会导致严重的不一致。所以需要一种方法来区分谁是集群当前最新的Controller。
Kafka是通过使用epoch number(纪元编号,也称为隔离令牌)来完成的。epoch number只是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number 。其他Broker 在知道当前epoch number 后,如果收到由controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的controller。
上图,Broker3向Broker1发出命令:让Broker1上的某个分区副本成为leader,该消息的epoch number值为1。于此同时,Broker2也向Broker1发送了相同的命令,不同的是,该消息的epoch number值为2,此时Broker1只听从Broker2的命令(由于其epoch number较大),会忽略Broker3的命令,从而避免脑裂的发生。
1、根据当前topic的消费者数量确认
在kafka中,单个patition是kafka并行操作的最小单元。在producer和broker端,向每一个分区写入数据是可以完全并行化的,此时,可以通过加大硬件资源的利用率来提升系统的吞吐量,例如对数据进行压缩。在consumer段,kafka只允许单个partition的数据被一个consumer线程消费。因此,在consumer端,每一个Consumer Group内部的consumer并行度完全依赖于被消费的分区数量。综上所述,通常情况下,在一个Kafka集群中,partition的数量越多,意味着可以到达的吞吐量越大。
2、根据consumer端的最大吞吐量确定
我们可以粗略地通过吞吐量来计算kafka集群的分区数量。假设对于单个partition,producer端的可达吞吐量为p,Consumer端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的partition数量至少为max(t/p,t/c)。在producer端,单个分区的吞吐量大小会受到批量大小、数据压缩方法、 确认类型(同步/异步)、复制因子等配置参数的影响。经过测试,在producer端,单个partition的吞吐量通常是在10MB/s左右。在consumer端,单个partition的吞吐量依赖于consumer端每个消息的应用逻辑处理速度。因此,我们需要对consumer端的吞吐量进行测量。
kafka支持分区数增加
例如我们可以使用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令将原本分区数为1得topic-name设置为3。当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区中;当分区数增加到3时,那么就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或者分区2中。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。
Kafka 不支持减少分区数。
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
Kafka精确一次性(Exactly-once)保障之一
生产者幂等性主要避免生产者数据重复提交至Kafka broker中并落盘。在正常情况下,Producer向Broker发送消息,Broker将消息追加写到对应的流(即某一Topic的某一Partition)中并落盘,并向Producer返回ACK信号,表示确认收到。但是Producer和Broker之间的通信总有可能出现异常,如果消息已经写入,但ACK在半途丢失了,Producer就会进行retry操作再次发送该消息,造成重复写入。
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
Producer使用幂等性的示例非常简单,与正常情况下Producer使用相比变化不大,只需要 把Producer的配置enable.idempotence设置为true即可,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//当enable.idempotence为true时acks默认为 all
// props.put("acks", "all");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对上层应用做了很好的封装,应用层并不需要去关心具体的实现细节,对用户非常友好
Kafka的幂等性实现了对于单个Producer会话、单个TopicPartition级别的不重不漏,也就是最细粒度的保证。如果Producer重启(PID发生变化),或者写入是跨Topic、跨Partition的,单纯的幂等性就会失效,需要更高级别的事务性来解决了。当然事务性的原理更加复杂
幂等性可以保证单个Producer会话、单个TopicPartition、单个会话session的不重不漏,如果Producer重启,或者是写入跨Topic、跨Partition的消息,幂等性无法保证。此时需要用到Kafka事务。Kafka 的事务处理,主要是允许应用可以把消费和生产的 batch 处理(涉及多个 Partition)在一个原子单元内完成,操作要么全部完成、要么全部失败。为了实现这种机制,我们需要应用能提供一个唯一 id,即使故障恢复后也不会改变,这个 id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义。当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:
事务性示例
Kafka 事务性的使用方法也非常简单,用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:生产者:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {
String msg = "matt test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction();
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.close();
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction();
}
producer.close();
消费者:消费者应该设置提交事务的隔离级别
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
Kafka中只有两种事务隔离级别:readcommitted、readuncommitted 设置为readcommitted时候是生产者事务已提交的数据才能读取到。在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“readcommitted”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法之后它才能将这些消息推送给消费端应用。同时KafkaConsumer会根据分区对数据进行整合,推送时按照分区顺序进行推送。而不是按照数据发送顺序。反之,如果生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。设置为read_uncommitted时候可以读取到未提交的数据(报错终止前的数据)
push模式下,消费者速率主要由生产者决定,当消息生产速率远大于消费速率,消费者容易崩溃,如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式可以根据自己的消费能力拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断轮询。但是可以在消费者设置轮询间隔。
broker端的消息不丢失,其实就是用partition副本机制来保证。
enable.auto.commit=true,消费在处理之前提交了offset,则处理异常可能会造成消息的丢失。enable.auto.commit=false,Consumer手动批量提交位点,在批量位点中某个位点数据异常时,没有正确处理异常,而是将批量位点的最后一个位点提交,导致异常数据丢失
并非分区数量越多,效率越高
Kafka在特定条件下可以保障单分区消息的有序性
kafka在发送消息过程中,正常情况下是有序的,如果消息出现重试,则会造成消息乱序。导致乱序的原因是:max.in.flight.requests.per.connection默认值为5。
该参数指定了生产者在收到服务器响应之前,请求队列中可以提交多少个请求,用于提高网络吞吐量。
图中,batch1-5在请求队列中,batch1作为最新数据进行提交,提交失败后如果开启重试机制,则batch1会重新添加到本地缓冲池的头部,然后提交至请求队列中重新发送。此时batch1的顺序会排在batch5之后,发生了乱序。
解决方式是将max.in.flight.requests.per.connection设置为1,消息队列中只允许有一个请求,这样消息失败后,可以第一时间发送,不会产生乱序,但是会降低网络吞吐量。
或者开启生产者幂等性设置,开启后,该Producer发送的消息都对应一个单调增的Sequence Number。同样的Broker端也会为每个生产者的每条消息维护一个序号,并且每commit一条数据时就会将其序号递增。对于接收到的数据,如果其序号比Borker维护的序号大一(即表示是下一条数据),Broker会接收它,否则将其丢弃。如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber Sender发送失败后会重试,这样可以保证每个消息都被发送到broker
Kafka本身无法保障多分区的有序性,可以通过业务设计进行保证,例如需要单表数据通过自定义partition的方式发送至同一个分区
宏观上:可靠性 + at least once + 幂等性
具体实现:Kafka不丢消息-生产者幂等性-消费者幂等性
详见目录:
12、Kafka可靠性保证(不丢消息)
18、如何保证消息不被重复消费(消费者幂等性)
24、谈谈你对Kafka生产者幂等性的了解?
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有