在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms
来配置,默认值为300000,即5分钟。当前日志分段的保留策略有3种:
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志分段文件集合,如下图所示。阈值可以通过 broker 端参数 log.retention.hours
、log.retention.minutes
和 log.retention.ms
来配置,其中 log.retention.ms
的优先级最高,log.retention.minutes
次之,log.retention.hours
最低。默认情况下只配置了 log.retention.hours
参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间 lastModifiedTime 来计算的,而是根据日志分段中最大的时间戳 largestTimeStamp 来计算的。因为日志分段的 lastModifiedTime 可以被有意或无意地修改,比如执行了 touch 操作,或者分区副本进行了重新分配,lastModifiedTime 并不能真实地反映出日志分段在磁盘的保留时间。
要获取日志分段中的最大时间戳 largestTimeStamp 的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则才设置为最近修改时间 lastModifiedTime。
一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用 KafkaAdminClient 的 deleteRecords() 方法、使用 kafka-delete-records.sh 脚本)、日志的清理和截断等操作进行修改。
基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,若是,则可以删除此日志分段。如上图所示,假设 logStartOffset 等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移量为23,通过如下动作收集可删除的日志分段的文件集合 deletableSegments:
收集完可删除的日志分段的文件集合之后的删除操作同基于日志大小的保留策略和基于时间的保留策略相同,这里不再赘述。
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合,如下图所示。阈值可以通过 broker 端参数 log.retention.bytes
来配置,默认值为-1,表示无穷大。注意 log.retention.bytes
配置的是 Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为 .log 日志文件)的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes
来限制,默认值为1073741824,即 1GB。
基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小 size 和阈值的差值 diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合。查找出它之后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同,这里不再赘述。
总结
Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 中每一个分区副本都对应一个 Log,而 Log 又可以分为多个日志分段,这样也便于日志的清理操作。Kafka 提供了删除的方式来清理日志:
其实Kafka还有一种日志清理策略那就是通过针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本的方式来对Kafka日志进行清理。对于压缩的细节这里不再赘述。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。