Kafka的存储层是顺序文件,如果你要是开发一个应用,自己写文件来保存状态,那么有一件事情你就必须关注:这个文件我写的完整么?文件有没有损坏?说到这里很多同学可能想到了checksum,就是类似使用MD5等哈希算法计算文件的哈希,下次读取的时候再算一遍,看看哈希是否对的上,那么Kafka又是怎么来确保文件的完整性的呢?
其实在前一篇文章中,已经提到过,Kafka启动的时候会检查每个目录是否有一个.kafka_cleanshutdown文件,如果存在这个文件说明Kafka是走正常的关闭逻辑关闭的,否则认为是异常关闭,异常关闭的时候会走一个恢复逻辑。
那么首先看Kafka的关闭逻辑。
Kafka.scala这个类里注册一个shutdown hook,在正常关闭的时候走shutdown逻辑。
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
})
这里我们最关注的是LogManager的shutdown。
log.flush很好理解,我们都知道在Linux,如果不是O_Direct这种方式打开的文件,write都是先写到page cache里的,Linux会在后台将内容flush到物理存储介质,比如磁盘。而如果调用flush方法则会强制同步的flush到磁盘,但是一般为了提高写性能,在接收消息的时候不会同步flush,所以Kafka停止的时候就会强制flush一次了。每次flush的时候Kafka都会更新一个叫recoveryPoint的offset到当前offset。这个recoveryPoint就代表每个partition哪个offset之前的已经flush到磁盘了,这个在每个日志目录里有一个recovery-point-offset-checkpoint的文件,这个文件会周期性刷新,记录最新的各个partition的recoverPoint。
然后是close就不说了。
做完这些事情如果没有任何问题后,Kafka会在每个日志创建一个.kafka_cleanshutdown的标识文件,文件内容是空的,就是起一个标识的作用,表示Kafka是走正常关闭逻辑。
以上就是Kafka的停止逻辑了,遗憾的是我们并没有看到Kafka写了什么checksum之类的,这个我们下文还会再次提到。
现在回到启动逻辑,在上一篇文章里我们稍微提到了,Kafka启动的时候如果检查日志目录里没有.kafka_cleanshutdown这个文件,就会进入一个recover逻辑。这个逻辑还是比较重的,不过还好的是他不是对所有日志文件都进行recover,还记得前面提到的recoveryPoint么,启动的时候会读取这个文件,这个文件里记录的就是上一次flush到磁盘时候的offset,Kafka会从这个offset开始recover。
那我们再来看这个recover逻辑主要干了啥呢,首先会把对应LogSegment的.index干掉,然后就开始遍历了,遍历消息就是顺序往下读取一个个消息了,Kafka中每个消息格式如下:
offset(8 bytes)
size(4 bytes)
MessageHeader(14 bytes)
Crc(4 bytes)
Magic(1 byte)
Attributes(1 byte)
KeySize(4 bytes)
ValueSize(4 bytes)
读取消息之后这个header里不是有一个crc么,这相当于每个消息的checksum了,这里会计算一下读取的buffer的crc和header里记录的crc相比较,如果不想等则校验不通过。那么这里其实就是两个校验逻辑:消息的大小是否大于配置的最大消息大小,crc校验是否通过。如果这里校验不通过则这个LogSegment就截断了,后面的整个都不要了。
不仅如此,假设这个partition有10个log文件,检查到第8个的时候有不合法的消息,那么从这个消息开始位置到这个log文件末尾内容全部丢弃,而且第9, 10两个log也删掉。嗯,这样删掉怎么办呢?如果这个topic有多个replicas,那是可以同步过来的。
recover逻辑执行完毕后,我们就可以将.kafka_cleanshutdown标识文件删掉了。
上面差不多就是Kafka确保文件完整的整个机制了,这个机制非常依赖flush操作,Kafka好像认为只要flush到磁盘,那么文件就不会损坏了。但其实这种机制是不够的,比如:
在Kafka正常停止后,人为破坏log文件,虽然有crc,但是仍然无法检查出来。因为正常关闭的,Kafka根本就不走recover逻辑
如果正常关闭后,磁盘出现故障,导致文件损坏,那么Kafka也是不会进行检测的,原理同上(这个在我们实际中确实碰到了)。
所以如果碰到上面这两种情况就比较悲剧了,因为启动的时候Kafka根本没有进行检查,而实际消费消息的时候又碰到文件损坏,那就会出错了,不过如果真遇到这种情况,我们是否就可以停止kafka,然后是删除.kafka_cleanshutdown文件,让Kafka做一次自检呢?
另外不知道为啥kafka不做完整文件的checksum呢?
领取专属 10元无门槛券
私享最新 技术干货