前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >厉害了!一文撕开Kafka Compact Topic神秘面纱

厉害了!一文撕开Kafka Compact Topic神秘面纱

作者头像
用户9421738
发布于 2024-08-06 07:18:06
发布于 2024-08-06 07:18:06
48400
代码可运行
举报
文章被收录于专栏:大数据从业者大数据从业者
运行总次数:0
代码可运行

背景

随着平台Kafka的对接客户越来越多,我发现很多人只知道Kafka Topic可以根据设置保存大小和保存时间触发数据清理机制,但是并不熟悉Kafka Topic另一种清理策略compact。遂有此文,本文主要介绍compact原理、相关配置、实践案例操作记录、相关源码分析等内容。欢迎关注微信公众号:大数据从业者

Compact原理

Kafka数据清理策略是由log.cleanup.policy参数决定的,当前支持两种策略:delete(普通主题默认)、compact(系统主题默认)。两种策略可以同时使用,互不冲突。所以,log.cleanup.policy可以设置为delete或compact或delete,compact。本文暂不涉及delete清理策略,只讲述compact清理策略。Kafka系统主题__consumer_offsets默认清理策略就是compact。

强调一点:compact策略仅对Topic内同时携带key和value的消息有效。换句话说,如果需要使用compact策略,那么producer发送的消息需要同时携带key和value。

我们知道Topic是由Partition组成的,producer将消息写入Partition,每条消息都会被分配一个唯一且不可变的offset。如下图所示:

如果清理策略是delete,那么当满足保存大小或者保存时间的条件时,触发数据清理机制。指定offset之前的消息都将被删除,也就是Delete Retention Point之前的消息,如下图所示:

换句话说,delete策略不会考虑消息的key或value是什么,更不考虑有没有相同的Key消息存在。而compact策略则会考虑同一分区内的相同key的消息,最终只保留相同key的消息中最新的value对应的消息。如下图所示,原始数据中K1有三条消息,经过compact处理,只保留K1:V4这一条消息。个人感觉,该过程称为compact并不是很贴切,应该称为update之类的。

Compact策略适用于只想保留当前快照而不是完整修改历史的场景。比如:为了保存员工工资信息,可以创建主题employee-salary且设置compact策略,如下图所示:

Compact关键保证

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1.不影响没有消费延迟的consumer获取所有消息。换句话说,compact只会操作非active segment,而没有消费延迟的consumer正在消费active segment。    

2.Compact不会改变消息的offset值、key值、partition值、前后顺序。只是删除一些消息。

3.在log.cleaner.delete.retention.ms(默认24H)时间内,消费者仍能消费到待删除的消息。 

除了正常携带key和value的消息之外,compact还有一种特殊消息:key正常但value=null,这种消息称为tombstone消息。tombstone消息进行合没有意义,所以compact会删除这类消息。欢迎关注公众号:大数据从业者

Compact配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
log.cleanup.policy:清理策略(delete或compact或delete,compact)

log.cleaner.enable:是否启用compact清理任务

log.cleaner.threads:compact清理任务的线程数

log.segment.bytes:segmemnt文件的最大字节

log.segment.ms:segment保持active的最大时间

log.cleaner.backoff.ms: 清理任务闲时休眠时间

log.cleaner.min.compaction.lag.ms:触发compact的最小延迟时间

log.cleaner.max.compaction.lag.ms:触发compact的最大延迟时间

log.cleaner.dedupe.buffer.size:清理任务线程用于去重的内存

log.cleaner.delete.retention.ms:compact删除消息延迟删除时间    

log.cleaner.io.buffer.load.factor:清理任务线程 IO buffer负载率

log.cleaner.io.buffer.size:清理任务线程IO buffer内存

log.cleaner.io.max.bytes.per.second:清理任务线程IO限速

log.cleaner.min.cleanable.ratio:触发compact的脏数据比例

实践案例

1.创建测试主题

为便于测试,设置min.cleanable.dirty.ratio=0.001、segment.ms=5000以保证compact清理任务尽快执行,设置partitions=1以保证测试消息写入相同分区。

