简介
Kafka适合什么样的场景?
它可以用于两大类别的应用:
为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。
定位
手机用户请
横屏
获取最佳阅读体验,REFERENCES
中是本文参考的链接,如需要链接和更多资源,可以关注公众号后回复『知识星球』加入并获取长期知识分享服务。
依赖 Zookeeper 实现配置和节点管理
如上图所示,一个 Kafka 集群架构中:
poll
Topic
的订阅和发送,来实现生产者和消费者的关联,多对多数据分段(针对文件过大,超出 1G)
业务场景:数据同步存储到 mysql、ES
canal 伪装为 slave 节点,进行数据同步,解析 binlog,可以对接 kafka,实现数据的多写。
MYSQL的数据修改,通过 kafka 完成数据变更的自动推送,实现多写操作。
enable.idempotence=true
PID
Producer ID 生产者编号操作 API
producer.initTransactions()
producer.beginTransaction()
producer.commitTransaction()
场景:
consume-process-produce
实现原理:
__transaction_state
UUID
KafkaProducer
服务端 ACK
replica.lag.time.max.ms
不等待 ACK => props.put("acks","0") 效率最高,可靠性低
默认:Leader 落盘成功则返回 ACK => props.put("acks","1") 可靠性较低,还是有丢失消息风险
Leader 和全部 Follpwers 落盘返回 ACK => props.put("acks","-1") 或 props.put("acks","all") 性能最差,可靠性高,但是还是有可能会带来问题,最后环节响应 ACK 失败,发送端如果设置了 retries 重发参数,会发生消息重复的问题。
特点
xxx-[分区下标]
如图:topic 对应 3 个分区,每个分区一共 3 个副本。
查看副本情况
如图:topic 对应 4 个分区,每个分区 2 个副本
查看副本情况
AdminUtils.scala -> assignReplicasToBrokers
随机
从 brokerList 选择的分割方式
log.segment.bytes
单个日志段的最大大小,默认 1073741824 -> 1Glog.roll.hours
新日志段轮转时间间隔(小时为单位),次要配置为log.roll.ms
log.roll.ms
新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hours
log.index.size.max.bytes
offset 索引的最大字节数,默认10485760 -> 10Moffset index 偏移量索引
如上图,通过kafka-dump-log.sh
脚本查看索引文件。
索引特点:
n 表示文件个数,m 表示稀疏程度
log.index.interval.bytes
添加 offset 索引字段大小间隔,默认 4096, 4KBtime index 时间戳索引
log.message.timestamp.type=CreateTime/logAppendTime
索引检索过程
为什么不用 B+tree 做索引结构?
log.cleaner.enable=true
默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据 log.cleaner.dedupe.buffer.size
,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看 log.cleaner.dedupe.buffer.size
和其他log.cleaner配置值。(0.9.0.1中的显著变化)log.cleanup.policy=delete/compact
超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:delete
和compact
log.retention.check.interval.ms=300 000
日志清理器检查是否有日志符合删除的频率(以毫秒为单位)log.retention.hours
日志删除的时间阈值(小时为单位) 默认 168 小时,即 1 个星期log.retention.minutes
日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值。log.retention.ms
日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值。log.retention.bytes
日志删除的大小阈值。log.segment.bytes
单个日志段文件最大大小。unclean.leader.election.enable
指定副本是否能够不在 ISR 中选举为 Leader,会导致数据丢失,默认为 false。主从同步
Follower 故障
Leader 故障
根据 offset 和时间戳进行消费
__consumer_offsets => topic 的存储结构
auto.offset.reset
,默认值为 lastestenable.auto.commit=true
enable.auto.commit=false
消费者分配
rebalance
针对分区少,消费者多的情况
磁盘寻址
Kafka 日志文件顺序存放 -> 磁盘顺序读写。
Sequential disk (磁盘顺序读写) 比 Random SSD (固态的随机读写)要更快。
零拷贝
零拷贝实现代码 传输层封装代码:PlaintextTransportLayer
MessageSet
接口的 writeTo
方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo
实现,而不在使用进程内的写缓存。线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接。这种设计方式经过大量的测试,发现它是实现简单而且快速的。协议保持简单以允许未来实现其他语言的客户端.