有两种方式同步数据到Hudi
注意:
write.precombine.field
changelog.enabled
转换到change log模式主要用于数据初始化导入。Bulk Insert不会进行数据去重,需要用户在数据插入前进行数据去重
Bulk Insert在batch execution mode下更高效
使用参数如下:
用于snapshot data + incremental data数据导入。snapshot data部分使用Bulk insert方式完成。incremental data进行实时导入
使用参数如下:
但是incremental data如何不丢失数据,又不重复导入数据:
详细使用步骤如下:
execution.checkpointing.tolerable-failed-checkpoints = n
index.bootstrap.enabled = true
index.bootstrap.enabled = false
注意:
使用参数如下:
保留消息的all changes(I / -U / U / D),Hudi MOR类型的表将all changes append到file log中,但是compaction会对all changes进行merge。如果想消费all changes,需要调整compaction参数:compaction.delta_commits
和 compaction.delta_seconds
Snapshot读取,永远读取merge后的结果数据
使用参数如下:
场景:使用Flink消费历史数据 + 实时增量数据,然后写入到Hudi。会造成写入吞吐量巨大 + 写入分区乱序严重,影响集群和application的稳定性。所以需要限制速率
使用参数如下:
默认是Batch query,查询最新的Snapshot
Streaming Query需要设置read.streaming.enabled = true
。再设置read.start-commit
,如果想消费所以数据,设置值为earliest
使用参数如下:
注意:如果开启read.streaming.skip_compaction
,但stream reader的速度比clean.retain_commits
慢,可能会造成数据丢失
有3种使用场景
read.start-commit
read.start-commit
和read.end-commit
,start commit和end commit都包含read.end-commit
为大于当前的一个instant time,read.start-commit
默认为latest使用参数如下: