这个专题的主要目的是debug kafka,并从中学习到kafka的架构原理,以及底层实现,提升抽象思维、汲取优秀设计思想、实现场景复用,当然也能实现与面试官侃大山。
kafka producer
kafka 消费者
点对点模式,一对一的关系,消费者主动去拉取消息进行处理
消息生产者将消息发送到topic中,同时会有多个消费者(订阅)消费该消息,和点对点方式不同发布到topic的消息会被所有订阅者消费。
topic 根据消费者订阅列表,主动推送消息给订阅者,如果消费者的处理能力比较弱,则会消息会产生积压。订阅有两种模式:1. 消费者主动从topic 拉取数据 2. topic 主动推 拉取的模式有缺点,实时获取消息,轮训topic,获取信息,长时间没有消息,会出现长轮训,会浪费消费者的资源。
注意客户端请求zk leader 和 follower 分读写请求,写请求打到follower上 ,先将请求转发给leader,leaderchu
主题是分区的,且有副本的概念,leader接收所有的请求,副本只做一个容灾,这样当leader挂掉之后,follower可以顶上来
2 consumer group - 消费者组 某一个分区的数据只能被同一个消费者组里面的某一个(only)消费者消费, 多个分区数据 可以被同一个消费者组里面 一夫多妻制度, 可以将消费者组里面的消费者想象成男性,分区想象成女性 提高并发处理能力的一个机制
消费者组的消费者个数与 partition 分区个数保持一致
zk 存储kafka集群的信息,kafka集群信息被zk集群统一管理 consumer 挂掉之后,需要从index + 1 处开始,已经消费的位置信息,需要存储在zk中。
0.9 版本之前offset 存在zk 0.9 版本之后offset 存储在本地 存在kafka系统当中,kafka系统生成topic 为什么改? 消费者拉取(kafka消费者消费信息的手段) 频繁跟zk打交道(频繁在zk中更新offset,zk的压力也会比较大)效率不高。
消息存在磁盘,存在内存干不了这个事情,默认存7天,168小时
1. 设置broker_id
2. 设置kafka运行日志(消息数据)存放的路径
3. 配置需要的zk集群连接,上报自身meta信息的远程zk集群地址
采用哨兵模式 sentinal (zk能够保持数据强一致的原因)
kafka内置了zk,可以使用内置zk在单机上启动一个zk服务进程(哨兵、集群、与选举模式 & 如何保证一致性,这颗好好研究一下,)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > /data/home/bofengliu/soft/logs/zk.log 2>&1 &
(PS:zk的作用) zk 用来存储维护broker集群的的meta信息,如topic的信息,0.9 版本以前consoumer 最近消费的message offest 会记录在zk中,0.9 版本以后消费者的消费offset 由broker集群自己维护。
虽然kafka有-dameond 后台启动的选项,但是没有日志,这个比较痛苦,所以采用nohup & + 日志的方式启动
nohup bin/kafka-server-start.sh config/server_0.properties > /data/home/bofengliu/soft/logs/kas_0.log 2>&1 &
nohup bin/kafka-server-start.sh config/server_1.properties > /data/home/bofengliu/soft/logs/kas_1.log 2>&1 &
nohup bin/kafka-server-start.sh config/server_2.properties > /data/home/bofengliu/soft/logs/kas_2.log 2>&1 &
bin/kafka-topics.sh --list --zookeeper localhost:2181
注意: --list 后面不跟 --zookeeper 参数是查询不出来的,因为zk上存储了topic的相关元数据
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic first_test --partitions 2 --replication-factor 2
注意: bin/kafka-topics.sh --help 可以显示当前的命令提示 --topic 后面紧跟主题名字 --partitions 紧跟这个 topic 对应的分区数量 当前我们设置为2 --replication-factor 设置每个分区的副本数量 当前我们设置为2
然后查看下kafka存储log(其实也存储消息数据的)目录,如下图所示:
我们发下在topic name 后面会加上-0、-1的数字,这个数字代表着分区号,而且每个分区目录会copy 两份,且存放在不同的目录里面(PS: 其实不同的目录在真实 的生产环境中代表不同的server,)即统一分区的不同副本不会存储在同一server上,这也很容易理解,如果同一分区的不同副本存储在同一台server, 那么当这台 机器宕机之后副本的容灾能力就太鸡肋了。
文件夹的名字 = [topic_name] + [partition index](PS:) 创建主题时, 分区的副本数量不能超过kafka集群server的总数量
bin/kafka-topic.sh --delete
--zookeeper localhost:2181
--topic third-test
删除后的效果如下所示:
bin/kafka-topics.sh --describe --topic first-test --zookeeper localhost:2181
命令格式: kafka-console-producer.sh --topic [topic-name] --broker-list [one broker of brokers 地址,ip:port]
bin/kafka-console-producer.sh --topic first-test --broker-list localhost:9093
参数详解:
命令格式: old-version: kafka-console-consumer.sh --topic [topic-name] --zookeeper [zk 地址] new-version: kafka-console-consumer.sh --topic [topic-name] --bootstrap-server
kafka-console-consumer.sh --topic first-test --zookeeper localhost:2181
(PS:
与生产者的标准终端打开方式不同)
kafka-console-consumer.sh --topic first-test --bootstrap-server
broker 记录消费者已经消费的消息的offset, kafka 分配专门的消费topic __consumer-offset 默认50个分区,均匀分配在kafka的多台brokers 上。--from-beginning 所有数据都可以消费
(PS: 这幅图中描述的重点信息都在图中有描述了,可以对照记忆一下)
新概念: 1个partition 是由 n 个 segment 片段数据组成的,如0...0.log 0...1.log one segment 的最大大小为sever.properties 中配置的 log.segment.bytes=1073741824
默认为1G大小 那么产生两个问题:
如下是上面两个问题对应的kafka 解决方案: 由于生产者会不断追加到log文件末尾,为了防止log文件过大导致数据定位效率底下,kafka采取了 分片
and 索引
机制,且分片文件命名与索引文件命名的前缀保持一致 索引文件命名成 *.index
数据分片 segment 命名为 *.log
, 这些文件位于一个文件夹下,该文件夹的命名规则为: [topic name] + [partition name]
关于kafka架构的深入探索,会在出现在后续的推文中,敬请期待!