作者:闫燕飞
接上篇:《高性能消息队列 CKafka 核心原理介绍(上)》
Kafka设计目的就是为业界提供一套高性能、高可靠的消息中间件,故下面将会从高性能、高可靠几个方面对Kafka的技术原理进行剖析。
注:Kafka为支持不同的业务形态当前共存在3个版本格式的Message
a. v0版本包括0.9及其之前的版本采用 b. v1 0.10版本新定义的Message格式,较v0增加了一个8byte的time字段 c. v2 0.11版本新定义的Message格式之前的编码格式有较大的改动,而且该版本于2017年6月底才发布故不再本文讨论范围
1、message v0格式详解
采用一字节对齐,且使用网络字节序(大端)
字段 | 大小 | 含义 |
---|---|---|
offset | 8byte | 消息在partition中的offset,可以理解为消息id,从零开始自增 |
size | 4byte | 消息大小,不包含offset所占的字节空间,和本身的所占的字节空间,可以理解为剩下的消息大小 |
CRC32 | 4byte | 消息校验码,采用CRC32方式进行校验,计算时从magic(含)开始计算 |
magic | 1byte | 消息版本号,用于区分不同版本的消息格式,Kafka正是通过该字段实现了消息格式的扩展和向前兼容 |
attributes | 1byte | 消息属性,主要用于标识消息的属性,不同版本情况下意义可能不一样,针对v0版本其含义如下bit 0 ~ 2 : Compression codec.0 : no compression1 : gzip2 : snappy3 : lz4bit 3 ~ 7 : reserved |
key length(K) | 4byte | 用于表明key的大小(不包含本身所占用的空间),需要注意的是Kafka对该字段的表明的意义不一样:-1表示不存在key 0表示存在key但key的大小为0>0 表示key的长度 |
key | K byte | 存储key的内容,其大小由key length字段表示。用户可以自己指定,Kafka Broker进行日志compact也会有用到该字段 |
value length(V) | 4byte | 用于表明value的大小(不包含本身所占用的空间) |
value | V byte | 存储value的内容,其大小由value length字段表示 |
为了方便查看用C的伪代码可以如下表示v0消息的格式
2、essage v1 格式详解
字段 | 大小 | 含义 |
---|---|---|
offset | 8byte | 消息在partition中的offset,可以理解为消息id,从零开始自增 |
size | 4byte | 消息大小,不包含offset所占的字节空间,和本身的所占的字节空间,可以理解为剩下的消息大小 |
CRC32 | 4byte | 消息校验码,采用CRC32方式进行校验,计算时从magic(含)开始计算 |
magic | 1byte | 消息版本号,用于区分不同版本的消息格式,Kafka正是通过该字段实现了消息格式的扩展和向前兼容 |
attributes | 1byte | 消息属性,主要用于标识消息的属性,不同版本情况下意义可能不一样,针对v0版本其含义如下bit 0 ~ 2 : Compression codec.0 : no compression1 : gzip2 : snappy3 : lz4bit 3:时间戳类型0: 消息产生的时间,由用户端指定1: 消息添加到Broker log的时间,由Broker指定bit 4 ~ 7 : reserved |
timestamp | 8byte | 消息生产的时间或者消息被broker添加到log的时间,具体看attributes的值。单位ms |
key length(K) | 4byte | 用于表明key的大小(不包含本身所占用的空间),需要注意的是Kafka对该字段的表明的意义不一样:-1表示不存在key 0表示存在key但key的大小为0>0 表示key的长度 |
key | K byte | 存储key的内容,其大小由key length字段表示。用户可以自己指定,Kafka Broker进行日志compact也会有用到该字段 |
value length(V) | 4byte | 用于表明value的大小(不包含本身所占用的空间) |
value | V byte | 存储value的内容,其大小由value length字段表示 |
可以看出比v0版本的消息仅多了一个timestamp字段用于表明消息的生产(或添加到broker日志的时间),方便用户通过指定时间去获取消息。为了方便查看用C的伪代码可以如下表示v1消息的格式
3、批量消息及压缩详解
通过上图,可以看出Kafka通过消息嵌套(Kafka要求消息仅能嵌套一层)的方式,实现了消息的批量压缩,既保证了格式的一致,也实现了批量消息压缩用以提高整体压缩率。而不是采用单个消息压缩。较友好的实现了批量压缩。
4、Message格式小结
a. Message整个头部非常紧凑,v0格式的仅仅26字节 v1格式的也只有34字节。既节省了带宽也节省了存储。 b. 格式定义良好,扩展方便,编码解码简单。对于客户端和后端的存储采用同一套编码方式,这样就导致了不用来回编解码和格式转换,节省了Broker及客户端的资源,让整个消息流转及存储更加的高效。 c. 包含了校验码,用于进行数据完整性校验,提高了数据的可靠性 d. 可以非常简单的不改变原有格式的情况下,通过嵌套方式实现消息批量压缩,更近一步提高资源利用率。而且Broker仅仅只用解压对消息进行一些必要的校验,而不用再次压缩,效率高。最重要的是由于整个消息格式在客户端和broker都是一致的,消费时broker不用做任何解压缩和压缩操作,直接将消息传递个消费者,效率奇高。
1、Kafka文件存储机制Kafka文件存储中,同一topic有不同的partition,每个partition为一个目录,partition的命名规则为topic名称 + 有序序号(从零开始)。具体可以参考下图。
2、partition在Kafka中以目录形式存在,每个partition物理上由多个segment(文件分片)和与其一一对应的index文件组成。每个segment大小类似(默认配置为1G)。segment的命名方式是使用其第一条消息的offset格式化生成,这样通过名称可以很容易查看到segment消息的开始,同时通过相邻的segment也很容易计算出segment中包含的消息个数。将partition分成segment,则非常方便快速删除过期的分片。
3、index文件 采用稀疏存储用于节省空间,主要用于读取消息时快速定位所需要的消息位置。
4、生产消息 消息以append方式添加到最新的segment尾部,算法复杂度为O(1)不会受到现有数据总量的影响,当写入的数据大小或消息条数达到配置的阈值则主动强制刷盘,用于保证系统崩溃时,消息的丢失量在可以接受的范围。同时当写入一定的数据后生成一条索引信息添加到index文件末尾。当segment大小超过配置则滚动生成一个新的segment。可以看到消息都是顺序追加写,且大块刷,可以很好的利用系统的调度和磁盘顺序写的能力达到较高的吞吐量。
5、消费消息 当消费者拉取消息时会指定offset,这时候先从segment list定位到所需的segment的index文件。由于index文件采用稀疏索引方式,文件较小(Kafka默认配置为10M),故采用mmap形式直接加载到内存,然后采用二分法查找到与所需的消息offset相同或相邻的位置,最后再去segment文件中读取到真正的数据。可以看到消费消息时,整个过程最耗时的地方就在从index文件中查找(时间复杂度O(logn)),然index文件比较小,个数也很少,基本可以全部缓存在page cache加之消费者基本都是顺序消费会进一步提高整个cache命中率,故也能提供非常高的消费性能。
6、删除消息 Kafka的Topic根据应用场景不同提供两种删除方式compact和delete。
_
consumer_offsets这个特殊的内置topic进行存储consumer group中每个partition的消费状态,而这个topic就采用的是compact方式进行日志删除。该方式要求msg的key不能为空,且比较消耗broker系统资源,一般用户的topic,非常不建议使用该方式进行日志删除。算法步骤:
算法代码(从Kafka工具截取采用Scala编写不太好看可以对照上面的步骤进行阅读):
注:partition分配算法虽然很重要,但并不是在broker端实现,而是在Kafka Admin工具包中实现。由相关工具分配好,然后写入zookeeper的一个管理节点,最后触发controller watcher事件,controller根据相应的事件,直接从zookeeper拉取到已经分配好的结果,仅仅只用选出leader然后通知相关broker而已。
Kafka的可靠性及可用性,都源自其0.8版本加入的replication概念以及ISR选举算法和消息commit机制(Kafka commit机制通过HW和ISR来保证)共同保证。只有commit后的消息才能被consumer消费,不让消费者消费到未确认的消息从而提高数据的一致性。最后再配合消息生产确认方式及刷盘策略,进而达到用户需要的可靠性及可用性。
当Producer向leader发送数据时,可以通过设置acks参数通过确认方式来配合后端提高可靠性级别。其主要配置项如下:
含义 | |
---|---|
1 | 默认配置,仅当leader成功收到消息并写入日志后,就返回成功给producer。如果leader宕机,则有可能丢失数据。 |
0 | Producer无需等待来自broker的确认而继续发送下一批消息。该配置下,数据传输效率最高,但可靠性最低 |
-1 | Producer需要等待ISR中达到配置的min.insync.replicas(该参数可以在broker级别或topic级别配置,默认值为1需要注意只有acks=-1该配置才生效)个follower都存储了该消息后才回复,可靠性最高。 |
all | 意义同-1 |
1、消息在segment文件中的状态
HW:High Water缩写,指的是consumer能够消费到此partition的最大位置,取partition对应的ISR的最小值为HW,另外每个replica都有HW, leader和follower各自负责维护自己的HW状态。 LEO:Log End Offset的缩写,表示每个partition最后一条message的位置。
2、消息复制及HW和LEO流转过程
通过该流转过程可见, Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka使用ISR的方式,很好的均衡了数据可靠性及吞吐率。
leader选举的常用算法非常多,比如Zookeeper的Zab、Raft以及Viewstamped Replication等,这些算法其实都是采用少数服从多数的方式。在这种模式下,如果有2f+1个副本,那么在commit之前必须保证有f + 1个replica复制完消息,同时为了保证能正确的选举出新的leader,失败的副本个数不能超过f。这种方式有个很大的优势就是系统的延迟取决于最快的大多数机器,而且数据一致性及可靠性高。当然这种方式也有些劣势就是所能容忍的失败的follower个数较少,故在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能急剧下降。而Kafka所使用的leader选举算法更像是微软的PacificA算法。
Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,ISR里的成员会优先选为leader(unclean.leader.election.enable=false)。在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,为了容忍f个副本的失败,“少数服从多数”的方式和ISR在commit前需要等待的副本数量是一样的,但是ISR需要的总的副本的个数几乎是“少数服从多数”的方式的一半。在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案(Kafka默认选择第二种):
a.等待ISR中任意一个replica“活”过来,并且选它作为leader b.选择第一个“活”过来的replica(并不一定是在ISR中)作为leader 第一种方案会提高可用性,第二种方案则是提高数据可靠性。两种方案,用户可以根据业务形态进行相关的定制化。
常用的消息传输语义:
可以看出,从上到下实现的难度一次递增,当前Kafka其实可以支持到At least once,但这个也基本上要配置成全同步才能做到(比如必须配置所有ISR都同步,并且每条消息都同步刷盘,且生产者配置acks=-1),通过这种方式可以达到同步多写,进而做到数据不丢失,但配置成这种方式又会导致Kafka性能急剧下降,完全违背了Kafka标榜的高吞吐。 注:Kafka 0.11版本添加了Exactly once消息传输支持,但该版本2017年6月底才发布,这里暂不做分析。
鉴于Kafka存在以上(2.3.2)几个无法调和的劣势,我们选择开发Ckafka。
Ckafka是以实例(实例包含吞吐量、容量两个维度)的形式对外提供服务,不同用户、实例间通过网络策略进行隔离,实例内部支持按照IP白名单鉴权,满足个性化需求。
由于同一个实例会分配多台broker为其服务,以便达到更高的可用性。这种情况下,就需要有一个分布式流量控制程序,对实例的流量进行相关的控制,防止实例使用超过允诺的流量。当前的分布式流量控制由master完成,broker会定期上报当前流量数据到master,master进行汇总后,采用下面的算法进行流量分配。算法具体步骤如下:
1.如果实例的消息存储容量已经达到最大,则堵住生产放行消费 2.根据实例所分配的broker上包含的partition数,计算出每个broker应该分配的流量,这里记为F 3.将实例当前在每个broker产生的流量与F进行比较,如果所有broker当前的量都小于80%,则按照初始量F进行分配。 4.计算出每个broker的流量,计算出每个broker所占的流量比率记为P 5.找出最大最小流量,并计算最大最小流量,分别记为Pmax, Pmin a.如果Pmin / Pmax >= 80%则根据步骤4中计算出的比率P进行分配 b.如果Pmin / Pmax < 80% 则流量最大的按照P进行分配, 其他的按照min(P + Pmax * 10%, Pmax)进行分配,(加上10%的量是为了给流量小的节点一些余量好进行流量增长,用以达到再平衡)
提到Kafka,大家第一反应应该都是高性能。Kafka的确做到了非常高的性能,被业内很多大数据选作中间存储或管道,便是对其高性能的认可。但追求性能的Ckafka同样也对其做了一些优化,而且也取得了一定的效果。
测试场景:1个topic 3个partition 单机测试生产性能 机型B6: Intel(R) Xeon(R) CPU 2.13GHz 16核 、64G内存、1G网卡
消息中间通常分为高可靠和高性能两种版本。Ckafka是一款高性能消息中间,主要用于满足对性能要求极高的应用场景(如网站活动追踪、运营监控、日志聚合、流式处理、事件追踪等等),并且兼容现有的Kafka协议使用户零成本迁入。当然用户如果有高可靠的要求,我们自研的CMQ则是一款金融级高可靠分布式消息中间件,其通过Raft保证了消息的可靠不丢失,同时性能和可用性相比竞品也有显著的提高。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。