Kafka 是一个优秀的分布式消息中间件,许多系统中都会使用到 Kafka 来做消息通信。对分布式消息系统的了解和使用几乎成为一个开发人员必备的技能。
思维导图
消息队列
分布式消息是一种通信机制,和 RPC、HTTP、RMI 等不一样,消息中间件采用分布式中间代理的方式进行通信。如图所示,采用了消息中间件之后,上游业务系统发送消息,先存储在消息中间件,然后由消息中间件将消息分发到对应的业务模块应用(分布式生产者 - 消费者模式)。这种异步的方式,减少了服务之间的耦合程度。
架构
定义消息中间件:
在系统架构中引用额外的组件,必然提高系统的架构复杂度和运维的难度,那么在系统中使用分布式消息中间件有什么优势呢?消息中间件在系统中起的作用又是什么呢?
下面是常见的几种分布式消息系统的对比:
选择
Kafka 架构中的一般概念:
架构
Kafka Topic Partitions Layout
主题
Kafka 将 Topic 进行分区,分区可以并发读写。
Kafka Consumer Offset
consumer offset
zookeeper
Kafka 的命令行工具在 Kafka 包的/bin
目录下,主要包括服务和集群管理脚本,配置脚本,信息查看脚本,Topic 脚本,客户端脚本等。
我们通常可以使用kafka-console-consumer.sh
和kafka-console-producer.sh
脚本来测试 Kafka 生产和消费,kafka-consumer-groups.sh
可以查看和管理集群中的 Topic,kafka-topics.sh
通常用于查看 Kafka 的消费组情况。
Kafka producer 的正常生产逻辑包含以下几个步骤:
Producer 发送消息的过程如下图所示,需要经过拦截器
,序列化器
和分区器
,最终由累加器
批量发送至 Broker。
producer
Kafka Producer 需要以下必要参数:
常见参数:
Kafka 有消费组的概念,每个消费者只能消费所分配到的分区的消息,每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。消费组与消费者关系如下图所示:
consumer group
Kafka Consumer Client 消费消息通常包含以下步骤:
过程
因为 Kafka 的 Consumer 客户端是线程不安全的,为了保证线程安全,并提升消费性能,可以在 Consumer 端采用类似 Reactor 的线程模型来消费数据。
消费模型
host:port
格式。key.serializer
对应,key 的反序列化方式。value.serializer
对应,value 的反序列化方式。false
,则需要在程序中手动提交位移。对于精确到一次的语义,最好手动提交位移max.poll.records
条数据需要在在 session.timeout.ms 这个时间内处理完 。默认值为 500rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。
什么时候 rebalance?
这也是经常被提及的一个问题。rebalance 的触发条件有三种:
如何进行组内分区分配?
Kafka 默认提供了两种分配策略:Range 和 Round-Robin。当然 Kafka 采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。
/bin
目录,管理 kafka 集群、管理 topic、生产和消费 kafka分区副本
在分布式数据系统中,通常使用分区来提高系统的处理能力,通过副本来保证数据的高可用性。多分区意味着并发处理的能力,这多个副本中,只有一个是 leader,而其他的都是 follower 副本。仅有 leader 副本可以对外提供服务。多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。
为什么 follower 副本不提供读服务?
这个问题本质上是对性能和一致性的取舍。试想一下,如果 follower 副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。比如你现在写入一条数据到 kafka 主题 a,消费者 b 从主题 a 消费数据,却发现消费不到,因为消费者 b 去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者 c 却可以消费到最新那条数据,因为它消费了 leader 副本。Kafka 通过 WH 和 Offset 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。
watermark
只有 Leader 可以对外提供读服务,那如何选举 Leader
kafka 会将与 leader 副本保持同步的副本放到 ISR 副本集合中。当然,leader 副本是一直存在于 ISR 副本集合中的,在某些特殊情况下,ISR 副本中甚至只有 leader 一个副本。当 leader 挂掉时,kakfa 通过 zookeeper 感知到这一情况,在 ISR 副本中选取新的副本成为 leader,对外提供服务。但这样还有一个问题,前面提到过,有可能 ISR 副本集合中,只有 leader,当 leader 副本挂掉后,ISR 集合就为空,这时候怎么办呢?这时候如果设置 unclean.leader.election.enable 参数为 true,那么 kafka 会在非同步,也就是不在 ISR 副本集合中的副本中,选取出副本成为 leader。
副本的存在就会出现副本同步问题
Kafka 在所有分配的副本 (AR) 中维护一个可用的副本列表 (ISR),Producer 向 Broker 发送消息时会根据ack
配置来确定需要等待几个副本已经同步了消息才相应成功,Broker 内部会ReplicaManager
服务来管理 flower 与 leader 之间的数据同步。
sync
一方面,由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的 disk drive 上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
Kafka 每一个 partition 目录下的文件被平均切割成大小相等(默认一个文件是 500 兆,可以手动去设置)的数据文件, 每一个数据文件都被称为一个段(segment file), 每个 segment 都采用 append 的方式追加数据。
追加数据
at least once
、at most once
和exactly once
。kafka 通过 ack 的配置来实现前两种。建议读者同学结合 Kafka 的配置去了解 Kafka 的实现原理,Kafka 有大量的配置,这也是 Kafka 高度扩展的一个表现,很多同学对 Kafka 的配置也不敢轻易改动。所以理解这些配置背后的实现原理,可以让我们在实践中懂得如何使用和优化 Kafka。既可面试造火箭,也可以实战造火箭。
Kafka 配置说明链接:https://kafka.apache.org/documentation
理解 Kafka 架构,就是理解 Kafka 的各种组件的概念,以及这些组件的关系。先简单看一下各组件及其简单说明。
Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。 Consumer: 消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。 Consumer Group: 一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。 Broker: 服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。 Topic: Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。 Partition: Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。 Offset: offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。 Replication: 副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。 Record: 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。
我们应该通过理解的方式去记忆它们。
生产者
-消费者
是一种设计模式,生产者
和消费者
之间通过添加一个中间组件
来达到解耦。生产者
向中间组件
生成数据,消费者
消费数据。
就像读书时65 哥给小芳写情书,这里写情书就是生产者
,情书就是消息
,小芳就是消费者
。但有时候小芳不在,或者比较忙,65 哥也比较害羞,不敢直接将情书塞小芳手里,于是将情书塞在小芳抽屉中。所以抽屉就是这个中间组件
。
在程序中我们通常使用Queue
来作为这个中间组件
。可以使用多线程向队列中写入数据,另外的消费者线程依次读取队列中的数据进行消费。模型如下图所示:
生产者
-消费者
模式通过添加一个中间层,不仅可以解耦生产者和消费者,使其易于扩展,还可以异步化调用、缓冲消息等。
后来 65 哥和小芳异地了,65 哥在卷都
奋斗,小芳在魔都
逛街。于是只能通过邮局
寄暧昧信了。这样 65 哥、邮局和小芳就成了分布式
的了。65 哥将信件发给邮局,小芳从邮局拿到 65 哥写的信,再回去慢慢看。
Kafka 的消息生产者
就是Producer
,上游消费者进程添加 Kafka Client 创建 Kafka Producer,向 Broker 发送消息,Broker 是集群部署在远程服务器上的 Kafka Server 进程,下游消费者进程引入 Kafka Consumer API 持续消费队列中消息。
因为 Kafka Consumer 使用 Poll 的模式,需要 Consumer 主动拉去消息。所有小芳只能定期去邮局拿信件了(呃,果然主动权都在小芳手上啊)。
邮局不能只为 65 哥服务,虽然 65 哥一天写好几封信。但也无法挽回邮局的损失。所以邮局是可以供任何人寄信。只需要寄信人写好地址(主题),邮局建有两地的通道就可以发收信件了。
Kafka 的 Topic 才相当于一个队列,Broker 是所有队列部署的机器。可以按业务创建不同的 Topic,Producer 向所属业务的 Topic 发送消息,相应的 Consumer 可以消费并处理消息。
由于 65 哥写的信太多,一个邮局已经无法满足 65 哥的需求,邮政公司只能多建几个邮局了,65 哥将信件按私密度分类(分区策略),从不同的邮局寄送。
同一个 Topic 可以创建多个分区。理论上分区越多并发度越高,Kafka 会根据分区策略将分区尽可能均衡的分布在不同的 Broker 节点上,以避免消息倾斜,不同的 Broker 负载差异太大。分区也不是越多越好哦,毕竟太多邮政公司也管理不过来。
为防止由于邮局的问题,比如交通断啦,邮车没油啦。导致 65 哥的暧昧信无法寄到小芳手上,使得 65 哥晚上远程跪键盘。邮局决定将 65 哥的信件复制几份发到多个正常的邮局,这样只要有一个邮局还在,小芳就可以收到 65 哥的信了。
Kafka 采用分区副本的方式来保证数据的高可用,每个分区都将建立指定数量的副本数,kakfa 保证同一分区副本尽量分布在不同的 Broker 节点上,以防止 Broker 宕机导致所有副本不可用。Kafka 会为分区的多个副本选举一个作为主副本(Leader),主副本对外提供读写服务,从副本(Follower)实时同步 Leader 的数据。
哎,65 哥的信件满天飞,小芳天天跑邮局,还要一一拆开看,65 哥写的信又臭又长,让小芳忙得满身大汉大汗。于是小芳啪的一下,很快啊,变出多个分身去不同的邮局取信,这样小芳终于可以挤出额外的时间逛街了。
邮局最近提供了定制明信片业务,每个人都可以设计明信片,同一个身份只能领取一种明信片。65 哥设计了一堆,广播给所有漂亮的小妹妹都可以来领取,美女啪变出的分身也可以来领取,但是同一个身份的多个分身只能取一种明信片。
Kafka 通过 Consumer Group 来实现广播模式消息订阅,即不同 group 下的 consumer 可以重复消费消息,相互不影响,同一个 group 下的 consumer 构成一个整体。
最后我们完成了 Kafka 的整体架构,如下:
Zookeeper 是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服、同步服务和命名注册等能力.。对于任何分布式系统,都需要一种协调任务的方法。Kafka 是使用 ZooKeeper 而构建的分布式系统。但是也有一些其他技术(例如 Elasticsearch 和 MongoDB)具有其自己的内置任务协调机制。
Kafka 将 Broker、Topic 和 Partition 的元数据信息存储在 Zookeeper 上。通过在 Zookeeper 上建立相应的数据节点,并监听节点的变化,Kafka 使用 Zookeeper 完成以下功能:
我们看一看 Zookeeper 下 Kafka 创建的节点,即可一目了然的看出这些相关的功能。
Controller 是从 Broker 中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生故障时,由 Controller 负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR(In-Sync Replica)集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用kafka-topics.sh
脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中 Contorller 的选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller
这个临时(EPHEMERAL)节点。
Broker 启动的时候尝试去读取/controller
节点的brokerid
的值,如果brokerid
的值不等于-1,则表明已经有其他的 Broker 成功成为 Controller 节点,当前 Broker 主动放弃竞选;如果不存在/controller
节点,或者 brokerid 数值异常,当前 Broker 尝试去创建/controller
这个节点,此时也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。
Controller 读取 Zookeeper 中的节点数据,初始化上下文(Controller Context),并管理节点变化,变更上下文,同时也需要将这些变更信息同步到其他普通的 broker 节点中。Controller 通过定时任务,或者监听器模式获取 zookeeper 信息,事件监听会更新更新上下文信息,如图所示,Controller 内部也采用生产者-消费者实现模式,Controller 将 zookeeper 的变动通过事件的方式发送给事件队列,队列就是一个LinkedBlockingQueue
,事件消费者线程组通过消费消费事件,将相应的事件同步到各 Broker 节点。这种队列 FIFO 的模式保证了消息的有序性。
Controller 被选举出来,作为整个 Broker 集群的管理者,管理所有的集群信息和元数据信息。它的职责包括下面几部分:
“状态机啊,听起来好复杂。 ”
Controller 管理着集群中所有副本和分区的状态机。大家不要被状态机
这个词唬住了。理解状态机很简单。先理解模型,即这是什么关于什么模型,然后就是模型的状态有哪些,模型状态之间如何转换,转换时发送相应的变化事件。
Kafka 的分区和副本状态机很简单。我们先理解,这分别是管理 Kafka Topic 的分区和副本的。它们的状态也很简单,就是 CRUD,具体说来如下:
PartitionStateChange,管理 Topic 的分区,它有以下 4 种状态:
我们用一张图来直观的看看这些状态是如何变化的,以及在状态发生变化时 Controller 都有哪些操作:
ReplicaStateChange,副本状态,管理分区副本信息,它也有 4 种状态:
副本状态间的变化如下图所示,Controller 在状态变化时会做出相应的操作:
Kafka 的网络通信模型是基于 NIO 的 Reactor 多线程模型来设计的。其中包含了一个Acceptor
线程,用于处理新的连接,Acceptor
有 N 个 Processor
线程 select 和 read socket 请求,N 个 Handler
线程处理请求并相应,即处理业务逻辑。下面就是 KafkaServer 的模型图:
从高度抽象的角度来看,性能问题逃不出下面三个方面:
对于 Kafka 这种网络分布式队列来说,网络和磁盘更是优化的重中之重。针对于上面提出的抽象问题,解决方案高度抽象出来也很简单:
知道了问题和思路,我们再来看看,在 Kafka 中,有哪些角色,而这些角色就是可以优化的点:
是的,所有的问题,思路,优化点都已经列出来了,我们可以尽可能的细化,三个方向都可以细化,如此,所有的实现便一目了然,即使不看 Kafka 的实现,我们自己也可以想到一二点可以优化的地方。
这就是思考方式。提出问题
> 列出问题点
> 列出优化方法
> 列出具体可切入的点
> tradeoff和细化实现
。
现在,你也可以尝试自己想一想优化的点和方法,不用尽善尽美,不用管好不好实现,想一点是一点。
“不行啊,我很笨,也很懒,你还是直接和我说吧,我白嫖比较行。 ”
“人家 Redis 是基于纯内存的系统,你 kafka 还要读写磁盘,能比? ”
为什么说写磁盘慢?
我们不能只知道结论,而不知其所以然。要回答这个问题,就得回到在校时我们学的操作系统课程了。来,翻到讲磁盘的章节,让我们回顾一下磁盘的运行原理。
“鬼还留着哦,课程还没上到一半书就没了。要不是考试俺眼神好,估计现在还没毕业。 ”
看经典大图:
完成一次磁盘 IO,需要经过寻道
、旋转
和数据传输
三个步骤。
影响磁盘 IO 性能的因素也就发生在上面三个步骤上,因此主要花费的时间就是:
因此,如果在写磁盘的时候省去寻道
、旋转
可以极大地提高磁盘读写的性能。
Kafka 采用顺序写
文件的方式来提高磁盘写入性能。顺序写
文件,基本减少了磁盘寻道
和旋转
的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。
Kafka 中每个分区是一个有序的,不可变的消息序列,新的消息不断追加到 Partition 的末尾,在 Kafka 中 Partition 只是一个逻辑概念,Kafka 将 Partition 划分为多个 Segment,每个 Segment 对应一个物理文件,Kafka 对 segment 文件追加写,这就是顺序写文件。
“为什么 Kafka 可以使用追加写的方式呢? ”
这和 Kafka 的性质有关,我们来看看 Kafka 和 Redis,说白了,Kafka 就是一个Queue
,而 Redis 就是一个HashMap
。Queue
和Map
的区别是什么?
Queue
是 FIFO 的,数据是有序的;HashMap
数据是无序的,是随机读写的。Kafka 的不可变性,有序性使得 Kafka 可以使用追加写的方式写文件。
其实很多符合以上特性的数据系统,都可以采用追加写的方式来优化磁盘性能。典型的有Redis
的 AOF 文件,各种数据库的WAL(Write ahead log)
机制等等。
“所以清楚明白自身业务的特点,就可以针对性地做出优化。 ”
“哈哈,这个我面试被问到过。可惜答得一般般,唉。 ”
什么是零拷贝?
我们从 Kafka 的场景来看,Kafka Consumer 消费存储在 Broker 磁盘的数据,从读取 Broker 磁盘到网络传输给 Consumer,期间涉及哪些系统交互。Kafka Consumer 从 Broker 消费数据,Broker 读取 Log,就使用了 sendfile。如果使用传统的 IO 模型,伪代码逻辑就如下所示:
readFile(buffer)
send(buffer)
如图,如果采用传统的 IO 流程,先读取网络 IO,再写入磁盘 IO,实际需要将数据 Copy 四次。
“啊,操作系统这么傻吗?copy 来 copy 去的。 ”
并不是操作系统傻,操作系统的设计就是每个应用程序都有自己的用户内存,用户内存和内核内存隔离,这是为了程序和系统安全考虑,否则的话每个应用程序内存满天飞,随意读写那还得了。
不过,还有零拷贝
技术,英文——Zero-Copy
。零拷贝
就是尽量去减少上面数据的拷贝次数,从而减少拷贝的 CPU 开销,减少用户态内核态的上下文切换次数,从而优化数据传输的性能。
常见的零拷贝思路主要有三种:
Kafka 使用到了 mmap
和 sendfile
的方式来实现零拷贝
。分别对应 Java 的 MappedByteBuffer
和 FileChannel.transferTo
。
使用 Java NIO 实现零拷贝
,如下:
FileChannel.transferTo()
在此模型下,上下文切换的数量减少到一个。具体而言,transferTo()
方法指示块设备通过 DMA 引擎将数据读取到读取缓冲区中。然后,将该缓冲区复制到另一个内核缓冲区以暂存到套接字。最后,套接字缓冲区通过 DMA 复制到 NIC 缓冲区。
我们将副本数从四减少到三,并且这些副本中只有一个涉及 CPU。我们还将上下文切换的数量从四个减少到了两个。这是一个很大的改进,但是还没有查询零副本。当运行 Linux 内核 2.4 及更高版本以及支持收集操作的网络接口卡时,后者可以作为进一步的优化来实现。如下所示。
根据前面的示例,调用transferTo()
方法会使设备通过 DMA 引擎将数据读取到内核读取缓冲区中。但是,使用gather
操作时,读取缓冲区和套接字缓冲区之间没有复制。取而代之的是,给 NIC 一个指向读取缓冲区的指针以及偏移量和长度,该偏移量和长度由 DMA 清除。CPU 绝对不参与复制缓冲区。
关于零拷贝
详情,可以详读这篇文章零拷贝 (Zero-copy) 浅析及其应用。
producer 生产消息到 Broker 时,Broker 会使用 pwrite() 系统调用【对应到 Java NIO 的 FileChannel.write() API】按偏移量写入数据,此时数据都会先写入page cache
。consumer 消费消息时,Broker 使用 sendfile() 系统调用【对应 FileChannel.transferTo() API】,零拷贝地将数据从 page cache 传输到 broker 的 Socket buffer,再通过网络传输。
leader 与 follower 之间的同步,与上面 consumer 消费数据的过程是同理的。
page cache
中的数据会随着内核中 flusher 线程的调度以及对 sync()/fsync() 的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。另外,如果 consumer 要消费的消息不在page cache
里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入 page cache,以方便下一次读取。
因此如果 Kafka producer 的生产速率与 consumer 的消费速率相差不大,那么就能几乎只靠对 broker page cache 的读写完成整个生产 - 消费过程,磁盘访问非常少。
“网络嘛,作为 Java 程序员,自然是 Netty ”
是的,Netty 是 JVM 领域一个优秀的网络框架,提供了高性能的网络服务。大多数 Java 程序员提到网络框架,首先想到的就是 Netty。Dubbo、Avro-RPC 等等优秀的框架都使用 Netty 作为底层的网络通信框架。
Kafka 自己实现了网络模型做 RPC。底层基于 Java NIO,采用和 Netty 一样的 Reactor 线程模型。
Reacotr 模型主要分为三个角色
在传统阻塞 IO 模型中,每个连接都需要独立线程处理,当并发数大时,创建线程数多,占用资源;采用阻塞 IO 模型,连接建立后,若当前线程没有数据可读,线程会阻塞在读操作上,造成资源浪费
针对传统阻塞 IO 模型的两个问题,Reactor 模型基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理;基于 IO 复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。遍历到有新数据可以处理时,操作系统会通知程序,线程跳出阻塞状态,进行业务逻辑处理
Kafka 即基于 Reactor 模型实现了多路复用和处理线程池。其设计如下:
其中包含了一个Acceptor
线程,用于处理新的连接,Acceptor
有 N 个 Processor
线程 select 和 read socket 请求,N 个 Handler
线程处理请求并相应,即处理业务逻辑。
I/O 多路复用可以通过把多个 I/O 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。它的最大优势是系统开销小,并且不需要创建新的进程或者线程,降低了系统的资源开销。
总结: Kafka Broker 的 KafkaServer
设计是一个优秀的网络架构,有想了解 Java 网络编程,或需要使用到这方面技术的同学不妨去读一读源码。
Kafka Producer 向 Broker 发送消息不是一条消息一条消息的发送。使用过 Kafka 的同学应该知道,Producer 有两个重要的参数:batch.size
和linger.ms
。这两个参数就和 Producer 的批量发送有关。
Kafka Producer 的执行流程如下图所示:
发送消息依次经过以下处理器:
Accumulate
顾名思义,就是一个消息累计器。其内部为每个 Partition 维护一个Deque
双端队列,队列保存将要发送的批次数据,Accumulate
将数据累计到一定数量,或者在一定过期时间内,便将数据以批次的方式发送出去。记录被累积在主题每个分区的缓冲区中。根据生产者批次大小属性将记录分组。主题中的每个分区都有一个单独的累加器 / 缓冲区。Kafka 支持多种压缩算法:lz4、snappy、gzip。Kafka 2.1.0 正式支持 ZStandard —— ZStandard 是 Facebook 开源的压缩算法,旨在提供超高的压缩比 (compression ratio),具体细节参见 zstd。
Producer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。
Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。
Kafka 具有优秀的分区分配算法——StickyAssignor,可以保证分区的分配尽量地均衡,且每一次重分配的结果尽量与上一次分配结果保持一致。这样,整个集群的分区尽量地均衡,各个 Broker 和 Consumer 的处理不至于出现太大的倾斜。
“那是不是分区数越多越好呢? ”
当然不是。
在 kafka 的 broker 中,每个分区都会对照着文件系统的一个目录。在 kafka 的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着 partition 的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。
客户端 producer 有个参数 batch.size,默认是 16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。
分区越多,每个 Broker 上分配的分区也就越多,当一个发生 Broker 宕机,那么恢复时间将很长。
Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。
Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。
index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap
的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。mmap
的 Java 实现对应 MappedByteBuffer
。
“mmap 是一种内存映射文件的方法。即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read,write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。 ”
Kafka 充分利用二分法来查找对应 offset 的消息位置:
Kafka 是一个优秀的开源项目。其在性能上面的优化做的淋漓尽致,是很值得我们深入学习的一个项目。无论是思想还是实现,我们都应该认真的去看一看,想一想。