kafka脱胎于雅虎项目,在现今的消息系统中,存在着举足轻重的意义。在笔者看来学习Kafka这款系统既有利于思考分布式消息队列的推演,也有利于发掘rabbitmq中的不足,以史为鉴可以知兴替,学习老大哥rabbitmq中的精华,摒弃其中的糟粕,提升下一代消息队列服务的性能,荣幸之至。
不妨看看消息队列的官方介绍
In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality. 翻译过来就是,在计算机科学领域,消息队列和邮箱都是软件工程组件,通常用于进程间或同一进程内的线程通信。它们通过队列来传递消息-传递控制信息或内容,群组通信系统提供类似的功能
消息队列就是一个使用队列来通信的组件,消息队列常常指代的是消息服务中间件,然而它的存在不仅仅只是为了解决通信这个问题。笔者看来其存在至少有三个目的:
消息队列的存在就是为了实现这三个目的,也是围绕这三个目的而进行架构设计实践和功能升级迭代
减少请求的等待,还能让服务异步并发处理,提升系统总体性能。流程如图所示
订单服务把订单相关消息塞到消息队列中,下游系统谁要谁就订阅这个主题。这样订单服务就解放啦
后端服务相对而言都是比较弱的,因为业务较重,处理时间较长
利用消息队列,可以存在一个缓冲的作用
一条消息是有生命的,存在出生,亦存在死亡,生死之间的传导也存在着诸多的升华,会历经沉浮,各种纷纷扰扰,有副本,有恢复,有重建,还是很值得探索一番,尝试为其写一篇评传,纪念其短暂又有意义的一生。🍵
消息的生产方,可以是来自于我们的日志系统、订单系统、告警系统、邮箱系统等等。
其实就是消息队列的服务端
Producer将往Broker的topic发送消息,而数据是存在partition上的,而partition持久化到磁盘是IO顺序访问的,并且是先写缓存,隔一段时间或者数据量足够大的时候才批量写入磁盘的。
如果要保证全局有序,那只能写入一个partition中。如果要消费也有序,consumer也只能有一个。
Consumer从Broker拉取消息,或者Broker推送消息至Consumer,最后消费
一个消息中间件,队列不单单只有一个,我们往往会有多个队列,而我们生产者和消费者就得知道:把数据丢给哪个队列,从哪个队列获取消息。我们需要给队列取名字,叫做topic(相当于数据库里边表的概念)
给队列取了名字以后,生产者就知道往哪个队列丢数据了,消费者也知道往哪个队列拿数据了。我们可以有多个生产者往同一个队列(topic)丢数据,多个消费者往同一个队列(topic)拿数据
Consumer看似订阅的是Topic,实则是从Topic下的某个分区获得消息,Producer发送消息也是如此。
该模型是为了实现一条消息能被多个消费者消费的需求
其允许消息发往一个Topic,所有订阅了这个 Topic 的订阅者都能消费这条消息
其实可以这么理解,发布/订阅模型等于我们都加入了一个群聊中,我发一条消息,加入了这个群聊的人都能收到这条消息。那么队列模型就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。
注意到在这款软件中,消息服务模型只有一个,这个区别于rabbitmq只有一种,但经过观察,我们会发现实际上,不管是Direct,还是Fanout,都是Topic的一个子集
为了提高一个队列(topic)的吞吐量,Kafka会把topic进行分区(Partition)
生产者往一个topic里面丢数据,实际上数据会在partition中,partition会分布在不同的broker(服务器)上
往一个topic丢数据,实际上就是往多个broker的partition存储数据
这,也就是Kafka天生分布式的原因了。
映射到实际代码中在磁盘上的关系
每个分区对应一个Log对象,在磁盘中就是一个子目录,子目录下面会有多组日志段即多Log Segment,每组日志段包含:消息日志文件(以log结尾)、位移索引文件(以index结尾)、时间戳索引文件(以timeindex结尾)。其实还有其它后缀的文件,例如.txnindex、.deleted等等
既然数据是保存在partition中的,那么消费者实际上也是从partition中取数据
一个消费者消费三个分区的数据。多个消费者可以组成一个消费者组。
本来是一个消费者消费三个分区的,现在我们有消费者组,就可以每个消费者去消费一个分区(也是为了提高吞吐量)
一台Kafka服务器叫做Broker,Kafka集群就是多台Kafka服务器的集合。
古早时期的 kafka 用 zookeeper 做 meta 信息存储、consumer 的消费状态、group 的管理以及 offset的值。后期考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zk的作用。新的consumer使用了kafka内部的group coordination
协议,也减少了对zookeeper的依赖。接下来也是在品味一下历史上的zk与kafka之间的关系,同时展望一下未来的操作。
以下是kafka在zookeeper中的详细存储结构图:
Broker是部署的过程中属于分布式的,并且之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper
。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokers/ids
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]
。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
也就是说从多个 broker 中选出控制器,这个工作就是 zookeeper 负责的。
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]
的形式被记录,如/brokers/topics/login
和/brokers/topics/search
等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2
,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
消费者服务器在初始化启动时加入消费者分组的步骤如下:
/consumers/[group_id]/ids/[consumer_id]
,完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。消费者分组
中的消费者
的变化注册监听。每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids
节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。/broker/ids/[0-N]
中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。消费者
与消息
分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。消费组(Consumer Group)
下有多个 Consumer(消费者)。
对于每个消费者组(Consumer Group)
,Kafka都会为其分配一个全局唯一的Group ID
,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用Hostname:UUID
形式表示。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录消息分区
与Consumer
之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]
就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset
记录到Zookeeper
上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
ISR(in-sync replica)
是 partition 的一组同步集合,就是所有 follower 里面同步最积极的那部分。
一条消息只有被 ISR 中的成员都接收到,才被视为“已同步”状态。
只有处于 ISR 集合中的副本才有资格被选举为 leader。
zookeeper 记录着 ISR 的信息,而且是实时更新的,只要发现其中有成员不正常,马上移除。
etcd数据模型与Zookeeper数据模型不兼容。 因此,要将Kafka从Zookeeper迁移到etcd并不容易。
不过最近,etcd3已发布。 coreos团队为etcd3开发了名为zetcd的新工具。 它能够使用etcd模拟Zookeeper数据模型给Zookeeper客户端。 zetcd是位于etcd群集前面并模拟Zookeeper客户端端口的代理。 Zookeeper客户端请求到达端口2181(默认的Zookeeper端口)并重定向到etcd服务器。 在较高的级别上,zetcd接收Zookeeper客户端请求,使它们适合etcd的数据模型和API,将请求发布给etcd,然后将转换后的响应返回给客户端。
我们继续回顾这个图
从前面的只言片语中,我们知道一个消息从生产到被消费这个过程中需要经历过服务端的记录落盘和读出,这期间就可能会触发一系列的生产上的灵异事件。
凡是分布式就无法避免网络抖动/机器宕机等问题的发生,很有可能消费者A读取了数据,还没来得及消费,就挂掉了。Zookeeper发现消费者A挂了,让消费者B去消费原本消费者A的分区,等消费者A重连的时候,发现已经重复消费同一条数据了。
事实上消息重复是不可避免的,那要怎么解决呢?
如果业务上不允许重复消费的问题,最好消费者那端做业务上的校验(如果已经消费过了,就不消费了)
既然我们不能防止重复消息的产生,那么我们只能在业务上抹掉重复消息所带来的影响,比如说采用幂等。要采用这种方式因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。实际上这个过程的设计在现在的微服务体系中称之为无状态服务。
所谓幂等,即同样的参数多次调用同一个接口和调用一次产生的结果是一致的。例如这条 SQLupdate t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等。
更通用的做法是做个version即版本号控制,对比消息中的版本号和数据库中的版本号。或者通过数据库的约束例如唯一键,例如insert into update on duplicate key...或者记录关键的key。这个在FaaS中经常应用到
最理想状态下来说数据是不会丢失的,然而理想是丰满的,现实是骨感的。数据一定会出现丢失的情况,只是概率大还是小的问题。我们继续追踪消息的生命周期,来推演丢数据的环节。
__consumer_offsets
这个topic中在生产环境中严格做到exactly once其实是难的,同时也会牺牲效率和吞吐量,最佳实践是在业务侧做好相关的补偿机制,万一出现消息丢失可以有兜底,为了为消息可靠性投递提供保证,这个需要三方的配合。
kafka自身只可以完成数据的顺序读写,那么如何完成在集群中的数据一致性?其实这个跟kafka集群的底层有关,比如说kafka是借助zookeeper来实现数据一致性,而etcd则是通过使用raft协议来保持数据一致性
对于kafka来说,需要借助两个概念来辅助理解
数据同步过程
对于Leader新写入的msg,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置。这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取。对于来自内部Broker的读取请求,没有HW的限制
我们数据存在不同的partition上,那kafka就把这些partition做备份。比如,现在我们有三个partition,分别存在三台broker上。每个partition都会备份,这些备份散落在不同的broker上。
红色块的partition代表的是主分区,紫色的partition块代表的是备份分区。生产者往topic丢数据,是与主分区交互,消费者消费topic的数据,也是与主分区交互。
备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用。
快是源于三个读写加速技术的综合应用
正常的读磁盘数据是需要将内核态数据拷贝到用户态的,而Kafka 通过调用sendfile()
直接从内核空间(DMA的)到内核空间(Socket的),这个过程比传统方式少做了一步拷贝的操作。
Sendfile
实现零拷贝(Zero Copy)Read/Write
方式进行网络文件传输,在传输过程中,文件数据实际上是经过了四次 Copy 操作Read
函数,文件数据被Copy
到内核缓冲区Read
函数返回,文件数据从内核缓冲区Copy
到用户缓冲区Write
函数调用,将文件数据从用户缓冲区Copy
到内核与Socket
相关的缓冲区Socket
缓冲区Copy
到相关协议引擎Sendfile
系统调用则提供了一种减少以上多次 Copy
,提升文件传输性能的方法。在内核版本 2.1 中,引入了Sendfile
系统调用,以简化网络上和两个本地文件之间的数据传输。Sendfile
的引入不仅减少了数据复制,还减少了上下文切换。相较传统Read/Write
方式,2.1 版本内核引进的Sendfile
已经减少了内核缓冲区到User
缓冲区,再由User
缓冲区到Socket
相关缓冲区的文件Copy
。而在内核版本 2.4 之后,文件描述符结果被改变,Sendfile
实现了更简单的方式,再次减少了一次Copy
操作。Kafka 会把收到的消息都写入到硬盘中,在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构(杠精别跟我扯ssd,这个后面有机会再讨论),每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机 I/O,最喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。
每一个 Partition 其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾(虚框部分)
这种方法有一个缺陷——没有办法删除数据 ,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个 消费者(Consumer)对每个 Topic 都有一个 Offset 用来表示读取到了第几条数据
为了避免磁盘被撑满的情况,Kakfa 提供了两种策略来删除数据
mmf (Memory Mapped Files)直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘。mmf 通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制。
然而这种方式存在一定的缺陷,即不可靠,因为当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka 提供了produce.type参数来控制是否主动的进行刷新,如果 Kafka 写入到 mmf 后立即flush再返回给生产者则为同步模式,反之为异步模式。
我们知道kafka是不支持回溯的,经常的会存在这样一种场景:一个消费者组中的某个消费者挂了,那挂掉的消费者所消费的分区可能就由存活的消费者消费。那存活的消费者是需要知道挂掉的消费者消费到哪了,不然怎么玩?
Kafka就是用offset来表示消费者的消费进度到哪了,每个消费者会都有自己的offset。说白了offset就是表示消费者的消费进度在以前版本的Kafka,这个offset是由Zookeeper来管理的,后来Kafka开发者认为Zookeeper不合适大量的删改操作,于是把offset在broker以内部topic(__consumer_offsets)的方式来保存起来。每次消费者消费的时候,都会提交这个offset,Kafka可以让你选择是自动提交还是手动提交。
这个要分两个方面来讨论
首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的。
不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可。
绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。
其实也就是消息发生了堆积
生产者的生产速度与消费者的消费速度不匹配
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。