kafka启动流程
创建主题的流程

生产者
oldproducer是生产者scala版本的生产者,支持同步模式,和异步模式,通过实行producer.type进行配置。
oldProducer


prodcuerPool保存的是生产者和代理的连接,每一个SyncProducer对象,是一个hashMap接口,key是brokerId,value是syncproducer
EventHander,对消息进行发送前准备如下
异步发送和同步发送的最大的区别就是异步模式会首先将消息存入消息队列,然后又一个独立的线程判断是否需要将数据向代理发送。
kafkaProducer实现原理
实例化过程
kafkaProducer在实例化首先会加载和解析生产者相关配置信息并封装成producerConfig对象,然后根据配置项主要完成以下对象或数据结构的实例化
sender过程分析
kafka实例化之后,调用kafakProducer.send()方法进行消息发送
其中send方法并没有直接发送消息给kafka server,而是存入了RecordAccumulator中,当达到一定条件,会唤醒sender线程获取消息并发送,send方法我们详细看看RecordAccumulator.append方法实现逻辑

至此kafkaproducer发送Record的第一步操作将Record写入消息写入缓冲区过成分析完毕,第二步有sender线程从消息累加器中取出Record将请求发送到响应的kafak节点。
Send发送消息

首先要获取MetaData中获取集群信息,然后从RecondAccumulator中读取符合的消息,然后构造网络层请求交由NetworkClient去执行,这个过程取出每个TopicPartition对应的分区Leader,但可能存在TopicPartition的leader不存在,就会触发元数据更新操作,在发送的NetworkClient内部维护一个InFlightRequests类型的inflightRequest对象用于保存已发送但未收到响应的请求,这个流程sender更像一个调度器,而NetWorkClient是网路请求的真正执行这,sender不断的从RecordAccumulator获取符合条件的消息,构造请求交由NetworkClient执行。
详细分析sender将消息最终发送到kafka节点
sender是后台一个一直执行的线程,他是通过run方法一直会执行,但真正执行的是run(long now)方法,该方法入参是当前系统时间,具体逻辑如下
生产者的整体流程如图

kafkaConsumer各组件说明
ConsumerConfig,消费者级别的配置,将相应配置传递给其他组件
SubscriotionState,维护了消费者订阅和消费消息的情况
ConsumerCoodinator,负责消费者与服务端GroupCoodinator通信
ConsumerNetWorkClient,对网络层通信NetworkClient的封装,用于消费者与服务端通信
Fetcher,对CondumerNetworkClient进行包装,负责从服务端获取消息
消费订阅
kafkaConusmer提供了两种订阅消息的方法,一种通过KafkaConusmer,subscrible方法指定消息对应的主题,另一种是通过kafakConsumer.assign方法指定需要消费的分区,
subscribe有几个重载方法如下
public void subscribe(Collection topics)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Collection topics, ConsumerRebalanceListener listener)subcribe也可分为两种,一种是非模式匹配订阅主题的基本流程,一种是模式匹配订阅主题(正则的方式)

上图就是非模式订阅的基流程,而模式匹配订阅主题的方式与直接致电主题列表方式时间逻辑类似,也是调用Subscriptionstate.subscribe方式经订阅关系保存到SubscriptionState维护的用来保存订阅关系的数据结构中,即将订阅主题的模式赋值给subsctibedPatten,由于通过模式匹配来查找订阅的主题,所以接下来需要先设置Metadata.needMetadataForAllTopic表示设置为true,然后更细Metadata,最后交由消费者协调器ConsumerCoordinator从集群Cluter的当前所有主题中查找符合的主题,将主题添加到subsriptionState的subscription和groupSubscription集合中,并更新这些主题在Metadata中记录的过去时间
assign方法实现逻辑和subcribe类似,也是首先检测是否有并发操作,然后判断请求参数是否合法,即分区是否nul,以及是否为空集合,分别进行与subcribe方法相同的吹,然后遍历订阅的分区,构造一个与分区相对应的主题集合,
在将用户的消费者分区分配关系保存到subscriptionSatate.assignment之前,先调用Consumer.maybeAutoCommitOffsetsNow进行消费偏移量提交,保证同一个消费组的消费者对分区的消费偏移量已提交,防止重复消费,最后更新订阅的分区对应的主题过期时间。
两种订阅方式是互斥,客户端只能选择其中一种订阅方式。
消费消息
kafkaConsumer提供poll方法从服务端拉取消息,该方法是通过Fetcher类来完成消息的拉取及更新消费偏移量,因此我们首先讲解Fetcher.
Fetcher主要功能是负责构造拉取消息的FetchRequest请求,然后通过ConsumerNetWorkclient发送FetchRequest请求,最后对返回的结果进行处理更新缓存中记录的消费记录,
首先我们分析用于构造FetchRequest的Fetcher.createFetchRequest实现逻辑
kafkaConsumer拉取消息
kafkaConusmer.poll核心逻辑是当没有拉取到消息是在timeout时间内循环调用pollOnce方法向服务端发送FetchRequest请求并进行相应处理,若pollOnce方法拉取消息,则poll方法会在消息返回给客户端之前调用Fetcher.sendFetces方法发送下一次拉取消息的请求,若干没有拉取消息同时等待时间没有超过timeout设置,则循环调用pollonce方法处理,若超时则构造一个空消息集合返回客户端。
pollonce方法的主要逻辑是,确保消费组在服务端对应的组协调器已完成分配并正常连接,消费者已加入到该组协调器的管理之中,同时以同步方式调用doAutoCommitOffsetsAsync方法获取消费初始位置,然后调用Fecher.fetchRecords方法,检测是否已获取消息,之所以首先调用Fetch.fetchedRecords进行处理,是因为kafkaConsumer.poll方法每次调用pollOnce方法获取消息之后,就会发送下一次FetcherRequest请求避免阻塞等待,若获取到消息立刻返回到poll方法执行题,然后发送下一次FetcherRequst,若没有获取到消息调用Fetcher.sendFetches方法发送FetchRequest请求,并调用ConusmerNetWorkClient.poll,执行网络层请求处理,阻塞等服务端响应之后构造返回结果,在构造返回结果之前,需要检测在长时间处理poll过程中,消费者是否需要重新加入消费组进行平衡操作,若需要重新加入消费组则返回一个空消息结合,否则代用Fetcher.fetchedRecord获取消息最后返回poll方法执行体。
kafka提供了两种方式获取消费起始位置和客户端调用相应的API确定消费其实位置
seek()用于指定消费起始位置到一个特定位置
seekToBeginning()指定OffsetResetStrategy为EARLIEST,相当与auto.offset.reset=earliest
seekToEnd()指定offsetResetStrategy为LATEST,相当与通过配置向auto.offset.reset=latest另一种方式就是设置auto.offset.reset设置消费起始位置,默认是LATEST策略自动重置消费起始位置.
消费偏移量提交
kafka提供了两种消费偏移量的方式,一种是自动提交,一种是手动提交使用API。
kafak提供同步提交commitSync和异步提交commitAsync提供客户端提交消费偏移量,这两种方式分别调用ConsumerCoordinator的CommitOffsetSync和commitOffsetsAsync,底层实现是通过客户端消费者协调器ConsumerCoordinator发送offsetCommitRequeq请求,服务端协调器GroupCoodinator进行处理,最后将消费偏移量提交到kafak内部主题中
分区数与消费者线程的关系
kafka分配线程与分区的分配策略
首先订阅的主题分区以及消费者线程进行排序,然后通过轮询方式分别将分区依次分给消费者线程

2.range分配策略
首先对同一个主题里面分区进行序号排序,并对消费者按字母排序进行排序,假设我们分区进行排完序为0,1,2,3,4,5,6,7,8,9,消费者排序完将会是C1,C2然后将分区的个数除于消费者的总数决定每个消费者消费几个分区,如果除不尽,那么前面几个消费者线程将会多消费一个分区,
我们10分分区,2个消费者,10/2=5,那个消费者C1消费者C2将会消费同样多的分区,所以最后分区分配的结果如下
C1 ->0,1,2,3,4分区
C2 ->5,6,7,8,9分区假设如果有11分区,那么最后分区分配结果如下
C1->0,1,2,3,4,5分区
C2 ->6,7,8,9分区假设我们有2个主题(T1,T2)分别有11个分区,最后的结果如下
C1 ->T1 0,1,2,3,4,5 T2 0,1,2,3,4,5分区
C2 ->T1 6,7,8,9,10 T2 6,7,8,9,10 分区很明显C1消费者比C2消费者多消费了2个分区,这就是一个弊端
如果我们把round-robin分配实例按照range分配策略进行分配,分配如下

持续关注,下一篇kafka全面总结,如果对您有一丝丝帮助,麻烦点个关注,也欢迎转发,谢谢