目录
Apache Kafka是由LinkedIn采用Scala和Java开发的开源流处理(open source、 stream-processing)平台,该项目旨在提供统一的、高吞吐量、低延迟的平台来处理实时数据流。
Kafka是一种分布式的,基于发布/订阅的消息系统,主要特性如下:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。(近乎实时性的消息处理能力)。
- 可扩展性:kafka集群支持热扩展,支持消息分区,支持在线增加分区(水平扩展)。
- 高并发:支持数千个客户端同时读写(支持批量读写消息)。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败(若副本数量为n,最多允许n-1个节点失败)(容灾)。
一个kafka架构包括若干个Producer(服务器日志、业务数据、web前端等),若干个Broker(一般broker数量越多集群的吞吐量越大),若干个Consumer,一个Zookeeper集群(kafka通过Zookeeper管理集群配置、选举leader、consumer group rebalance),如下图。
基本组件:
每个消息文件都是一个logentry序列,其格式如下图所示:
一条完整的消息包含RECORD、offset以及message size,“RECORD”部分就是Kafka的消息格式,其中offset用来标识它在Partition中的偏移量(offset是逻辑值,而非实际物理偏移值),message size表示消息的大小。
与消息对应的还有消息集的概念,消息集中包含一条或者多条消息,消息集不仅是存储于磁盘以及在网络上传输的基本形式,也是kafka中压缩的基本单元,详细结构参考上图右侧。
下面来具体描述一下消息(RECORD)格式中的各个字段,从crc32开始算起,各个字段的解释如下:
crc32(4B):crc32校验值,校验范围为magic至value之间。
magic(1B):消息格式版本号,0.9.X版本的magic值为0。
attributes(1B):消息的属性,总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4,其余位保留。
key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
key:可选,如果没有key则无此字段。
value length(4B):实际消息体的长度。如果为-1,则表示消息为空。
value:消息体,可以为空。
一个消息的最小长度(RECORD_OVERHEAD_V0)为crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是说v0版本中一条消息的最小长度为14B,如果小于这个值,那么这就是一条破损的消息而不被接受。
不同于一些 logging-centric system(如Facebook 的 Scribe 和 Cloudera 的 Flume )采用Push模式,Kafka选择由 Producer 向 Broker Push 消息并由 Consumer 从 Broker Pull 消息的形式,事实上Push模式和Pull模式各有优劣。Push 模式的目标是尽可能以最快速度传递消息,因为消息发送速率是由Broker决定的,Push 模式很难适应消费速率不同的消费者,容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而Pull 模式则可以根据Consumer 的消费能力以适当的速率消费消息,同时Consumer可以自己控制消费方式(即可批量消费也可逐条消费)。
Producer生产、序列化并压缩消息后,追加到本地的记录收集器(RecordAccumulator),Sender不断轮询记录收集器,当满足一定条件时,将队列中的数据发送到Partition Leader节点。Sender发送数据到Broker的条件有两个:
Producer会为每个Partition都创建一个双端队列来缓存客户端消息,队列的每个元素是一个批记录(ProducerBatch),批记录使用createdMs表示批记录的创建时间(批记录中第一条消息加入的时间), topicPartion表示对应的Partition元数据,序列化后的消息写入到recordsBuilder对象中。一旦队列中有批记录的大小达到阈值或者批记录等待发送的时间达到阈值,就会被Sender发送到Partition对应的Leader节点。
Producer发送消息到Broker时,会根据Paritition机制选择将消息存储到哪一个Partition,如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。具体有以下几种策略:
轮询策略
随机策略
1. 随机策略默认从Partition列表中随机选择一个,随机策略的消息分布大致如下图所示:
按消息键保序策略
自定义策略
Kafka的Consumer Group是采用Pull的方式来消费消息,那么每个Consumer该消费哪个Partition的消息则需要一套严格的机制来保证,而且Partition是可以水平无限扩展的,随着Partition的扩展Consumer消费的Partition也会重新分配,这就涉及到kafka消息的消费分配策略,在Kafka内部存在两种默认的分区分配策略:Range和RoundRobin( partition.assignment.strategy参数默认的值是range),当发生以下事件时,Kafka将会进行一次分区分配:
Range策略
Range是对每个Topic而言的,Range策略的工作原理是:首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序,然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
举例:假设 T1 的Topic包含了5个分区,且有两个消费者(C1,C2)来消费这5个分区里面的数据,C1的 num.streams = 2,C2 的 num.streams = 1(这里num.streams指的是消费者的消费线程个数)。在这个例子里,Partitions分区排完序会是0, 1, 2, 3, 4,消费者线程排完序将会是C1-0, C1-1, C2-0,有5个分区,3个消费者线程, 5 / 3 = 1,而且除不尽,那么消费者线程 C1-0 、C1-1将会多消费一个分区,所以最后分区分配的是:C1-0 将消费 0, 1分区,C1-1 将消费 2, 3 分区,C2-0 将消费4分区。具体消费示意图如下:
如果增加了Partition,从之前的5个分区现在变成了6个分区,那么最后分区分配的结果是这样的:C1-0 将消费0,1分区,C1-1 将消费2,3分区,C2-0 将消费 4,5分区:
RoundRobin策略
使用RoundRobin策略有两个前提条件必须满足:
• 同一个Consumer Group里面的所有消费者的num.streams必须相等;
• 每个消费者订阅的Topic必须相同。
假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,依次分给消费线程。在例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-2, T1-1, T1-4, ,消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:C1-0 将消费T1-5,T1-1; 分区C1-1 将消费 T1-3, T1-4,分区;C2-0 将消费 T1-0, 分区;C2-1 将消费 T1-2, 分区;消费示意如下图:
StickyAssignor策略
这种分配策略是在kafka的0.11.X版本开始引入的,是目前最复杂也是最优秀的分配策略,Sticky分配策略的原理比较复杂,它的设计主要实现了两个目的:
当这两个目标发生冲突时,优先保证第一个目标。
Kafka大量依赖文件系统去存储和缓存消息,尽可能把内容直接写入到磁盘,所有数据及时的以持久化日志的方式写入到文件系统。
Producer传递到Broker的每条消息都会分配一个顺序值,顺序值从0开始,用来标记消息生产的顺序。每条消息的顺序值只相对于本批次的序号,所以这个值不能直接存储在日志文件中,服务端会将每条消息的顺序值转换成绝对偏移量。Kafka通过nextOffset(下一个偏移量)来记录存储在日志中最近一条消息的偏移量。如下表所示,消息写入前nextOffset是899,Message0、Message1、Message2是连续写入的三条消息,消息被写入后其绝对偏移量分别是900、901、902,对应的顺序值分别是0、1、2,nextOffset变成902。
Broker将每个Partition的消息追加到文件中,是以日志分段(Segment)为单位的。当Segment的大小达到阈值(默认是1G)时,会新创建一个Segment保存新的消息,每个Segment都有一个基准偏移量(baseOffset,每个Segment保存的第一个消息的绝对偏移量),通过这个基准偏移量,就可以计算出每条消息在Partition中的绝对偏移量。 每个日志分段由数据文件和索引文件组成,数据文件(文件名以log结尾)保存了消息集的具体内容,索引文件(文件名以index结尾)保存了消息偏移量到物理位置的索引。如下图所示:
基于索引文件的查询
Kafka通过索引文件提高对磁盘上消息的查询效率,Kafka的索引文件的特性:
举例:假设有1000条消息,每100条消息写满一个日志分段,一共会有10个日志分段。客户端要查询偏移量为999的消息内容,如果没有索引文件,我们必须从第一个日志分段的数据文件中,从第一条消息一直往前读,直到找到偏移量为999的消息。有了索引文件后,我先找到先找到 offset=999 的 message 所在的 segment文件(利用二分法查找)找到最后一个分段,再使用绝对偏移量999减去基准偏移量900得到相对偏移量99,然后找到最接近相对偏移量99的索引数据90,相对偏移量90对应的物理地址是1365,然后再到数据文件中,从文件物理位置1365开始往后读消息,直到找到偏移量为999的消息。
Kafka中存在大量的网络数据持久化到磁盘(Producer到Broker)和磁盘文件通过网络发送(Broker到Consumer)的过程。这一过程的性能直接影响到Kafka的整体性能。Kafka采用零拷贝通用技术解决该问题。
零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,减少用户应用程序地址空间和操作系统内核地址空间之间因为上下文切换而带来的开销,从而有效地提高数据传输效率。
以将磁盘文件通过网络发送为例。下面展示了传统方式下读取数据后并通过网络发送所发生的数据拷贝:
Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝,这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,没有cpu数据拷贝,因此大大提高了性能。零拷贝过程如下图所示。
从具体实现来看,Kafka的数据传输通过TransportLayer来完成,其子类PlaintextTransportLayer通过Java NIO的FileChannel的transferTo()和transferFrom()方法实现零拷贝,transferTo()和transferFrom()并不保证一定能使用零拷贝。
Kafka采用多机备份和消息应答方式来解决数据丢失,允许同一个Partition存在多个消息副本(Replica),每个Partition的副本通常由1个Leader及0个以上的Follower组成,生产者将消息直接发往对应Partition的Leader,Follower会周期地向Leader发送同步请求,Kafka的Leader机制在保障数据一致性地同时降低了消息备份的复杂度。
同一Partition的Replica不应存储在同一个Broker上,因为一旦该Broker宕机,对应Partition的所有Replica都无法工作,为了做好负载均衡并提高容错能力,Kafka会尽量将所有的Partition以及各Partition的副本均匀地分配到整个集群上,如下图:
ISR(In-Sync Replicas)指的是一个Partition中与Leader“保持同步”的Replica列表(副本所在Broker),这里的保持同步不是指与Leader数据保持完全一致,而是在replica.lag.time.max.ms时间内与Leader保持有效连接。
Follower周期性地向Leader发送FetchRequest请求(发送时间间隔配置在replica.fetch.wait.max.ms中,默认值为500ms),如果 Follower 并不能保持同步状态,那么该 Follower 会被移出 ISR,不再阻塞写入,被移出ISR的Follower会继续向Leader发FetchRequest请求,试图再次跟上Leader重新进入ISR,各Partition的Leader负责维护ISR列表并将ISR的变更同步至ZooKeeper ,通常只有ISR里的成员才可能被选为Leader。当Kafka中unclean.leader.election.enable配置为true(默认值为false)且ISR中所有副本均宕机的情况下,才允许ISR外的副本被选Leader,此时会丢失部分已应答的数据。
为了讲清楚ISR的作用,下面介绍一下生产者可以选择的消息应答方式。
生产者无需等待服务端的任何确认,消息被添加到生产者套接字缓冲区后就视为已发送,因此acks=0不能保证服务端已收到消息,使用场景较少。
Leader将消息写入本地日志后无需等待Follower的消息确认就做出应答。如果Leader在应答消息后立即宕机且其他Follower均未完成消息的复制,则该条消息将丢失。
Leader将等待ISR中的所有副本确认后再做出应答,因此只要ISR中任何一个副本还存活着,这条应答过的消息就不会丢失。acks=all是可用性最高的选择,但等待Follower应答引入了额外的响应时间。Leader需要等待ISR中所有副本做出应答,此时响应时间取决于ISR中最慢的那台机器。
Broker的配置项min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数,当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的Follower赶上并重新进入ISR。
每个Kafka副本对象都有下面两个重要属性:
Kafka有一套Leader选举及失败恢复机制:首先在集群所有Broker中选出一个Controller,负责各Partition的Leader选举以及Replica的重新分配,当出现Leader故障后,Controller会将Leader/Follower的变动通知到需为此作出响应的Broker,ZooKeeper存储Broker、Topic等状态数据,Controller和Broker会在ZooKeeper指定节点上注册Watcher(事件监听器),以便在特定事件触发时,由ZooKeeper将事件通知到对应Broker。
场景1 Broker与其他Broker断开连接
上图中Broker0和其余Broker都断开了连接,由于ZooKeeper还能接收到Broker0的心跳,因此ZooKeeper认为Broker0依然存活,则对于
Broker0中的副本为Partition0的Leader,当Broker0超过replica.lag.time.max.ms没接收到Broker1、Broker2的FetchRequest请求后,Broker0选择将Partition0的ISR收缩到仅剩Broker0本身,并将ISR的变更同步到ZooKeeper,Broker0需要根据min.insync.replicas的配置值决定是否继续接收生产者数据。
超过replica.lag.time.max.ms后,Broker1会将Broker0中的副本从Partition1的ISR中移除。若后续Broker0恢复连接并赶上了Broker1,则Broker1还会再将Broker0重新加入Partition1的ISR
场景2 Broker与ZooKeeper断开连接
Broker0与ZooKeeper断开连接后,ZooKeeper会自动删除该Broker对应节点,并认为Broker0已经宕机,则对于
ZooKeeper删除节点后,该节点上注册的Watcher会通知Controller,Controller发现Broker0为Partition0的Leader,于是从当前存活的ISR中选择了Broker2作为Partition0的新Leader,新Leader会将HW更新为它的LEO值,而Follower则通过一系列策略截断log以保证数据一致性,Controller通过LeaderAndIsrRequest将Leader变更通知到Broker1、Broker2,于是Broker1改向Broker2发送Partition0数据的FetchRequest请求。
当Producer发现Partition0的Leader发生变更后,会改向新Leader-Broker2发送Partition0数据。另一边,Broker0收不到ZooKeeper通知,依然认为自己是Partition0的Leader,由于Broker1、Broker2不再向Broker0发送FetchRequest请求,缺失了ISR应答的Broker0停止写入acks=all的消息,但可以继续写入acks=1的消息。在replica.lag.time.max.ms时间后,Broker0尝试向ZooKeeper发送ISR变更请求但失败了,于是不再接收Producer的消息。
当Broker0与ZooKeeper恢复连接后,发现自己不再是Partition0的Leader,于是将本地日志截断(为了保证和Leader数据一致性),并开始向Broker2发送FetchRequest请求。在Broker0与ZooKeeper失联期间写入Broker0的所有消息由于未在新Leader中备份,这些消息都丢失了。
Broker0中的副本只是作为Partition1的Follower节点,而Broker0与Broker1依然保持连接,因此Broker0依然会向Broker1发送FetchRequest。只要Broker0能继续保持同步,Broker1也不会向ZooKeeper变更ISR。
Controller发生故障时对应的Controller临时节点会自动删除,此时注册在其上的Watcher会被触发,所有活着的Broker都会去竞选成为新的Controller(即创建新的Controller节点,由ZooKeeper保证只会有一个创建成功)。竞选成功者即为新的Controller,会在ZooKeeper的下述节点上注册Watcher,以监控各Broker运行状态、负责Leader宕机的失败恢复,并对管理脚本做出响应。
场景1 Controller与ZooKeeper断开连接
此时ZooKeeper会将Controller临时节点删除,并按照下节的故障恢复过程重新竞选出新Controller。而原本的Controller由于无法连上ZooKeeper,它什么也执行不了;当它与ZooKeeper恢复连接后发现自己不再是Controller,会在Kafka集群中充当一个普通的Broker。
场景2 Controller与某个Broker断开连接
因为Controller无法通知到Broker0,所以Broker0不晓得Partition0的Leader已经更换了,所以也会出现4.1.1节场景2描述的出现短暂服务不可用并可能发生数据丢失。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。