与流式处理工作负载相关的主要挑战之一是传入事件的无序性质。在典型的流式处理方案中,由于网络延迟、处理延迟或其他因素,事件可能会不按顺序到达。随着从各种来源(尤其是在移动应用程序和 IoT 平台中)摄取的数据量和速度不断增加,数据处理框架必须能够处理变化(即记录的更改)和乱序事件。传统的数据存储系统和文件格式(例如针对批处理优化的系统和文件格式)通常难以有效地管理这些场景。Hudi 通过专门为应对此类挑战而设计的功能介入。当事件或记录更改在不同时间到达时,它们的顺序可能与最初生成的顺序不同。
例如,在智能城市交通监控系统中,传感器可以实时报告各个十字路口的车速。但是,由于网络问题或延迟,某些传感器数据可能会比其他传感器数据晚到达,并且可能会按顺序出错。为了处理这个问题,系统需要有效地将新的传入数据与现有记录合并。就像 Hudi 的合并模式如何控制存储系统中具有相同键的记录的合并,确保一致性和准确性一样,它确保最终流量数据反映正确的事件时间,即使某些数据延迟到达。这些合并模式有助于在高负载下保持一致的确定性结果,确保延迟数据更新正确的记录,而不会导致不一致。这可能会导致以下几个问题:
当事件被无序处理时,可能会导致数据状态不正确或不一致。例如,如果表示交易的事件在指示账户余额的事件之前处理,则生成的数据可能无法准确反映系统的真实状态。
处理无序事件通常需要额外的逻辑来确保以正确的顺序处理数据。这可能会使数据管道复杂化并增加出错的可能性。
借助 1.0.0 版中引入的新 API,Hudi 支持三种主要的合并模式,每种模式都适用于数据处理的不同阶段:写入、压缩和查询。
当现有记录的新数据到达时,Hudi 会对输入数据集执行重复数据删除。此过程涉及在写入阶段之前合并同一记录键的多个更改记录。这是一项优化,还有助于减少写入日志文件的记录数(在 MOR 的情况下)。通过预先合并更改,Hudi 减少了不必要的记录,从而提高了查询和写入的效率。此步骤对于实时处理流数据至关重要,其中更改可能会迅速到达,并确保仅将记录的最终版本写入系统。通常这些乱序事件通常在同一批次中聚集在一起,使用像 spark 这样的处理引擎来处理微批次,合并输入更改有助于减少需要写入的记录数量。
在写入时复制 (CoW) 表中,通过为记录创建新的文件版本来应用更改。当发生更新、部分更新或删除作时,Hudi 会将此最终更改与存储中的现有记录合并。合并模式控制这些更新的应用方式,确保仅反映最近的更改并且表的数据保持一致。这在 CoW 表中尤其重要,因为它们通过写入新版本的记录而不是覆盖现有数据来保持历史数据的不变性。合并模式可确保记录的新版本与之前的所有更改一致。
Hudi 使用日志文件(增量日志)和基本文件(原始数据)的概念。随着记录的更改随着时间的推移而累积,Hudi 的压缩服务会将存储在日志文件中的更改记录与基本文件合并,以保持数据的一致性和查询优化。合并模式定义在压缩过程中如何将这些日志记录与基本文件合并。压缩有助于保持存储效率,并通过减少可能需要读取的小日志文件的数量来确保查询运行得更快。
在 Merge-on-Read (MOR) 表中,数据同时存储在日志文件和基本文件中。执行查询时,Hudi 会根据合并模式将日志文件中的更改记录与基本文件合并。merge作在查询时进行,以提供最终的一致数据视图。通过在查询时合并记录,Hudi 可确保查询反映最新更改,同时保持查询性能。
在常见情况下,输入数据包含一个可用于标识最新记录的字段。通常,表具有 updated_at 或其他排序列等字段。如果 input 中没有这样的列,我们只能依赖传入的 order。
在 Hudi 1.0.0 发布后,引入了新配置 hoodie.record.merge.mode 来定义负责处理记录更新的合并模式。这些合并模式规定了在管道的不同阶段(从数据摄取到查询结果)如何处理具有相同键的记录。它可以具有以下三个值:
当输入数据中没有可用的字段时,使用此合并模式来显式确定哪条记录是最新的。系统将依赖摄取顺序(提交时间)来确定记录的顺序。Hudi 希望记录严格按照其提交的顺序到达。因此,假定最近的记录(就摄取时间而言)是记录的最新版本。当没有可以指示记录顺序的专用列(如 updated_at、timestamp 或 versioning 字段)时,通常使用此模式。此处的合并逻辑只是根据摄取顺序(提交时间)选择最新的写入。在某种程度上,它相当于覆盖仅考虑最新记录的语义。
SET hoodie.spark.sql.insert.into.operation=upsert;
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI TBLPROPERTIES (primaryKey ='uuid', hoodie.record.merge.mode='COMMIT_TIME_ORDERING');
INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 ,'san_francisco');
select*from hudi_table;
-- Result - 20250106162911278 20250106162911278_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-43-47_20250106162911278.parquet 1695091554788 334e26e9-8355-45cc-97c6-c31daf0df330 rider-C driver-M 27.7 san_francisco
INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',19.10,'san_francisco');
select*from hudi_table;
-- Result - 20250106163449812 20250106163449812_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 08218473-f72a-480d-90e6-c6764f062e5c-0_0-71-68_20250106163449812.parquet 1 334e26e9-8355-45cc-97c6-c31daf0df330 rider-D driver-K 19.1 san_francisco
在上面的示例中,我们使用 COMMIT_TIME_ORDERING 合并模式创建了表。使用此模式时,无需指定 precombine 或 ordering 字段。在第一次插入期间,将提供两条具有相同记录键的记录。系统将对它们进行重复数据删除,并保留稍后处理的记录。在第二次插入中,将插入具有相同记录键的新记录。正如预期的那样,该表将使用新记录进行更新,因为无论任何字段中的值如何,该记录都会稍后提交。
当输入数据中确实有可用于确定事件顺序的字段(如时间戳字段,如 updated_at 或版本号)时,将使用此合并模式。如果记录包含可用于跟踪记录上次更新时间的字段(例如,updated_at、last_modified或序列号),Hudi 将使用此字段来确定哪条记录是最新的。在这种情况下,Hudi 不依赖于摄取顺序,而是使用 ordering 字段的值(例如 updated_at)来确定正确的记录。当拥有临时或事件驱动型数据,并且希望根据事件时间戳维护 “最新” 记录时,此方法非常理想。
DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI TBLPROPERTIES (primaryKey ='uuid',preCombineField ='ts', hoodie.record.merge.mode='EVENT_TIME_ORDERING');
INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 ,'san_francisco');
select*from hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-56-57_20250106165902806.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');
select*from hudi_table;
-- Result - 20250106165902806 20250106165902806_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 568ce7bc-9b71-4e15-b557-cbaeb5b4d2ea-0_0-84-78_20250106165918731.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
在上面的示例中,我们使用 EVENT_TIME_ORDERING 合并模式创建了表。当使用这种模式时,我们需要指定 precombineField。在本例中,我们将 ts 指定为 precombineField。在第一次插入期间,将提供两条具有相同记录键的记录。系统将对它们进行重复数据删除,并保留稍后处理的记录。在第二次插入中,将插入具有相同记录键的新记录。正如预期的那样,该表将使用新记录进行更新,因为无论任何字段中的值如何,该记录都会稍后提交。
对于更复杂的用例,有时之前讨论的合并模式不起作用。我们可能需要实现特定于用例的合并逻辑。此处提供了实施的详细信息 - https://hudi.apache.org/docs/record_merger/#custom
在 1.0.0 之前,Hudi 使用旧版 Record Payload API,请参阅 Record Payloads 部分以了解实现和一些现有的记录有效负载。
除了现有的有效负载外,Hudi 还通过实施 HoodieRecordPayload 接口提供了实现自定义记录有效负载的灵活性
以下示例演示了 Record Payload 的用法,它实现了与 EVENT_TIME_ORDERING 类似的结果。我们使用了与上面相同的示例来说明此功能的工作原理。
DROP TABLE hudi_table;
SET hoodie.spark.sql.insert.into.operation=upsert;
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI TBLPROPERTIES (primaryKey ='uuid',preCombineField ='ts', hoodie.datasource.write.payload.class='org.apache.hudi.common.model.DefaultHoodieRecordPayload');
INSERT INTO hudi_table
VALUES
(3,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(2,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-C','driver-M',27.70 ,'san_francisco');
select*from hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-14-17_20250203164444124.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
INSERT INTO hudi_table
VALUES
(1,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-D','driver-K',18.00,'san_francisco');
select*from hudi_table;
-- Result - 20250203164444124 20250203164444124_0_0 334e26e9-8355-45cc-97c6-c31daf0df330 4549ed8e-0346-4d59-8878-9e047fb6c651-0_0-53-51_20250203164537068.parquet 3 334e26e9-8355-45cc-97c6-c31daf0df330 rider-A driver-K 19.1 san_francisco
总之管理延迟到达和乱序的数据是现代数据处理系统中的一项关键挑战,尤其是在处理大规模实时数据管道时。Hudi 等工具提供强大的合并模式,通过在管道的不同阶段智能地处理记录更新来确保数据的一致性、准确性和效率。无论是在处理流数据、IoT 传感器还是社交媒体帖子,了解如何配置和使用这些合并模式都可以大大提高数据存储和查询过程的性能和可靠性。通过利用正确的合并策略,可以确保系统即使在高负载和延迟数据的情况下也能保持稳健,最终从数据中做出更好的决策和洞察。