Apache Hudi 0.13.0引入了一系列新特性,包括Metaserver, Change Data Capture, new Record Merge API, new sources for Deltastreamer等。虽然此版本不需要表版本升级,但希望用户在使用 0.13.0 版本之前按照下面的迁移指南采取相关重大更改和行为更改的操作。
此版本与 0.12.0 版本保持相同的表版本 (5),如果您从 0.12.0 升级,则无需升级表版本。 如下所述,存在一些重大更改和行为更改,希望用户在使用 0.13.0 版本之前采取相应的措施。
从现在开始,hudi-spark3.2-bundle 可与 Apache Spark 3.2.1 和 Spark 3.2.x 的更新版本一起使用。 由于 HiveClientImpl 的 getHive 方法的 Spark 实现更改在 Spark 版本 3.2.0 和 3.2.1 之间不兼容,因此放弃了对带有 hudi-spark3.2-bundle 的 Spark 3.2.0 的支持。
AWS 和 GCP bundle jar 与 hudi-utilities-bundle 是分开的。 用户在使用云服务时需要使用 hudi-aws-bundle 或 hudi-gcp-bundle 以及 hudi-utilities-bundle。
Hudi 现在通过新的 hudi-flink1.16-bundle 在 Flink 1.16.x 上得到支持。
Hudi 在 Spark 中的文件索引默认切换为惰性列出:这意味着它只会列出查询请求的分区(即,在分区修剪之后),而不是在此版本之前总是列出整个表。 这有望为大型表带来相当大的性能提升。
如果用户想要更改列表行为,则会添加一个新的配置属性:hoodie.datasource.read.file.index.listing.mode(现在默认为惰性)。 您可以设置两个可能的值:
要保留 0.13.0 之前的行为,用户需要设置 hoodie.datasource.read.file.index.listing.mode=eager。 重大更改:只有当表同时具有以下两种情况时才会发生重大更改:多个分区列和分区值包含未进行 URL 编码的斜杠。
例如,假设我们要从分区路径 2022/01/03 解析两个分区列 – 月 (2022/01) 和日 (03)。 由于分区列的数量(此处为 2 – 月和日)与分区路径中由 / 分隔的组件数量(在本例中为 3 – 月、年和日)不匹配,因此会导致歧义。 在这种情况下,不可能恢复每个分区列对应的分区值。
有两种方法可以避免重大更改:
如果您使用 Spark streaming 摄取到 Hudi,Hudi 会在内部自行管理检查点。 我们现在正在添加对多个编写器的支持,每个编写器都通过流式摄取摄取到同一个 Hudi 表中。 在旧版本的 hudi 中,您不能将多个流式摄取编写器摄取到同一个 hudi 表中(一个具有并发 Spark 数据源编写器的流式摄取编写器与锁提供程序一起工作;但是,不支持两个 Spark 流式摄取编写器)。 在 0.13.0 中,我们添加了对同一个表进行多个流式摄取的支持。 如果是单个流摄取,用户无需执行任何操作; 旧管道无需任何额外更改即可工作。 但是,如果您有多个流式写入器到同一个 Hudi 表,则每个表都必须为配置 hoodie.datasource.write.streaming.checkpoint.identifier 设置一个唯一的值。 此外,用户应该设置通常的多写入器配置。 更多详情可在这找到。
此版本中删除了对 Spark 2.x 的 ORC 支持,因为 Hudi 中对 orc-core:nohive 的依赖现在被 orc-core 取代,以与 Spark 3 兼容。ORC 支持现在可用于 Spark 3.x ,这在以前的版本中被破坏了。
设置record key字段的配置hoodie.datasource.write.recordkey.field现在需要设置,没有默认值。 以前,默认值为 uuid。
许多用户已请求将 Hudi 用于 CDC 用例,他们希望在新模式中删除现有列时能够实现模式自动演化。 从 0.13.0 版本开始,Hudi 现在具有此功能。 您可以允许模式自动演化,其中可以将现有列删除到新模式中。
由于根据源架构在目标表中删除列构成了相当大的行为更改,因此默认情况下禁用此功能并由以下配置保护:hoodie.datasource.write.schema.allow.auto.evolution.column.drop。 要启用自动删除列以及传入批次的新演变模式,请将其设置为 true。
此配置不需要通过使用例如 ALTER TABLE … Spark 中的 DROP COLUMN 手动演变模式。
此版本更改了 Hudi 决定写入操作的shuffle并行度的方式,包括 INSERT、BULK_INSERT、UPSERT 和 DELETE (hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这最终会影响写入性能。
之前,如果用户不配置,Hudi 会使用 200 作为默认的 shuffle 并行度。 从 0.13.0 开始,默认情况下,Hudi 通过使用由 Spark 确定的输出 RDD 分区数(如果可用)或使用 spark.default.parallelism 值自动推导shuffle并行度。 如果上述Hudi shuffle并行度是用户明确配置的,那么用户配置的并行度仍然用于定义实际的并行度。 对于具有合理输入大小的工作负载,此类行为更改可将开箱即用的性能提高 20%。
如果输入数据文件很小,例如小于 10MB,我们建议显式配置 Hudi shuffle 并行度(hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这样并行度至少为 total_input_data_size/500MB,以 避免潜在的性能下降(有关更多信息,请参阅调整指南)。
对于插入/更新插入操作的执行,Hudi 过去使用执行器的概念,依靠内存中的队列将摄取操作(以前通常由 I/O 操作获取shuffle blocks)与写入操作分离。 从那时起,Spark 架构有了很大的发展,使得这种编写架构变得多余。 为了发展这种编写模式并利用 Spark 中的变化,在 0.13.0 中,我们引入了一个新的简化版本的执行程序,(创造性地)命名为 SimpleExecutor 并将其设置为开箱即用的默认值。
SimpleExecutor 没有任何内部缓冲(即不在内存中保存记录),它在内部实现对提供的迭代器的简单迭代(类似于默认的 Spark 行为)。 它在现代 Spark 版本 (3.x) 上提供了约 10% 的开箱即用性能改进,与 Spark 的本机 SparkRecordMerger 一起使用时甚至更多。
此版本调整了 BULK_INSERT 写入操作的 NONE 排序模式(默认排序模式)的并行度。 从现在开始,默认情况下,使用输入并行性而不是shuffle并行性 (hoodie.bulkinsert.shuffle.parallelism) 来写入数据,以匹配默认的 parquet 写入行为。 这不会更改使用 NONE 排序模式的聚类行为。
BULK_INSERT 写入操作的这种行为更改提高了开箱即用的写入性能。
如果在默认的NONE排序方式下还是发现小文件问题,我们建议在写入Hudi表之前,先根据分区路径和记录键对输入数据进行排序。 您还可以使用 GLOBAL_SORT 来确保最佳文件大小。
在早期版本中,我们使用了一种快速失败的方法,如果任何目录同步失败,则不会尝试同步到剩余的目录。 在 0.13.0 中,在任何目录同步失败的操作失败之前尝试同步到所有配置的目录。 在一个目录同步失败的情况下,其他目录的同步仍然可以成功,所以用户现在只需要重试失败的目录即可。
由于错误配置可能导致数据完整性问题,在 0.13.0 中,我们努力使用户的元数据表配置更加简单。 在内部,Hudi 确定这些配置的最佳选择,以实现系统的最佳性能和稳定性。
以下与元数据表相关的配置是内部的; 您不能再显式配置这些配置:
hoodie.metadata.clean.async
hoodie.metadata.cleaner.commits.retained
hoodie.metadata.enable.full.scan.log.files
hoodie.metadata.populate.meta.fields
以前,由于配置错误,CTAS 写入操作被错误地设置为使用 UPSERT。 在 0.13.0 版本中,我们修复了这个问题,以确保 CTAS 使用 BULK_INSERT 操作来提高第一批写入 Hudi 表的性能(没有真正需要为此使用 UPSERT,因为正在创建表)。
在 0.13.0 之前,我们通过清理所有消息来引导 ckp 元数据(检查点相关元数据)。 一些极端情况没有得到正确处理。 例如:
重新启动作业时,写任务无法正确获取挂起的瞬间。
如果检查点成功并且作业突然崩溃,则瞬间没有时间提交。 数据丢失,因为最后一个挂起的瞬间被回滚; 然而,Flink 引擎仍然认为检查点/即时是成功的。
问:为什么我们要在 0.13.0 版本之前清理消息?
A:为了防止时间线和消息不一致。
问:为什么我们要保留 0.13.0 版本中的消息?
A:不一致有两种情况:
时间线即时完成但 ckp 消息正在传输(用于提交即时)。
时间线时刻处于待定状态,而 ckp 消息未启动(用于启动新时刻)。
对于case 1,不需要re-commit instant,如果write task在恢复的时候没有得到任何pending instant就可以了。
对于case 2,instant基本上是悬而未决的。 瞬间将被回滚(如预期的那样)。 因此,保持 ckp 消息原样实际上可以保持正确性。
在 0.13.0 中,我们引入了元数据集中管理服务 Metaserver。 这是我们在未来引入的首批平台服务组件之一。 Metaserver 帮助用户轻松管理数据湖平台中的大量表。
注意,这是实验性的特性
要在您的环境中设置元服务器,请使用 hudi-metaserver-server-bundle 并将其作为 java 服务器应用程序运行,例如 java -jar hudi-metaserver-server-bundle-.jar。 在客户端,添加以下选项以与元服务器集成:
hoodie.metaserver.enabled=true
hoodie.metaserver.uris=thrift://<server url>:9090
Metaserver 存储 Hudi 表的元数据,如表名、数据库、所有者; 以及时间线的元数据,如提交瞬间、动作、状态等。此外,Metaserver 通过 Hudi Spark 包支持 Spark 写入器和读取器。
在 Hudi 表用作流源的情况下,我们希望了解属于单个提交的记录的所有更改。 例如,我们想知道哪些记录被插入、删除和更新。 对于更新的记录,后续管道可能希望获取更新前的旧值和更新后的新值。 0.13.0之前,增量查询不包含硬删除记录,用户需要使用软删除流删除,可能不符合GDPR要求。
Change-Data-Capture (CDC) 功能使 Hudi 能够通过生成更改来显示记录是如何更改的,从而处理 CDC 查询用例。
CDC 是一项实验性功能,支持用于带有 Spark 和 Flink 引擎的 COW 表。 CDC 查询尚不支持 MOR 表。
要使用 CDC,用户需要先在写入表时启用它以记录额外的数据,这些数据由 CDC 增量查询返回。
对于写入,设置 hoodie.table.cdc.enabled=true 并通过 hoodie.datasource.query.incremental.format 指定 CDC 日志记录模式,以控制记录的数据。 有3种模式可供选择:
默认值为data_before_after
对于读,设置:
hoodie.datasource.query.type=incremental
hoodie.datasource.query.incremental.format=cdc
和其他通常的增量查询选项,如开始和结束即时时间,并返回 CDC 结果。
请注意,hoodie.table.cdc.enabled 是表配置。 一旦启用,就不允许为该表关闭它。 同样,您不能更改 hoodie.table.cdc.supplemental.logging.mode,一旦它被保存为表配置。
此版本引入了期待已久的支持,可将记录作为其引擎原生表示进行处理,从而避免将它们转换为中间形式 (Avro) 的需要。
此功能处于实验模式,目前仅支持 Spark。
通过引入新的 HoodieRecordMerger 抽象,RFC-46 使这成为可能。 HoodieRecordMerger 是未来在 Hudi 中实现任何合并语义的核心和真实来源。 在这种能力下,它取代了以前用于实现自定义合并语义的 HoodieRecordPayload 层次结构。 通过依赖 HoodieRecordMerger 形式的统一组件,我们可以在写入操作的整个生命周期内以统一的方式处理记录。 这大大减少了延迟,因为记录现在保存在引擎本机表示中,避免了不必要的复制、反序列化和转换为中间表示 (Avro)。 在我们的基准测试中,与 0.13.0 默认状态相比,upsert 性能提高了 10%,与 0.12.2 相比提高了 20%。
今天要尝试,您需要为每个 Hudi 表指定不同的配置:
对于 COW,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger
对于 MOR,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger 和 hoodie.logfile.data.block.format=parquet
请注意,当前的 HoodieSparkRecordMerger 实现仅支持与 OverwriteWithLatestAvroPayload 类等效的合并语义,这是当前用于合并记录的默认 HoodieRecordPayload 实现(设置为“hoodie.compaction.payload.class”)。 因此,如果您正在使用任何其他 HoodieRecordPayload 实现,不幸的是,您需要等到它被相应的 HoodieRecordMerger 实现替换。
Deltastreamer 是一个完全托管的增量 ETL 实用程序,支持各种来源。 在此版本中,我们在其曲目中添加了三个新来源。
Deltastreamer 已经支持使用 JSON 和 Avro 格式从 Kafka 中一次性摄取新事件。 ProtoKafkaSource 也将此支持扩展到基于 Protobuf 类的模式。 只需一个额外的配置,就可以轻松设置此源。 查看文档以获取更多详细信息。
沿着 S3 事件源的路线,我们现在有一种可靠且快速的方法来通过 GcsEventsHoodieIncrSource 从 Google Cloud Storage (GCS) 中的对象中摄取。 查看有关如何设置此源的文档。
Apache Pulsar 是一个为云构建的开源分布式消息传递和流媒体平台。 PulsarSource 支持通过 Deltastreamer 从 Apache Pulsar 摄取。 查看有关如何设置此源的文档。
部分更新是社区中的一个常见用例,它需要能够仅更新某些字段而不是替换整个记录。 以前,我们建议用户通过引入他们自己的自定义记录负载实现来满足此用例。 随着它的流行,在 0.13.0 版本中,我们添加了一个新的记录有效负载实现 PartialUpdateAvroPayload,以支持这种开箱即用的功能,因此用户可以使用该实现而不必编写自己的自定义实现。
我们引入了 Consistent Hashing Index 作为您使用 Hudi 写入的另一种索引选项。 这是对 0.11.0 版本中添加的 Bucket Index 的增强。 使用Bucket索引,每个分区的Bucket/文件组是静态分配的,而使用一致性哈希索引,Bucket可以动态增长,因此用户无需担心数据倾斜。 Bucket将根据每个分区的负载因子扩展和收缩。 您可以找到此功能设计的 RFC。
如果您想尝试一下,这里是您感兴趣的配置。
hoodie.index.type=bucket
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.bucket.index.max.num.buckets=128
hoodie.bucket.index.min.num.buckets=32
hoodie.bucket.index.num.buckets=4
## do split if the bucket size reach 1.5 * max_file_size
hoodie.bucket.index.split.threshold=1.5
## do merge if the bucket size smaller than 0.2 * max_file_size
hoodie.bucket.index.merge.threshold=0.1
要强制缩小或扩大存储bucket,您需要使用以下配置启用clustering
## check resize for every 4 commit
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=4
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy
## for supporting concurrent write & resizing
hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy
Consistent Hashing Index 仍然是一个不断发展的特性,目前从 0.13.0 开始使用它有一些限制:
我们正在努力实现这些自动化,并使用户更容易利用 Consistent Hashing Index。 您可以在此处关注 Consistent Hashing Index 正在进行的工作。
Hudi提供乐观并发控制(OCC),允许多个写入者在没有重叠数据文件写入的情况下,并发写入并原子提交到Hudi表,保证数据的一致性、完整性和正确性。 在0.13.0版本之前,这种重叠数据文件的冲突检测是在提交元数据之前和数据写入完成之后进行的。 如果在最后阶段检测到任何冲突,则可能会浪费计算资源,因为数据写入已经完成。
为了提高并发控制,0.13.0版本引入了OCC早期冲突检测的新特性,利用Hudi的标记机制,在数据写入阶段检测到冲突,一旦检测到冲突就提前中止写入。 Hudi 现在可以更早地停止冲突写入器,因为它可以及早检测冲突并释放集群所需的计算资源,从而提高资源利用率。
OCC 中的早期冲突检测在 0.13.0 版本中是实验性的。
默认情况下,此功能处于关闭状态。 要尝试这一点,用户需要在使用 OCC 进行并发控制时将 hoodie.write.concurrency.early.conflict.detection.enable 设置为 true(有关更多详细信息,请参阅并发控制页面)。
在以前的版本中,Hudi 使用生产者-消费者模型通过有界内存队列将传入数据写入表中。 在此版本中,我们添加了一种新型队列,利用 Disruptor,它是无锁的。 当数据量很大时,这会增加写入吞吐量。 将 1 亿条记录写入云存储上的 Hudi 表中的 1000 个分区的基准显示,与现有的有界内存队列执行器类型相比,性能提高了 20%。
DisruptorExecutor 作为实验特性支持 Spark 插入和 Spark 批量插入操作
用户可以设置 hoodie.write.executor.type=DISRUPTOR_EXECUTOR 来启用该功能。 还有其他配置,如 hoodie.write.wait.strategy 和 hoodie.write.buffer.size 可以进一步调整性能。
我们为 Spark 3.x 引入了一个新的 Hudi CLI Bundle,hudi-cli-bundle_2.12,使 Hudi CLI 更简单易用。 用户现在可以使用这个单一的 bundle jar(发布到 Maven 存储库)和 Hudi Spark bundle 来启动脚本来启动带有 Spark 的 Hudi-CLI shell。 这为 Hudi-CLI 带来了轻松部署,因为用户不需要在本地编译 Hudi CLI 模块、上传 jar 和解决任何依赖冲突(如果有),而在此版本之前就是这种情况。 可以在 Hudi CLI 页面上找到详细说明。
Flink 1.16.x 集成Hudi,在编译源码时使用profile参数-Pflink1.16激活版本。 或者,使用 hudi-flink1.16-bundle。 Flink 1.15、Flink 1.14 和 Flink 1.13 将继续支持。 请查看迁移指南以获取捆绑包更新。
对于配置模式注册表的 DeltaStreamer 用户,添加了一个 JSON 模式转换器,以帮助将 JSON 模式转换为目标 Hudi 表的 AVRO。 将 hoodie.deltastreamer.schemaprovider.registry.schemaconverter 设置为 org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter 以使用此功能。 用户还可以实现此接口 org.apache.hudi.utilities.schema.SchemaRegistryProvider.SchemaConverter 以提供从原始模式到 AVRO 的自定义转换。
用户现在可以通过 Spark SQL conf 提供 Hudi 配置,例如,设置
spark.sql("set hoodie.sql.bulk.insert.enable = true")
确保 Hudi 在执行 INSERT INTO 语句时能够使用 BULK_INSERT 操作。
0 0 投票数
文章评分
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。