# Kafka Source Code Analysis (3): Log Management - LogManager

## 1 Entry Point

```

/* start log manager */
// Start the log management module
logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
logManager.startup()

```

## 2 Startup Code

```

/**
 * Start the background threads to flush logs and do log cleanup
 * (the periodic work runs on the shared scheduler's thread pool)
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs",
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       period = defaultConfig.fileDeleteDelayMs,
                       TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}

```
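
All five tasks above are periodic jobs registered on the broker's shared scheduler, each with an initial delay of InitialTaskDelayMs and its own period. As a rough standalone model (this is not Kafka's KafkaScheduler, and the task names, periods and bodies below are illustrative only), the same delay/period semantics can be reproduced with a plain scheduled thread pool:

```
import java.util.concurrent.{Executors, TimeUnit}

// Standalone model of the delay/period scheduling used by startup() above.
// NOT Kafka's KafkaScheduler; names, periods and task bodies are illustrative.
object PeriodicTasksSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newScheduledThreadPool(2)

    def schedule(name: String, periodMs: Long)(task: => Unit): Unit = {
      val runnable = new Runnable {
        def run(): Unit =
          try task catch { case e: Throwable => println(s"$name failed: $e") }
      }
      pool.scheduleAtFixedRate(
        runnable,
        30 * 1000L,     // initial delay, mirrors InitialTaskDelayMs
        periodMs,       // period between runs
        TimeUnit.MILLISECONDS)
    }

    schedule("kafka-log-retention", periodMs = 5 * 60 * 1000L) { println("cleanupLogs()") }
    schedule("kafka-log-flusher",   periodMs = 10 * 1000L)     { println("flushDirtyLogs()") }

    Thread.sleep(60 * 1000L)   // let the tasks fire at least once
    pool.shutdown()
  }
}
```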

## 3 Core Code

![image.png](http://upload-images.jianshu.io/upload_images/1731949-3cfdfc76991bb2a5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

### 3.1 Related Configuration
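
The original post leaves this subsection to the screenshot above, so the following is only a hedged summary: the constructor arguments used in 3.2 come from broker properties such as the ones sketched here (the mapping in the comments is my reading of KafkaConfig, and the values are examples, not recommendations), while the per-topic settings retention.ms and retention.bytes used in 3.4 and 3.5 live in the topic-level LogConfig.

```
import java.util.Properties

// Hedged illustration only: broker properties that feed the LogManager fields
// used in 3.2 below. Values are examples, not recommended or default settings.
object LogManagerConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("log.dirs", "/tmp/kafka-logs")                 // -> logDirs
    props.put("num.recovery.threads.per.data.dir", "1")      // -> ioThreads
    props.put("log.retention.check.interval.ms", "300000")   // -> retentionCheckMs
    props.put("log.flush.scheduler.interval.ms", "10000")    // -> flushCheckMs
    props.put("log.cleaner.enable", "true")                  // -> cleanerConfig.enableCleaner
    props.put("log.cleaner.threads", "1")                    // -> cleanerConfig.numThreads
    props.put("log.cleaner.dedupe.buffer.size", "134217728") // -> cleanerConfig.dedupeBufferSize

    println(props)
  }
}
```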

### 3.2 Startup Step: the ZK Module

```

// topicConfigs has already been fetched from ZooKeeper earlier in this method (not covered in detail here)
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                  dedupeBufferSize = config.logCleanerDedupeBufferSize,
                                  dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
                                  ioBufferSize = config.logCleanerIoBufferSize,
                                  maxMessageSize = config.messageMaxBytes,
                                  maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
                                  backOffMs = config.logCleanerBackoffMs,
                                  enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
               topicConfigs = topicConfigs,
               defaultConfig = defaultLogConfig,
               cleanerConfig = cleanerConfig,
               ioThreads = config.numRecoveryThreadsPerDataDir,
               flushCheckMs = config.logFlushSchedulerIntervalMs,
               flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
               flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
               retentionCheckMs = config.logCleanupIntervalMs,
               maxPidExpirationMs = config.transactionIdExpirationMs,
               scheduler = kafkaScheduler,
               brokerState = brokerState,
               time = time,
               brokerTopicStats = brokerTopicStats)
}

```

### 3.3 Startup and Initialization Flow

```

@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 time: Time) extends Logging {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000

  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicPartition, Log]()
  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()

  // Check that every log directory exists (creating it if necessary) and is readable and writable.
  createAndValidateLogDirs(logDirs)
  // Create a ".lock" file in each directory and lock the directory through that file.
  private val dirLocks = lockLogDirs(logDirs)
  // Build the checkpoint map from each directory's recovery-point-offset-checkpoint file;
  // it is used to periodically record the recovery offset of every partition.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap

  // For each data directory, create a thread pool of size num.recovery.threads.per.data.dir,
  // scan its topic-partition subdirectories, and build a Log instance for each one using the
  // topic config from ZK (or the default config) and the offset recorded in the checkpoint file.
  // The loading, i.e. log recovery, is executed on these thread pools.
  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null

```
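
The lockLogDirs step above creates a .lock file in every log directory and takes an exclusive lock on it, so that two broker processes cannot accidentally share the same data directory. A minimal standalone sketch of that idea using plain NIO (Kafka itself wraps this in kafka.utils.FileLock; the directory path below is made up):

```
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption.{CREATE, WRITE}

// Standalone sketch of per-directory ".lock" files, in the spirit of lockLogDirs.
// This is only an illustration, not Kafka's FileLock helper.
object DirLockSketch {
  def tryLockDir(dir: File): Option[FileChannel] = {
    val channel = FileChannel.open(new File(dir, ".lock").toPath, CREATE, WRITE)
    Option(channel.tryLock()) match {
      case Some(_) => Some(channel)           // lock is held until the channel is closed
      case None    => channel.close(); None   // another process already owns this directory
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/kafka-logs-sketch")
    dir.mkdirs()
    tryLockDir(dir) match {
      case Some(ch) => println(s"locked ${dir.getAbsolutePath}"); ch.close()
      case None     => println(s"could not lock ${dir.getAbsolutePath}")
    }
  }
}
```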

### 3.4 Cleaning Up Expired Logs

```

/**
 * Runs through the log removing segments older than a certain age
 */
private def cleanupExpiredSegments(log: Log): Int = {
  // retention.ms < 0 means the log never expires, so there is nothing to delete
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  // a segment is deletable once its last-modified time is older than retention.ms
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}

```

This involves another configuration, retention.ms, which controls how long a log is retained. If it is negative, the log never expires and nothing is ever deleted for age.

Otherwise, once the difference between a segment file's last-modified time and the current time exceeds the configured retention time, the segment is deleted. The actual deletion is done by:

```

/**
 * Delete any log segments matching the given predicate function,
 * starting with the oldest segment and moving forward until a segment doesn't match.
 * @param predicate A function that takes in a single log segment and returns true iff it is deletable
 * @return The number of segments deleted
 */
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  lock synchronized {
    //find any segments that match the user-supplied predicate UNLESS it is the final segment
    //and it is empty (since we would just end up re-creating it)
    val lastEntry = segments.lastEntry
    val deletable =
      if (lastEntry == null) Seq.empty
      else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      if (segments.size == numToDelete)
        roll()
      // remove the segments for lookups
      deletable.foreach(deleteSegment(_))
    }
    numToDelete
  }
}

```

The logic here: starting from the oldest segment, the supplied predicate decides which segments are deletable and collects them into deletable; the method then iterates over deletable and deletes each segment.

```

private def deleteSegment(segment: LogSegment) {
  info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
  lock synchronized {
    segments.remove(segment.baseOffset)
    asyncDeleteSegment(segment)
  }
}

private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
    segment.delete()
  }
  scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}

```
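
One detail worth calling out in deleteOldSegments above is the takeWhile: candidates are scanned from the oldest segment forward and the scan stops at the first segment the predicate rejects, so a retained segment also protects every later segment in that pass. A toy model with plain data (the Seg case class is a hypothetical stand-in for LogSegment):

```
// Toy model of the takeWhile selection in deleteOldSegments; Seg is a stand-in for LogSegment.
object TakeWhileSketch {
  case class Seg(baseOffset: Long, lastModified: Long)

  def main(args: Array[String]): Unit = {
    val now         = System.currentTimeMillis()
    val retentionMs = 60 * 60 * 1000L // 1 hour, illustrative
    val segments = Seq(
      Seg(0L,   now - 3 * 60 * 60 * 1000L), // 3h old  -> expired
      Seg(100L, now - 2 * 60 * 60 * 1000L), // 2h old  -> expired
      Seg(200L, now - 10 * 60 * 1000L),     // 10m old -> NOT expired, stops the scan
      Seg(300L, now - 5 * 60 * 60 * 1000L)  // would match, but is protected by segment 200
    )

    def expired(s: Seg): Boolean = now - s.lastModified > retentionMs

    val deletable = segments.takeWhile(expired)
    println(deletable.map(_.baseOffset)) // List(0, 100): segments 200 and 300 survive
  }
}
```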

### 3.5 Cleaning Up Oversized Logs

```

/**
 * Runs through the log removing segments until the size of the log
 * is at least logRetentionSize bytes in size
 */
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  // retention.bytes < 0 means unlimited size; nothing to delete if the log is already under the limit
  if (log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  // number of bytes by which the log currently exceeds retention.bytes
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}

```

This code is fairly straightforward: when the log's total size exceeds retention.bytes, the oldest segments are marked for deletion (a segment is only removed while the whole segment still fits inside the excess), and the actual deletion goes through the same deleteOldSegments shown above, so it is not repeated here. A toy walk-through of the bookkeeping is sketched below.
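
The sketch uses made-up segment sizes and a made-up retention.bytes value; it is not Kafka code, just the same arithmetic as shouldDelete:

```
// Toy model of cleanupSegmentsToMaintainSize's bookkeeping: delete the oldest segments
// as long as each whole segment still fits inside the excess (log.size - retention.bytes),
// so the log can remain slightly above retention.bytes afterwards.
object SizeRetentionSketch {
  def main(args: Array[String]): Unit = {
    val segmentSizes   = Seq(1000L, 1000L, 1000L, 200L) // oldest first, bytes
    val retentionBytes = 1100L
    val logSize        = segmentSizes.sum               // 3200

    var diff = logSize - retentionBytes                 // 2100 bytes over the limit
    def shouldDelete(size: Long): Boolean =
      if (diff - size >= 0) { diff -= size; true } else false

    val deleted = segmentSizes.takeWhile(shouldDelete)
    println(s"deleted ${deleted.size} segments, ${deleted.sum} bytes") // 2 segments, 2000 bytes
  }
}
```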

### 3.6 Periodically Flushing Log Disk Buffers

This is driven by the background scheduler, which periodically invokes LogManager.flushDirtyLogs.

flushDirtyLogs iterates over the logs of all partitions and flushes each one: using the log end offset and the last checkpointed recovery offset, it finds the segments in between and flushes each segment's log and index files. For the log file this calls force on its FileChannel; for the index file it calls force on its memory-mapped buffer.

```

private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")
  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic +
            " flush interval " + log.config.flushMs +
            " last flushed " + log.lastFlushTime +
            " time since last flush: " + timeSinceLastFlush)
      // only flush logs whose flush interval (flush.ms) has elapsed
      if (timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}

```
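
As described above, at the file level a segment flush comes down to two NIO calls: force on the FileChannel backing the .log file, and force on the memory-mapped buffer backing the .index file. A minimal standalone illustration of those two calls (the file names and paths are made up):

```
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, StandardOpenOption}

// Standalone illustration of what a segment flush means at the OS level:
// force() the data channel (.log) and force() the memory-mapped index (.index).
object FlushSketch {
  def main(args: Array[String]): Unit = {
    // .log file: append through a FileChannel, then force it to disk
    val logChannel = FileChannel.open(
      Paths.get("/tmp/00000000000000000000.log"),
      StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)
    logChannel.write(ByteBuffer.wrap("record".getBytes(StandardCharsets.UTF_8)))
    logChannel.force(true)   // flush file data (and metadata)
    logChannel.close()

    // .index file: memory-mapped, so flushing means forcing the mapped buffer
    val raf  = new RandomAccessFile("/tmp/00000000000000000000.index", "rw")
    val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 8)
    mmap.putLong(0L)
    mmap.force()             // flush the mapped index pages
    raf.close()
  }
}
```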

### 3.7 Periodically Checkpointing Partition Offsets

This is also driven by the background scheduler, which periodically invokes LogManager.checkpointRecoveryPointOffsets:

```

def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}

```

This checkpoints the last recovery offset of every partition stored in each directory: for each dir, the recovery points of its partitions are written into the checkpoint file under that directory. The file layout is:

- line 1: the checkpoint file format version, written as 0;
- line 2: the number of partitions that currently have data in this directory;
- then one line per partition, containing the topic, partition and offset values.

```

private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString)
  if (recoveryPoints.isDefined) {
    this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
  }
}

```
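
A standalone sketch of the checkpoint file layout described above, i.e. a version line, an entry-count line, and then one `topic partition offset` triple per line (the writer below is hypothetical and is not Kafka's OffsetCheckpointFile):

```
import java.io.{File, PrintWriter}

// Hypothetical writer reproducing the checkpoint file layout described above.
object CheckpointSketch {
  def writeCheckpoint(file: File, offsets: Map[(String, Int), Long]): Unit = {
    val out = new PrintWriter(file, "UTF-8")
    try {
      out.println(0)             // line 1: format version
      out.println(offsets.size)  // line 2: number of entries
      offsets.foreach { case ((topic, partition), offset) =>
        out.println(s"$topic $partition $offset")  // "topic partition offset"
      }
    } finally out.close()
  }

  def main(args: Array[String]): Unit =
    writeCheckpoint(
      new File("/tmp/recovery-point-offset-checkpoint"),
      Map(("my-topic", 0) -> 42L, ("my-topic", 1) -> 7L))
}
```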

Log compaction is performed periodically by the LogCleaner instance, which runs the work on its CleanerThread threads.
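
The details of compaction are out of scope here, but the core idea is easy to show with plain data: for every key, only the record with the highest offset survives. A toy model, not the LogCleaner implementation:

```
// Toy model of log compaction: keep only the latest (highest-offset) record per key.
object CompactionSketch {
  case class Record(offset: Long, key: String, value: String)

  def main(args: Array[String]): Unit = {
    val log = Seq(
      Record(0, "k1", "v1"),
      Record(1, "k2", "v2"),
      Record(2, "k1", "v3"),   // supersedes offset 0
      Record(3, "k2", "v4"))   // supersedes offset 1

    val compacted = log
      .groupBy(_.key)
      .map { case (_, records) => records.maxBy(_.offset) }
      .toSeq
      .sortBy(_.offset)

    compacted.foreach(println)  // Record(2,k1,v3), Record(3,k2,v4)
  }
}
```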
