
简介
Kafka适合什么样的场景?
它可以用于两大类别的应用:
为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。
定位
手机用户请
横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可以关注公众号后回复『知识星球』加入并获取长期知识分享服务。
依赖 Zookeeper 实现配置和节点管理

如上图所示,一个 Kafka 集群架构中:

pollTopic的订阅和发送,来实现生产者和消费者的关联,多对多



数据分段(针对文件过大,超出 1G)


业务场景:数据同步存储到 mysql、ES

canal 伪装为 slave 节点,进行数据同步,解析 binlog,可以对接 kafka,实现数据的多写。

MYSQL的数据修改,通过 kafka 完成数据变更的自动推送,实现多写操作。
enable.idempotence=true
PID Producer ID 生产者编号操作 API
producer.initTransactions()producer.beginTransaction()producer.commitTransaction()场景:
consume-process-produce实现原理:
__transaction_stateUUID

KafkaProducer



服务端 ACKreplica.lag.time.max.ms不等待 ACK => props.put("acks","0") 效率最高,可靠性低

默认:Leader 落盘成功则返回 ACK => props.put("acks","1") 可靠性较低,还是有丢失消息风险

Leader 和全部 Follpwers 落盘返回 ACK => props.put("acks","-1") 或 props.put("acks","all") 性能最差,可靠性高,但是还是有可能会带来问题,最后环节响应 ACK 失败,发送端如果设置了 retries 重发参数,会发生消息重复的问题。


特点
xxx-[分区下标]
如图:topic 对应 3 个分区,每个分区一共 3 个副本。

查看副本情况

如图:topic 对应 4 个分区,每个分区 2 个副本

查看副本情况

AdminUtils.scala -> assignReplicasToBrokers
随机从 brokerList 选择的

分割方式
log.segment.bytes 单个日志段的最大大小,默认 1073741824 -> 1Glog.roll.hours 新日志段轮转时间间隔(小时为单位),次要配置为log.roll.mslog.roll.ms 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hourslog.index.size.max.bytes offset 索引的最大字节数,默认10485760 -> 10Moffset index 偏移量索引

如上图,通过kafka-dump-log.sh脚本查看索引文件。
索引特点:
n 表示文件个数,m 表示稀疏程度log.index.interval.bytes 添加 offset 索引字段大小间隔,默认 4096, 4KB
time index 时间戳索引
log.message.timestamp.type=CreateTime/logAppendTime索引检索过程
为什么不用 B+tree 做索引结构?
log.cleaner.enable=true 默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据 log.cleaner.dedupe.buffer.size,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看 log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。(0.9.0.1中的显著变化)log.cleanup.policy=delete/compact 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:delete和compact

log.retention.check.interval.ms=300 000 日志清理器检查是否有日志符合删除的频率(以毫秒为单位)log.retention.hours 日志删除的时间阈值(小时为单位) 默认 168 小时,即 1 个星期log.retention.minutes 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值。log.retention.ms 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值。log.retention.bytes 日志删除的大小阈值。log.segment.bytes 单个日志段文件最大大小。unclean.leader.election.enable 指定副本是否能够不在 ISR 中选举为 Leader,会导致数据丢失,默认为 false。

主从同步


Follower 故障
Leader 故障
根据 offset 和时间戳进行消费

__consumer_offsets => topic 的存储结构

auto.offset.reset,默认值为 lastestenable.auto.commit=trueenable.auto.commit=false消费者分配






rebalance针对分区少,消费者多的情况

磁盘寻址

Kafka 日志文件顺序存放 -> 磁盘顺序读写。

Sequential disk (磁盘顺序读写) 比 Random SSD (固态的随机读写)要更快。
零拷贝



零拷贝实现代码 传输层封装代码:PlaintextTransportLayer

MessageSet 接口的 writeTo 方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo 实现,而不在使用进程内的写缓存。线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接。这种设计方式经过大量的测试,发现它是实现简单而且快速的。协议保持简单以允许未来实现其他语言的客户端.