目前来说市面上可以选择的消息队列非常多,像activemq,rabbitmq,zeromq已经被大多数人耳熟能详,特别像activemq早期应用在企业中的总线通信,基本作为企业级IT设施解决方案中不可或缺的一部分。目前来说Kafka已经非常稳定,并且逐步应用更加广泛,已经算不得新生事物,但是不可否认Kafka一枝独秀如同雨后春笋,非常耀眼,今天我们仔细分解一下Kafka,了解一下它的内幕。以下的内容版本基于当前最新的Kafka稳定版本2.4.0。文章主要包含以下内容:
文章为开篇引导之做,后续会有对应的HBase,Spark,Kylin,Pulsar等相关组件的剖析。
快是一个相对概念,没有对比就没有伤害,因此通常我们说Kafka是相对于我们常见的activemq,rabbitmq这类会发生IO,并且主要依托于IO来做信息传递的消息队列,像zeromq这种基本纯粹依靠内存做信息流传递的消息队列,当然会更快,但是此类消息队列只有特殊场景下会使用,不在对比之列。
因此当我们说Kakfa快的时候,通常是基于以下场景:
那么基于以上几点,我们来仔细探讨一下,为什么Kafka就快了。
首先,如果我们单纯站在Consumer的角度来看“Kafka快”,是一个伪命题,因为相比其他MQ,Kafka从Producer产生一条Message到Consumer消费这条Message来看它的时间一定是大于等于其他MQ的,背后的原因涉及到消息队列设计的两种模型:推模型和拉模型,如下图所示:
对于拉模型来说,Producer产生Message后,会主动发送给MQ Server,为了提升性能和减少开支,部分Client还会设计成批量发送,但是无论是单条还是批量,Producer都会主动推送消息到MQ Server,当MQ Server接收到消息后,对于拉模型,MQ Server不会主动发送消息到Consumer,同时也不会维持和记录消息的offset,Consumer会自动设置定时器到服务端去询问是否有新的消息产生,通常时间是不超过100ms询问一次,一旦产生新的消息则会同步到本地,并且修改和记录offset,服务端可以辅助存储offset,但是不会主动记录和校验offset的合理性,同时Consumer可以完全自主的维护offset以便实现自定义的信息读取。
对于推模型来说,服务端收到Message后,首先会记录消息的信息,并且从自己的元信息数据库中查询对应的消息的Consumer有谁,由于服务器和Consumer在链接的时候建立了长链接,因此可以直接发送消息到Consumer。
Kafka是基于拉模型的消息队列,因此从Consumer获取消息的角度来说,延迟会小于等于轮询的周期,所以会比推模型的消息队列具有更高的消息获取延迟,但是推模型同样又其问题。首先,由于服务器需要记录对应的Consumer的元信息,包括消息该发给谁,offset是多少,同时需要向Consumer推送消息,必然会带来系列的问题:假如这一刻网络不好,Consumer没有收到,消息没有发成功怎么办?假设消息发出去了,我怎么知道它有没有收到?因此服务器和Consumer之间需要首先多层确认口令,以达到至少消费一次,仅且消费一次等特性。
Kafka此类的拉模型将这一块功能都交由Consumer自动维护,因此服务器减少了更多的不必要的开支,因此从同等资源的角度来讲,Kafka具备链接的Producer和Consumer将会更多,极大的降低了消息堵塞的情况,因此看起来更快了。
太阳底下无新鲜事,对于一个框架来说,要想运行的更快,通常能用的手段也就那么几招,Kafka在将这一招用到了极致,其中之一就是极大化的使用了OS的Cache,主要是Page Cache和Buffer Cache。对于这两个Cache,使用Linux的同学通常不会陌生,例如我们在Linux下执行free命令的时候会看到如下的输出:
(图片来自网络)
会有两列名为buffers和cached,也有一行名为“-/+ buffers/cache”。这两个信息的具体解释如下:
pagecache:文件系统层级的缓存,从磁盘里读取的内容是存储到这里,这样程序读取磁盘内容就会非常快,比如使用Linux的grep和find等命令查找内容和文件时,第一次会慢很多,再次执行就快好多倍,几乎是瞬间。另外page cache的数据被修改过后,也即脏数据,等到写入磁盘时机到来时,会转移到buffer cache 而不是直接写入到磁盘。我们看到的cached这列的数值表示的是当前的页缓存(page cache)的占用量,page cache文件的页数据,页是逻辑上的概念,因此page cache是与文件系统同级的。
buffer cache:磁盘等块设备的缓冲,内存的这一部分是要写入到磁盘里的 。buffers列表示当前的块缓存(buffer cache)占用量,buffer cache用于缓存块设备(如磁盘)的块数据。块是物理上的概念,因此buffer cache是与块设备驱动程序同级的。
两者都是用来加速数据IO,将写入的页标记为dirty,然后向外部存储flush,读数据时首先读取缓存,如果未命中,再去外部存储读取,并且将读取来的数据也加入缓存。操作系统总是积极地将所有空闲内存都用作page cache和buffer cache,当os的内存不够用时也会用LRU等算法淘汰缓存页。
有了以上概念后,我们再看来Kafka是怎么利用这个特性的。首先,对于一次数据IO来说,通常会发生以下的流程:
可以发现一次IO请求操作进行了2次上下文切换和4次系统调用,而同一份数据在缓存中多次拷贝,实际上对于拷贝来说完全可以直接在内核态中进行,也就是省去第二和第三步骤,变成这样:
正因为可以如此的修改数据的流程,于是Kafka在设计之初就参考此流程,尽可能大的利用os的page cache来对数据进行拷贝,尽量减少对磁盘的操作。如果kafka生产消费配合的好,那么数据完全走内存,这对集群的吞吐量提升是很大的。早期的操作系统中的page cache和buffer cache是分开的两块cache,后来发现同样的数据可能会被cache两次,于是大部分情况下两者都是合二为一的。
Kafka虽然使用JVM语言编写,在运行的时候脱离不了JVM和JVM的GC,但是Kafka并未自己去管理缓存,而是直接使用了OS的page cache作为缓存,这样做带来了以下好处:
所以Kafka优化IO流程,充分利用page cache,其消耗的时间更短,吞吐量更高,相比其他MQ就更快了,用一张图来简述三者之间的关系如下:
当Producer和Consumer速率相差不大的情况下,Kafka几乎可以完全实现不落盘就完成信息的传输。
除了前面的重要特性之外,Kafka还有一个设计,就是对数据的持久化存储采用的顺序的追加写入,Kafka在将消息落到各个topic的partition文件时,只是顺序追加,充分的利用了磁盘顺序访问快的特性。
(图片来自网络)
Kafka的文件存储按照topic下的partition来进行存储,每一个partition有各自的序列文件,各个partition的序列不共享,主要的划分按照消息的key进行hash决定落在哪个分区之上,我们先来详细解释一下Kafka的各个名词,以便充分理解其特点:
可以看到最终落地到磁盘都是Segment文件,每一个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便老的 segment file快速被删除。因为Kafka处理消息的力度是到partition,因此只需要保持好partition对应的顺序处理,segment可以单独维护其状态。
segment的文件由index file和data file组成,落地在磁盘的后缀为.index和.log,文件按照序列编号生成,如下所示:
(图片来自网络)
其中index维持着数据的物理地址,而data存储着数据的偏移地址,相互关联,这里看起来似乎和磁盘的顺序写入关系不大,想想HDFS的块存储,每次申请固定大小的块和这里的segment?是不是挺相似的?另外因为有index文的本身命名是以offset作为文件名的,在进行查找的时候可以快速根据需要查找的offset定位到对应的文件,再根据文件进行内容的检索。因此Kafka的查找流程为先根据要查找的offset对文件名称进行二分查找,找到对应的文件,再根据index的元数据的物理地址和log文件的偏移位置结合顺序读区到对应offset的位置的内容即可。
segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间,特别是在随机读取的场景下,Kafka非常不合适。所以因为Kafka特殊的存储设计,也让Kafka感觉起来,更快。
前面提到Kafka为什么快,除了快的特性之外,Kafka还有其他特点,那就是:稳。Kafka的稳体现在几个维度:
对于Kafka的稳,通常是由其整体架构设计决定,很多优秀的特性结合在一起,就更加的优秀,像Kafka的Qutota就是其中一个,既然是限流,那就意味着需要控制Consumer或者Producer的流量带宽,通常限制流量这件事需要在网卡上作处理,像常见的N路交换机或者高端路由器,所以对于Kafka来说,想要操控OS的网卡去控制流量显然具有非常高的难度,因此Kafka采用了另外一个特别的思路,即:没有办法控制网卡通过的流量大小,就控制返回数据的时间。对于JVM程序来说,就是一个wait或者seelp的事情。
所以对于Kafka来说,有一套特殊的时延计算规则,Kafka按照一个窗口来统计单位时间传输的流量,当流量大小超过设置的阈值的时候,触发流量控制,将当前请求丢入Kafka的Qutota Manager,等到延迟时间到达后,再次返回数据。我们通过Kafka的ClientQutotaManager类中的方法来看:
这几行代码代表了Kafka的限流计算逻辑,大概的思路为:假设我们设定当前流量上限不超过T,根据窗口计算出当前的速率为O,如果O超过了T,那么会进行限速,限速的公示为:
X = (O - T)/ T * W
X为需要延迟的时间,让我举一个形象的例子,假设我们限定流量不超过10MB/s,过去5秒(公示中的W,窗口区间)内通过的流量为100MB,则延迟的时间为:(100-5*10)/ 10=5秒。这样就能够保障在下一个窗口运行完成后,整个流量的大小是不会超过限制的。通过KafkaApis里面对Producer和Consumer的call back代码可以看到对限流的延迟返回:
对于kafka的限流来讲,默认是按照client id或者user来进行限流的,从实际使用的角度来说,意义不是很大,基于topic或者partition分区级别的限流,相对使用场景更大,ThoughtWroks曾经帮助某客户修改Kafka核心源码,实现了基于topic的流量控制。
Kafka背后的元信息重度依赖Zookeeper,再次我们不解释Zookeeper本身,而是关注Kafka到底是如何使用zk的,首先一张图解释Kafka对zk的重度依赖:
(图片来源于网络)
利用zk除了本身信息的存储之外,最重要的就是Kafka利用zk实现选举机制,其中以controller为主要的介绍,首先controller作为Kafka的心脏,主要负责着包括不限于以下重要事项:
也就是说Controller是Kafka的核心角色,对于Controller来说,采用公平竞争,任何一个Broker都有可能成为Controller,保障了集群的健壮性,对于Controller来说,其选举流程如下:
其代码直接通过KafkaController可以看到:
一旦Controller选举出来之后,则其他Broker会监听zk的变化,来响应集群中Controller挂掉的情况:
从而触发新的Controller选举动作。对于Kafka来说,整个设计非常紧凑,代码质量相当高,很多设计也非常具有借鉴意义,类似的功能在Kafka中有非常多的特性体现,这些特性结合一起,形成了Kafka整个稳定的局面。
虽然Kafka整体看起来非常优秀,但是Kafka也不是全能的银弹,必然有其对应的短板,那么对于Kafka如何,或者如何能用的更好,则需要经过实际的实践才能得感悟的出。经过归纳和总结,能够发现以下不同的使用场景和特点。
曾经我帮助某企业修改了分区创建规则,考虑了容量的情况,也就是按照磁盘容量进行分区的选择,紧接着带来第二个问题:容量大的磁盘具备更多的分区,则会导致大量的IO都压向该盘,最后问题又落回IO,会影响该磁盘的其他topic的性能。所以在考虑MQ系统的时候,需要合理的手动设置Kafka的分区规则。。
Kafka并不是唯一的解决方案,像几年前新生势头挺厉害的pulsar,以取代Kafka的口号冲入市场,也许会成为下一个解决Kafka部分痛点的框架,下文再讲述pulsar。
作者介绍:
白发川,ThoughtWorks大数据&AI团队核心成员,长期从事大数据和人工智能领域的研究,深度神经网络框架:deeplearning.scala 贡献者之一, 课程讲师,对海量数据处理和检索有着丰富的经验,具有丰富的大数据架构经验,在金融,制造业,供应链等多个领域进行数据平台落地实施,致力于大数据和人工智能在工程中的应用。
本文转载自ThoughtWorks洞见。
原文链接:
领取专属 10元无门槛券
私享最新 技术干货