2.描述测试主题

3.启动消费者

4.启动生产者,发送测试消息

消息内容故意加入重复key,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Patrick,salary: 10000

Lucy,salary: 20000

Bob,salary: 20000

Patrick,salary: 25000

Lucy,salary: 30000

Patrick,salary: 30000

5.查看第3步消费消息

可以看到消费到所有消息,证实上文Compact关键保证之一:不影响没有消费延迟的consumer获取所有消息。

6.等待一分钟,继续生产消息,如:Stephane,salary: 0

7.启动新的消费者

可以看到,经过compact清理,上述第4步发送的重复消息只保留最新value。

源码剖析

KafkaServer.startup会启动LogManager,LogManager.startup会启动一个Schedule线程池和一个LogCleaner(内部启动CleanerThread)。Schedule线程池中有一个任务为kafka-log-retention,对应于delete清理策略;而LogCleaner对应于compact清理策略。

本文只讲述compact相关的LogCleaner,其startup方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**

   * Start the background cleaning

   */

  def startup() {

    info("Starting the log cleaner")

// cleaner线程数通过参数log.cleaner.threads配置,默认为1

    (0 until config.numThreads).foreach { i =>

      val cleaner = new CleanerThread(i)

      cleaners += cleaner

      cleaner.start()

    }

  }

接下来主要看下CleanerThread类主流程,位置在LogCleaner.scala文件。

主流程如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
doWork -> cleanFilthiestLog ->  grabFilthiestCompactedLog -> cleanLog -> clean -> doClean

篇幅有限,doClean方法内容不再介绍。感兴趣请自行阅读。如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala#L594

另外LogCleaner提供metric,方便问题排查和性能调优,如下所示:

总结

