最近测试Hadoop数据一致性,发现Hadoop SequenceFile BLOCK压缩类型(io.seqfile.compression.type=BLOCK)的文件存在数据丢失情况,对此进行研究并解决。
先来了解SequenceFile BLOCK压缩类型的数据写入机制:
BLOCK压缩类型的SequenceFile数据结构图
BLOCK压缩类型的SequenceFile.Writer实现类为SequenceFile.BlockCompressWriter,写入流程如下:
1.写入头部信息:版本信息,压缩类型信息,压缩算法类信息,keyClass/valueClass类名,Metadata等;
2.写入Sync标记;
3.将key和value序列化并写入缓存,当缓存大小达到阈值(默认io.seqfile.compress.blocksize=1000000字节),触发sync()操作,sync操作:先写入sync标记,再将缓存中的key和value进行压缩写入FSDataOutputStream,格式如上图中的Block compression。 这样就成功写入了一个block;
4. 后续的数据写入流程和3一样;
5. 当最后数据写入完成后,会存在最后一个block的数据小于io.seqfile.compress.blocksize,这样不会触发sync()操作,所以必须调用BlockCompressWriter的close()方法,其中会去调用sync()操作将最后剩余的数据写入FSDataOutputStream,并对FSDataOutputStream做close(),至此完成整个写入流程。
解决我的问题:
我的问题是由于在写入数据结束后只对FSDataOutputStream做了close()操作,而根据BLOCK压缩类型的写入必须调用Writer的close()操作,才能触发sync()操作将剩余数据压缩写入FSDataOutputStream。而在我的实现中,当设置io.seqfile.compression.type为NONE和RECORD,不会出现数据丢失。 因为这两种压缩类型,是来一条数据就写入outputStream,没有BLOCK这样的缓存后再写入机制。