目前我们的大数据系统里,主要承接的业务是部门内的一些业务日志数据的统计、分析等,比如网关日志数据,服务器监控数据,k8s容器的相关日志数据,app的打点日志等。主要的流任务是flink任务是消费kafka的数据,经过各种处理之后通过flink sql或者flink jar实时写入hive,由于业务对数据的实时性要求比较高,希望数据能尽快的展示出来,所以我们很多的flink任务的checkpoint设置为1分钟,而数据格式采用的是orc格式,所以不可避免的出现了一个在大数据处理领域非常常见但是很麻烦的问题,即hdfs小文件问题。
开始的时候我们的小文件解决方案是自己用spark写的一个小文件压缩工具,定期的去合并,我们的hive分区一般都是天级别的,所以这个工具的原理就是每天的凌晨启动一个定时任务去压缩昨天的数据,首先把昨天的数据写入一个临时文件夹,压缩完,和原来的数据进行记录数的比对检验,数据条数一致之后,用压缩后的数据覆盖原来的数据,但是由于无法保证事务,所以出现了很多的问题:
所以基于以上的一些问题,我调研了数据湖技术,由于我们的计算引擎主要是flink为主,查询引擎是prestosql,prestosql也提供了原生的connector支持。在对iceberg进行功能测试和简单代码review之后,发现iceberg还有一些功能需要优化和bug修复,不过我觉得应该能hold的住,不完善的地方和需要优化的地方我们自己来补全,所以最终引入了iceberg来解决小文件的问题。
除此之外,对于一些其他的问题,比如cdc数据的接入,以及使用sql进行删除和更新等一些hive不好完成的工作,后续也可以通过数据湖技术来解决。
经过一系列对iceberg的测试,包括流式数据写入、批任务读写,数据查询等,在测试通过之后决定将原来的hive表迁移到iceberg。
我们的主要使用场景是使用flink将kafka的流式数据写入到Iceberg,具体的flink+iceberg的使用方式我就不在赘述了,大家可以参考官方的文档:https://iceberg.apache.org/flink/ 。
为了代码的简洁以及可维护性,我们尽量将程序使用sql来编写,也有部分jar任务,使用的任务管理工具是zeppelin,在里面添加iceberg 的catalog,提交sql任务是使用zeppelin自带的功能,提交jar任务是我自己写的一个zeppelin插件。
在社区上看到过一些小问题,有不止一个人遇到过,在这里给大家强调一下:
CREATE CATALOG iceberg
创建iceberg catalog,然后使用use catalog iceberg
之后,不要在这里创建非iceberg的table,这时候会出现不报错,但是也写不进去数据的情况。目前压缩小文件是采用的一个额外批任务来进行的,Iceberg提供了一个spark版本的action,我在做功能测试的时候发现了一些问题,比如会对一些文件重复压缩,对orc数据文件获取文件长度不正确等等,此外我对spark也不是非常熟悉,担心出了问题不好排查,所以参照spark版本的自己实现了一个flink版本,并修复了一些bug,进行了一些功能的优化。
由于我们的iceberg的元数据都是存储在hive中的,也就是我们使用了HiveCatalog,所以压缩程序的逻辑是我把hive中所有的iceberg表全部都查出来,依次压缩。压缩没有过滤条件,不管是分区表还是非分区表,都进行全表的压缩。这样做是为了处理某些使用eventtime的flink任务,如果有延迟的数据的到来。就会把数据写入以前的分区,如果不是全表压缩只压缩当天分区的话,新写入的其他天的数据就不会被压缩。
之所以没有开启定时任务来压缩,是因为比如我定时五分钟压缩一个表,如果五分钟之内这个压缩任务没完成,没有提交新的snapshot,下一个定时任务又开启了,就会把上一个没有完成的压缩任务中的数据重新压缩一次,所以每个表依次压缩的策略可以保证某一时刻一个表只有一个任务在压缩。
代码示例参考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Actions.forTable(env, table)
.rewriteDataFiles()
//.maxParallelism(parallelism)
//.filter(Expressions.equal("day", day))
//.targetSizeInBytes(targetSizeInBytes)
.execute();
目前系统运行稳定,已经完成了18000多次任务的压缩
注意:
不过目前对于新发布的iceberg 0.11来说,还有一个已知的bug,就是当压缩前的文件大小大于要压缩的大小(targetSizeInBytes)的时候,会造成数据丢失,其实这个问题我在最开始测试小文件压缩的时候就发现了,并且提了一个pr,我的策略是大于目标文件的数据文件不参与压缩,不过这个pr没有合并到0.11版本中,后来社区另外一个兄弟也发现了相同的问题,提交了一个pr( https://github.com/apache/iceberg/pull/2196 ) ,策略是将这个大文件拆分到目标文件大小,目前已经合并到master,会在下一个bug fix版本0.11.1中发布。
我们的快照过期策略,我是和压缩小文件的批处理任务写在一起的,压缩完小文件之后,进行表的快照过期处理,目前保留的时间是一个小时,这是因为对于有一些比较大的表,分区比较多,而且checkpoint比较短,如果保留的快照过长的话,还是会保留过多小文件,我们暂时没有查询历史快照的需求,所以我将快照的保留时间设置了一个小时。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
table.expireSnapshots()
// .retainLast(20)
.expireOlderThan(olderThanTimestamp)
.commit();
写入了数据之后,有时候我想查看一下相应的快照下面有多少数据文件,直接查询hdfs你不知道哪个是有用的,哪个是没用的。所以需要有对应的管理工具。目前flink这块还不太成熟,我们可以使用spark3提供的工具来查看。
目前create table 这些操作我们是通过flink sql client来做的。 其他相关的ddl的操作可以使用spark来做:
https://iceberg.apache.org/spark/#ddl-commands
一些相关的数据的操作,比如删除数据等可以通过spark来实现,presto目前只支持分区级别的删除功能。
在我们操作hive的时候,有一些很常用的操作,比如show partitions、 show create table 等,这些目前flink还没有支持,所以在操作iceberg的时候就很不方便,我们自己基于flink 1.12做了修改,不过目前还没有完全提交到社区,后续有时间会提交到flink 和iceberg 社区。
在使用iceberg的过程中,有时候会有这样的情况,我提交了一个flink任务,由于各种原因,我把它给停了,这个时候iceberg还没提交相应的快照。还有由于一些异常导致程序失败,就会产生一些不在iceberg元数据里面的孤立的数据文件,这些文件对iceberg来说是不可达的,也是没用的。所以我们需要像jvm的垃圾回收一样来清理这些文件。
目前iceberg提供了一个spark版本的action来进行处理这些没用的文件,我们采取的策略和压缩小文件一样,获取hive中的所有的iceberg表。每隔一个小时执行一次定时任务来删除这些没用的文件。
SparkSession spark = ......
Actions.forTable(spark, table)
.removeOrphanFiles()
//.deleteWith(...)
.execute();
在程序运行过程中出现了正常的数据文件被删除的问题,经过调研,由于我的快照保留设置是一小时,这个清理程序清理时间也是设置一个小时,通过日志发现是这个清理程序删除了正常的数据。查了查代码,应该是他们设置了一样的时间,在清理孤立文件的时候,有其他程序正在读取这个要expired的snapshot,导致删除了正常的数据。最后把这个清理程序的清理时间改成默认的三天,没有再出现删除数据文件的问题。 当然,为了保险起见,我们可以覆盖原来的删除文件的方法,改成将文件到一个备份文件夹,检查没有问题之后,手工删除。
目前我们使用的版本是prestosql 346,这个版本安装的时候需要jdk11,presto查询iceberg比较简单。官方提供了相应的conncter,我们配置一下就行,
//iceberg.properties
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
目前查询iceberg的批处理任务,使用的flink的客户端,首先我们启动一个基于yarn session 的flink集群,然后通过sql客户端提交任务到集群。
主要的配置就是我们需要根据数据的大小设置sql任务执行的并行度,可以通过以下参数设置。
set table.exec.resource.default-parallelism = 100;
此外我在sql客户端的配置文件里配置了hive和iceberg相应的catalog,这样每次客户端启动的时候就不需要建catalog了。
catalogs: # empty list
- name: iceberg
type: iceberg
warehouse: hdfs://localhost/user/hive2/warehouse
uri: thrift://localhost:9083
catalog-type: hive
cache-enabled: false
- name: hive
type: hive
hive-conf-dir: /Users/user/work/hive/conf
default-database: default
目前对于定时调度中的批处理任务,flink的sql客户端还没hive那样做的很完善,比如执行hive -f来执行一个文件。而且不同的任务需要不同的资源,并行度等。 所以我自己封装了一个flink程序,通过调用这个程序来进行处理,读取一个指定文件里面的sql,来提交批任务。在命令行控制任务的资源和并行度等。
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
目前我们的所有数据都是存储在hive表的,在验证完iceberg之后,我们决定将hive的数据迁移到iceberg,所以我写了一个工具,可以使用hive的数据,然后新建一个iceberg表,为其建立相应的元数据,但是测试的时候发现,如果采用这种方式,就需要把写入hive的程序停止,因为如果iceberg和hive使用同一个数据文件,而压缩程序会不断地压缩iceberg表的小文件,压缩完之后,不会马上删除旧数据,所以hive表就会查到双份的数据。 鉴于iceberg测试的时候还有一些不稳定,所以我们采用双写的策略,原来写入hive的程序不动,新启动一套程序写入iceberg,这样能对iceberg表观察一段时间。还能和原来hive中的数据进行比对,来验证程序的正确性。
经过一段时间观察,每天将近20亿条数据、压缩后1.2T大小的hive表和iceberg表,一条数据也不差。所以在最终对比数据没有问题之后,把hive表停止写入,使用新的iceberg表,然后把hive中的旧数据导入到iceberg。
我将这个hive表迁移iceberg表的工具做成了一个基于flink batch job的iceberg Action,提交了社区,不过目前还没合并:https://github.com/apache/iceberg/pull/2217 , 这个功能的思路是使用hive原始的数据不动,然后新建一个iceberg table,然后为这个新的iceberg table 生成对应的元数据,大家有需要的话可以先看看。
此外,iceberg社区,还有一个把现有的数据迁移到已存在的iceberg table的工具,类似hive的LOAD DATA INPATH ... INTO TABLE
,是用spark的存储过程做的,大家也可以关注下:https://github.com/apache/iceberg/pull/2210
目前在我们内部的版本中,我已经测试通过可以使用flink sql 将cdc数据(比如mysql binlog)写入iceberg,社区的版本中实现该功能还需要做一些工作,比如目前的IcebergTableSink
还没实现UpsertStreamTableSink
,不过这个接口在flink1.12中已经过期了,所以我把iceberg的table source和table sink使用新接口重构了,提交了相应的pr,还没有合并到master。
此外还有一些其他的问题,比如RewriteDataFileAction对于压缩cdc的数据还有一些序列化相关的问题,也正在处理中。如果顺利的话,这个功能我觉得会在iceberg 0.12版本中发布。
目前trino(原来的prestosql)可以使用sql删除对应的iceberg表。(只支持删除分区数据)。
对于copy-on-write表,我们可以使用spark sql来进行行级的删除和删除。具体的支持的语法可以参考源码中的测试类:org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,这些功能我在测试环境测试是可以的,但是还没有来得及更新到生产。
目前由于flink暂时还不支持delete、update等语法,所以我们还暂时无法用flink来操作iceberg。我提了一些flink issue来跟踪这些问题:https://issues.apache.org/jira/browse/FLINK-21282 & https://issues.apache.org/jira/browse/FLINK-21281 ,等后续flink支持之后,我们就可以完善flink模块的这部分功能。
在工作中会有一些这样的需求,比如我们有一个天级别的窗口,去消费kafka的数据,然后发给下游,但是由于数据比较大,kafka的数据只存了两个小时,如果很不幸,你的flink程序挂了,两个小时之内因为各种原因没有修复,当你再启动的时候就会造成数据丢失。
此外,如果我们逻辑写错了,想从某一时刻开始消费,在kafka中,我们可以指定相应的offset,但是如果kafka只保留了两个小时,我们想从一天前开始消费,那么kafka就无能为力了。
当引入了iceberg的streaming read之后,这些问题就可以解决了,因为iceberg存储了所有的数据,当然这里有一个前提就是对于数据没有要求特别精确,比如达到秒级别,因为目前flink写入iceberg的事务提交是基于flink checkpoint间隔的。
经过对iceberg大概一个季度的调研,测试,优化和bug修复,我们将现有的hive表都迁移到了iceberg,完美解决了原来的所有的痛点问题,目前系统稳定运行,而且相对hive得到了很多的收益:
总结一下,我们目前可以实现使用flink sql 对iceberg进行批、流的读写,并可以对小文件进行实时的压缩,使用spark sql做一些delete和update工作以及一些DDL操作,后续可以使用flink sql 将cdc的数据写入iceberg,目前对iceberg的所有的优化和bug fix,我已经贡献给社区,剩下的优化工作我后续也会陆续提交相应的pr,推回社区。由于笔者水平有限,有时候也难免有错误,还请大家不吝赐教。