通过阅读本文,可以掌握compact原理、配置、实践操作、源码分析等内容。至此,Kafka Compact Topic使用与调优轻松拿捏!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
k8s部署Kafka集群
本次的目的是通过使用k8s搭建一个三节点的kafka集群,因为kafka集群需要用到存储,所以我们需要准备三个持久卷(Persistent Volume) 简称就是PV。
yukong
2020/07/28
4.3K0
k8s部署Kafka集群
Kafka学习笔记之Kafka日志删出策略
kafka将topic分成不同的partitions,每个partition的日志分成不同的segments,最后以segment为单位将陈旧的日志从文件系统删除。
Jetpropelledsnake21
2019/07/03
1.9K0
Kafka-主题(Topic)介绍和使用
我们上一章介绍了中间件:Zookeeper,本章将介绍另外一个中间件:Kafka。目前这2个中间件都是基于JAVA语言。
运维小路
2025/06/07
2030
Kafka-主题(Topic)介绍和使用
Kafka初识
问题一 写出增加Kafka的Partition命令 bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 问题二 列出配置Kafka删除日志的配置参数 参数 说明(解释) log.roll.hours =24*7 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segme
小爷毛毛_卓寿杰
2019/02/13
3580
『互联网架构』kafka集群原理(117)
PS:原理至关重要,面试的时候不可能问你命令的,都是问原理,懂了原理线上如果使用kafka出了问题才可能快速定位,而不是一脸蒙圈。必须要明白原理,如果不说原理直接实战,就真成搬砖了。
IT架构圈
2019/07/30
7640
『互联网架构』kafka集群原理(117)
k8s部署kafka集群「建议收藏」
可以看到,zookeeper集群状态正常,zk-1 是 leader ,在 zk-0 创建的数据在集群中所有的服务上都是可用的。
全栈程序员站长
2022/09/02
2.6K0
k8s部署kafka集群「建议收藏」
【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议保存)
本文所有命令,博主均全部操作验证过,保证准确性; 非复制粘贴拼凑文章; 如果想了解更多工具命令,可在评论区留下评论,博主会择期加上;
石臻臻的杂货铺[同名公众号]
2021/12/31
2.3K1
【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议保存)
Kafka集群安装
①.kafka需要依赖zk管理,在搭建kafka集群之前需要先搭建zk集群: https://my.oschina.net/u/2486137/blog/1537389 ②.从apache kafka官网下载kafka( 二进制版本)        注意下载的版本否则会在启动时报错:找不到主类Kafka.kafka. 我这里使用的是2.10版本. ③.配置config/server.properties文件: # Licensed to the Apache Software Foundation (ASF
用户1215919
2018/02/27
1.2K0
kafka其他配置
#表示消息体的最大大小,单位是字节 message.max.bytes=6525000 #一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 background.threads =4 #等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息 queued.max.requests =500 #broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 host.name=loca
陈不成i
2021/07/07
4620
分布式实时消息队列Kafka(五)
分布式实时消息队列Kafka(五) 知识点01:课程回顾 一个消费者组中有多个消费者,消费多个Topic多个分区,分区分配给消费者的分配规则有哪些? 分配场景 第一次消费:将分区分配给消费者 负载均衡实现:在消费过程中,如果有部分消费者故障或者增加了新的消费 基本规则 一个分区只能被一个消费者所消费 一个消费者可以消费多个分区 分配规则 范围分配 规则:每个消费者消费一定范围的分区,尽量均分,如果不能均分,优先分配给标号小的 应用:消费比较少的Top
Maynor
2021/12/07
9810
分布式实时消息队列Kafka(五)
kafka 数据清除机制
在Kafka中,存在数据过期的机制,称为data expire。如何处理过期数据是根据指定的policy(策略)决定的,而处理过期数据的行为,即为log cleanup。
用户1217611
2022/05/06
2.4K0
kafka 数据清除机制
Kafka-3.配置-Topic Config
和主题有关的配置既有服务器的默认值,也有可选的per-topic的覆盖值。如果没有per-topic的配置值,就用服务器的默认值。覆盖值能在创建主题的时候用一个或多个--config选项来设置。以下示例用一个名为my-topic的主题,其中包含自定义最大消息大小和刷新率:
悠扬前奏
2019/06/11
1.9K0
Kafka 架构及原理分析
为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。
架构探险之道
2023/03/04
6020
Kafka 架构及原理分析
深入理解Kafka必知必会(2)
__consumer_offsets:作用是保存 Kafka 消费者的位移信息 __transaction_state:用来存储事务日志消息
luozhiyun
2019/11/22
1.2K0
2.【kafka运维】ConfigCommand运维脚本
删除配置: --delete-config k1=v1,k2=v2 添加/修改配置: --add-config k1,k2 选择类型: --entity-type (topics/clients/users/brokers/broker- loggers) 类型名称: --entity-name
石臻臻的杂货铺[同名公众号]
2022/04/13
2.6K0
2.【kafka运维】ConfigCommand运维脚本
刨根问底 Kafka,面试过程真好使
Kafka最初是由Linkedin公司开发的,是一个分布式的、可扩展的、容错的、支持分区的(Partition)、多副本的(replica)、基于Zookeeper框架的发布-订阅消息系统,Kafka适合离线和在线消息消费。它是分布式应用系统中的重要组件之一,也被广泛应用于大数据处理。Kafka是用Scala语言开发,它的Java版本称为Jafka。Linkedin于2010年将该系统贡献给了Apache基金会并成为顶级开源项目之一。
蔡不菜丶
2022/12/19
6070
刨根问底 Kafka,面试过程真好使
Kafka定时清除过期数据
Kafka将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。
挖掘大数据
2018/01/19
6.2K0
kafka实战教程(python操作kafka),kafka配置文件详解
应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?
全栈程序员站长
2022/08/12
3.3K0
kafka实战教程(python操作kafka),kafka配置文件详解
Kafka运维填坑Kafka源码分析-汇总
调用Runtime.getRuntime.halt(1)直接暴力退出了. 可参考Kafka issue: Unclean leader election and "Halting because log truncation is not allowed"
扫帚的影子
2018/09/05
2.2K0
[721]linux安装kafka
首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用
周小董
2020/01/13
2.8K0
[721]linux安装kafka
相关推荐
k8s部署Kafka集群
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验