在上一篇文章中,我们讨论了 Hudi 查询类型及其与 Spark 的集成。在这篇文章中,我们将深入研究另一个方面——写入流程,以 Spark 作为示例引擎。在写入数据时可以调整多种配置和设置。因此这篇文章的目的并不是作为完整的使用指南。相反主要目标是呈现内部数据流并分解所涉及的步骤。这将使读者更深入地了解运行和微调 Hudi 应用程序。各种实际使用示例请查阅Hudi的官方文档页面。
下图说明了执行引擎上下文中 Hudi 写入操作所涉及的典型高级步骤。我将简要介绍本节中的每个步骤。
Hudi写客户端作为写操作的入口点,Hudi写支持是通过创建引擎兼容的写客户端实例来实现的。例如,Spark 使用 SparkRDDWriteClient ,Flink 使用 HoodieFlinkWriteClient ,Kafka Connect 生成 HoodieJavaWriteClient 。通常此步骤涉及将用户提供的配置与现有 Hudi 表属性进行协调,然后将最终配置集传递给客户端。
在写入客户端处理输入数据之前,会发生多个转换,包括 HoodieRecord 的构造和架构协调。让我们更深入地研究 HoodieRecord ,因为它是写入路径中的基本模型。
Hudi使用 HoodieKey 模型来标识唯一记录,该模型由“recordKey”和“partitionPath”组成。这些值是通过实现 KeyGenerator API 来填充的。该 API 可以灵活地根据输入模式提取自定义字段并将其转换为键。“currentLocation”和“newLocation”均由 Hudi 时间线的操作时间戳和文件组的 ID 组成。回顾第 1 篇文章中的逻辑 FileGroup 和 FileSlice 概念,时间戳指向特定 FileGroup 内的 FileSlice。“位置”属性用于使用逻辑信息来定位物理文件。如果“currentLocation”不为空,则表示表中存在具有相同键的记录,而“newLocation”则指定应将传入记录写入何处。“数据”字段是一个通用类型,包含记录的实际字节,也称为有效负载。通常,此属性实现 HoodieRecordPayload ,它指导引擎如何将旧记录与新记录合并。从 0.13.0 版本开始,引入了新的实验接口 HoodieRecordMerger 来替代 HoodieRecordPayload 并作为统一的合并 API。
在此步骤中,写入客户端始终检查表的时间轴上是否还存在任何失败的操作,并通过在时间轴上创建“请求的”提交操作来启动写入操作之前相应地执行回滚。
所提供的 HoodieRecord 可以根据用户配置和操作类型选择性地进行重复数据删除和索引。如果需要重复数据删除,具有相同键的记录将被合并为一条。如果需要索引,如果记录存在,则将填充“currentLocation”。
这是一个重要的预写入步骤,它确定哪个记录进入哪个文件组,并最终进入哪个物理文件。传入的记录将被分配到更新桶和插入桶,这意味着后续文件写入的策略不同。每个桶代表一个 RDD 分区,用于分布式处理,就像 Spark 的情况一样。
这是实际 I/O 操作发生的时间。使用文件写入句柄创建或附加物理数据文件。在此之前,还可以在 .hoodie/.temp/ 目录中创建标记文件,以指示将对相应数据文件执行的写入操作类型。这对于高效回滚和冲突解决方案非常有价值。
数据写入磁盘后,可能需要立即更新索引数据以保证读写的正确性。这特别适用于写入期间不同步更新的索引类型,例如托管在 HBase 服务器中的 HBase 索引。
在最后一步中,写入客户端将承担多个任务以正确完成事务写入。例如,它可以运行预提交验证(如果已配置)、检查与并发编写器的冲突、将提交元数据保存到时间线、使 WriteStatus 与标记文件协调一致,等等。
更新插入数据是 Lakehouse 管道中的常见场景。在本节中我们将详细研究 CoW 表的 Upsert 流程,然后简要概述所有其他支持的写入操作。
更新插入到 MoR 表遵循非常相似的流程,使用一组不同的条件来确定用于更新和插入的文件写入句柄的类型。
插入流程与更新插入非常相似,主要区别在于缺少索引步骤。这意味着整个写入过程会更快(如果关闭重复数据删除会更快),但可能会导致表中出现重复。批量插入遵循与插入相同的语义,这意味着它也可能由于缺乏索引而导致重复。然而,区别在于批量插入缺乏小文件处理。记录分区策略通过设置 BulkInsertSortMode 确定,也可以通过实现 BulkInsertPartitioner 自定义。Bulk Insert 还默认为 Spark 启用行写入模式,绕过“转换输入”步骤中的 Avro 数据模型转换,并直接使用引擎原生数据模型 Row 。此模式提供更高效的写入。总体而言,批量插入通常比插入性能更高,但可能需要额外的配置调整来解决小文件问题。
删除流程可以视为更新插入流程的特例。主要区别在于,在“转换输入”步骤中,输入记录被转换为 HoodieKey 并传递到后续阶段,因为这些是识别要删除的记录所需的最少数据。需要注意的是,此过程会导致硬删除,这意味着目标记录将不会存在于相应文件组的新文件切片中。
与上面介绍的流程相比,删除分区遵循完全不同的流程。它采用物理分区路径列表,而不是输入记录,该列表是通过 hoodie.datasource.write.partitions.to.delete 配置的。由于没有输入记录,因此索引、分区和写入存储等过程不适用。删除分区将目标分区路径的所有文件组 ID 保存在时间轴上的 .replacecommit 操作中,确保后续写入者和读取者将它们视为已删除。
插入覆盖用提供的记录完全重写分区。此流程可以有效地视为删除分区和批量插入的组合:它从输入记录中提取受影响的分区路径,将这些分区中的所有现有文件组标记为已删除,并同时创建新的文件组来存储传入记录。
插入覆盖表是插入覆盖的变体。它不是从输入记录中提取受影响的分区路径,而是获取表的所有分区路径以进行覆盖。
在这篇文章中,我们探索了 Hudi 写入路径中的常见高级步骤,深入研究了 CoW Upsert 流程并详细解释了记录分区逻辑,并介绍了所有其他写入操作。