最近在了解 Presto 和 Trino 对于 Deltalake Connector 的相关实现原理,这里了解完刚好用一篇文章总结下,一是可以帮助自己未来的回顾,二是也希望能够帮助大家,下面都是个人理解,若理解有误,欢迎指出,共勉。
Presto 和 Trino 当前支持通过 Hive Metastore 相关接口 + 自己解析 Deltalake 事务日志(事务 Json、Checkpoint Parquet)的方式,来获取 Deltalake 的元数据(Table Column、统计信息等)。
整体上 Hive Metastore 能够提供到一张 Deltalake 表在底层文件系统的 Location 信息,结合 Location 的信息,具体事务日志的解析逻辑,Presto 和 Trino 的实现有所不同:
Presto 和 Trino Deltalake 都支持三种兼容 HiveMetastore 接口实现的 MetaStore 类型:
1. AWS Glue -- 元数据信息在 AWS Glue,比如 Table、Schame 等等,列的信息需要从事务日志中读取
2. FileSystem Hive Metastore -- 元数据信息会从文件系统上面进行获取,比如查看某个 schema 下面有哪些表,那么直接读取底层这个 schema 所在的文件系统路径,来进行查看。
3. Thrift HiveMetastore -- 通过 Thrift 接口从 Hive MetaStore 中获取元数据信息
FileSystem Hive Metastore 好处在于减少了 Hive Metastore 部署依赖,缺点就是获取元数据信息的时候,需要从文件系统上面进行读取,访问文件系统的 API 操作,有一定时间,对于查询的 RT,会有影响。
Presto Connector 元数据的实现主要在DeltaMetadata,DeltaMetadata 中主要两个变量:
一个.prestoSchema 文件内容示例如下:
Trino Deltalake 的所有的元数据操作实现类为:DeltaLakeMetadata,在该类中,有一个 DeltaLakeMetastore 类型变量,该接口主要定义了与 Deltalake 元数据 Catalog 操作的接口,它有一个实现类:HiveMetastoreBackedDeltaLakeMetastore。
HiveMetastoreBackedDeltaLakeMetastore 中有一个 HiveMetastore 变量,HiveMetastore 定义 Hive Metastore 的元数据操作接口,它底层有多种不同的实现类:
以获取一张 Deltalake 表的列信息为例,从 Hive Metastore 中只能拿到 Deltalake 表的 Location 信息,接下来 Trino 会结合 Location 信息,自己解析表的事务日志信息。首先根据 Location 信息,会获取到其对应的 TableSnapshot,具体方法可以看:DeltaLakeMetadata#getSnapshot,接下来会调用 TransactionLogAccess#getTransactionLogEntries 方法,拿到所有事务日志的具体信息,最终结合 MetadataEntry 中的信息,获取列的具体信息。
主要是为了用户对于表的不同版本操作信息进行查看和追踪,方便表的管理和维护。
Trino 支持会为每张 Deltalake 表暴露两张元数据表:
"xxx_table$history" 表主要可以查看一张 Deltalake 在不同版本所对应的具体操作、操作时间、隔离级别等,主要是为了用户对于表的不同版本操作信息进行追踪,方便表的管理和维护,具体 SQL 如下:
SELECT * FROM "test_table$history"
查询出来的列的描述:
主要是对一个版本的底层事务 json 文件的 commit 信息,一个 commit 信息 Json 内容示例:
{
"commitInfo":{
"timestamp":1613741139539,
"userId":"6259558072923448",
"userName":"lukasz.osipiuk@starburstdata.com",
"operation":"CREATE TABLE",
"operationParameters":{
"isManaged":"false",
"description":null,
"partitionBy":"[\"number_partition\",\"string_partition\"]",
"properties":"{}"
},
"notebook":{
"notebookId":"3433682753164350"
},
"clusterId":"1103-105745-dial507",
"isolationLevel":"SnapshotIsolation",
"isBlindAppend":true,
"operationMetrics":{
}
}
}
Deltalake 支持的操作 Operation,参考:https://github.com/delta-io/delta/blob/master/connectors/standalone/src/main/java/io/delta/standalone/Operation.java
每个 Operation 对应的参数:https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
"xxx_table$properties" 表则是查看某张 Deltalake 表的配置、需要支持的 Feature、表的属性信息等,具体 SQL 示例如下:
SELECT * FROM "test_table$properties"
结果示例:
"xxx_table$properties" 会展示以下面信息:
Features 具体介绍可以参考:https://github.com/delta-io/delta/blob/master/PROTOCOL.md#supported-features
其他 Trino 支持查询 Deltalake 表的记录的元数据信息:
可以支持指定表的某个快照版本进行 Timetravel:
SELECT * FROM sales.apac."sales_data@v4" LIMIT 200;
支持指定表的某个时间截对应的快照版本进行数据查询:
SELECT * FROM sales.apac."sales_data@t2021-11-18 09:45" LIMIT 200;
Tirno 支持 Deltalake 表的元数据更新,主要用来更新表的 Column 的 NDV 和 total size,这两个信息属于 Trino 自定义扩展出来的,这两个信息存储在 Deltalake 表下面 _trino_meta 目录,使用 extended_stats.json 来进行存储:
Trino 支持两种 Deltalake 表统计信息更新:增量和全量,默认是增量:
ANALYZE table_schema.table_name WITH(mode = ‘full_refresh’);
ANALYZE table_schema.table_name;
也支持基于某个时间截之后的文件来更新元数据信息(比如之前已经更新过),以及可以指定更新具体列的元数据信息:
ANALYZE example_table WITH(files_modified_after
= TIMESTAMP '2021-08-2316:43:01.321 Z');
ANALYZE example_table WITH(columns = ARRAY['nationkey', 'regionkey'])
由于上面的元数据更新只对 Column 的 NDV 和 total Size 进行更新,对于 Trino 表的 RowCNT,以及表的 Column 的 min、max、null cnt,Trino 是基于 Deltalake 事务日志的 AddFile 的统计信息来进行计算。
Deltalake 在 1.2 版本之后,支持 AddFile 中生成 File 以及的列的统计信息,MR 参考:https://github.com/delta-io/delta/pull/924 ,一个有统计信息的 AddFile json 如下:
{
"add":{
"path":"part-00001-0d4d7e36-7318-461a-8895-69a2e8e7df76-c000.snappy.parquet",
"partitionValues":{
},
"size":1056,
"modificationTime":1614676970000,
"dataChange":true,
"stats":"{\"numRecords\":1,\"minValues\":{\"lower_case_string\":\"databricks\",\"UPPER_CASE_STRING\":\"DATABRICKS\"},\"maxValues\":{\"lower_case_string\":\"databricks\",\"UPPER_CASE_STRING\":\"DATABRICKS\"},\"nullCount\":{\"lower_case_string\":0,\"UPPER_CASE_STRING\":0,\"MiXeD_CaSe_StRiNg\":1}}"
}
}
整体 Trino 获取一张表的元数据流程如下:
不支持
Trino 支持 Deltalake 表的 Read 和 Write,Presto 支持 Deltalake 表的 Read。
Trino 没有看到 Deltalake 的 Time Travel 能力,Presto 本身基于 Deltalake standalone lib 包,能够直接使用其 API 读取到某个 Snapshot 的具体快照信息,所以 Presto Deltalake 支持基于版本号和 Timestamp 的时间旅行能力,可以查看某一个版本的数据。示例 SQL 如下:
SELECT * FROM sales.apac."sales_data@v4" LIMIT 200;
SELECT * FROM sales.apac."sales_data@t2021-11-18 09:45" LIMIT 200;