Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >[离线计算-Spark|Hive] 数据近实时同步数仓方案设计

[离线计算-Spark|Hive] 数据近实时同步数仓方案设计

作者头像
awwewwbbb
发布于 2022-04-27 01:07:27
发布于 2022-04-27 01:07:27
97200
代码可运行
举报
运行总次数:0
代码可运行

背景

最近阅读了大量关于hudi相关文章, 下面结合对Hudi的调研, 设计一套技术方案用于支持 MySQL数据CDC同步至数仓中,避免繁琐的ETL流程,借助Hudi的upsert, delete 能力,来缩短数据的交付时间.

组件版本:

  • Hadoop 2.6.0
  • Hive 1.1.0
  • hudi 0.7.0
  • spark 2.4.6

架构设计

  1. 使用canal(阿里巴巴MySQL Binlog增量订阅&消费组件)dump mysql binlog 数据
  2. 采集后将binlog 数据采集到kafka中, 按照库名创建topic, 并按照表名将数据写入topic 固定分区
  3. spark 消费数据将数据生成DF
  4. 将DF数据写入hudi表
  5. 同步hudi元数据到hive中

写入主要分成两部分全量数据和增量数据:

  • 历史数据通过bulkinsert 方式 同步写入hudi
  • 增量数据直接消费写入使用hudi的upsert能力,完成数据合并

写入hudi在hdfs的格式如下:

hudi

hudi 如何处理binlog upsert,delete 事件进行数据的合并?

upsert好理解, 依赖本身的能力.

针对mysql binlog的delete 事件,使用记录级别删除:

  1. 需要在数据中添加 '_HOODIE_IS_DELETED' 且值为true的列
  2. 需要在dataFrame中添加此列,如果此值为false或者不存在则当作常规写入记录

如果此值为true则为删除记录

示例代码如下:

StructField(_HOODIE_IS_DELETED, DataTypes.BooleanType, true, Metadata.empty());

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
dataFrame.write.format("org.apache.hudi")
               .option("hoodie.table.name", "test123")
               .option("hoodie.datasource.write.operation", "upsert")
               .option("hoodie.datasource.write.recordkey.field", "uuid")
               .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
               .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE")
               .option("hoodie.datasource.write.precombine.field", "ts")
               .mode(Append)
               .save(basePath)

写入hudi及同步数据至hive,需要注意的事情和如何处理?

  1. 声明为hudi表的path路径, 非分区表 使用tablename/, 分区表根据分区路径层次定义/个数
  2. 在创建表时需添加 TBLPROPERTIES 'spark.sql.sources.provider'='hudi' 声明为datasource为hudi类型的表

hudi如何处理新增字段?

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的

hudi 写入时指定mergeSchema参数 为true

spark如何实现hudi表数据的写入和读取?

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi/ hudi来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource,其实现了 CreatableRelationProvider#createRelation接口,并实现了读写逻辑

kyuubi

如何读取hudi表数据?

使用网易开源的kyuubi

kyuubi架构图:

支持HiveServer2 Thrift API协议,可以通过beeline 连接

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
hive: beeline -u jdbc:hive2://ip:10000 -n userName -p 

kyuubi: beeline -u jdbc:hive2://ip:8333 -n userName -p 

hudi 元数据使用hive metastore

spark来识别加载hudi表

实现hudi表与hive表关联查询

kyuubi 支持SparkContext的动态缓存,让用户不需要每次查询都动态创建SparkContext。作为一个应用在yarn 上一直运行,终止beeline 连接后,应用仍在运行,下次登录,使用SQL可以直接查询

总结

本文主要针对hudi进行调研, 设计MySQL CDC 近实时同步至数仓中方案, 写入主要利用hudi的upsert以及delete能力. 针对hudi 表的查询,引入kyuubi 框架,除 了增强平台 spark sql作为即席查询服务的能力外,同时支持查询hudi表,并可以实现hudi表与hive表的联合查询, 同时对原有hive相关服务没有太大影响.

参考

  1. https://blog.csdn.net/weixin_38166318/article/details/111825032
  2. https://blog.csdn.net/qq_37933018/article/details/120864648
  3. https://cxymm.net/article/qq_37933018/120864648
  4. https://www.jianshu.com/p/a271524adcc3
  5. https://jishuin.proginn.com/p/763bfbd65b70
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark读取变更Hudi数据集Schema实现分析
Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load便可加载Hudi数据集,本篇文章分析具体的实现。
ApacheHudi
2021/04/13
2.8K0
「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。
王知无-import_bigdata
2022/03/11
2.6K0
「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
实战 | 将Kafka流式数据摄取至Hudi
使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启--enable-hive-sync 即可同步数据到hive表。
ApacheHudi
2021/04/13
2.3K0
数据湖(五):Hudi与Hive集成
Hudi与Hive集成原理是通过代码方式将数据写入到HDFS目录中,那么同时映射Hive表,让Hive表映射的数据对应到此路径上,这时Hudi需要通过JDBC方式连接Hive进行元数据操作,这时需要配置HiveServer2。
Lansonli
2022/05/31
2.7K0
数据湖(五):Hudi与Hive集成
Apache Hudi入门指南(含代码示例)
hudi详细介绍见hudi官网 http://hudi.apache.org/cn/docs/0.5.0-quick-start-guide.html
ApacheHudi
2021/04/13
3.4K0
数据湖(四):Hudi与Spark整合
默认Spark操作Hudi使用表类型为Copy On Write模式。Hudi与Spark整合时有很多参数配置,可以参照https://hudi.apache.org/docs/configurations.html配置项来查询,此外,整合时有几个需要注意的点,如下:
Lansonli
2022/05/30
3.3K2
数据湖(四):Hudi与Spark整合
Streaming与Hudi、Hive湖仓一体!
也就是,可以将HDFS和Hudi结合起来,提供对流处理的支持能力。例如:支持记录级别的更新、删除,以及获取基于HDFS之上的Change Streams。哪些数据发生了变更。
ApacheHudi
2021/07/05
3.4K0
Streaming与Hudi、Hive湖仓一体!
ApacheHudi使用问题汇总(二)
Hudi Cleaner(清理程序)通常在 commit和 deltacommit之后立即运行,删除不再需要的旧文件。如果在使用增量拉取功能,请确保配置了清理项来保留足够数量的commit(提交),以便可以回退,另一个考虑因素是为长时间运行的作业提供足够的时间来完成运行。否则,Cleaner可能会删除该作业正在读取或可能被其读取的文件,并使该作业失败。通常,默认配置为10会允许每30分钟运行一次提取,以保留长达5(10 * 0.5)个小时的数据。如果以繁进行摄取,或者为查询提供更多运行时间,可增加 hoodie.cleaner.commits.retained配置项的值。
ApacheHudi
2021/04/13
1.8K0
「Hudi系列」Hudi查询&写入&常见问题汇总
2. 「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
王知无-import_bigdata
2022/06/05
6.9K0
「Hudi系列」Hudi查询&写入&常见问题汇总
hudi中的写操作
在本节中,我们将介绍如何使用DeltaStreamer工具从外部数据源甚至其他Hudi表中获取新的更改,以及如何使用Hudi数据源通过upserts加速大型Spark作业。 然后可以使用各种查询引擎查询这些表。
从大数据到人工智能
2022/01/19
1.7K0
hudi中的写操作
真香!PySpark整合Apache Hudi实战
Hudi支持Spark-2.x版本,你可以点击如下链接安装Spark,并使用pyspark启动
ApacheHudi
2021/04/13
1.8K0
基于Apache Hudi + MinIO 构建流式数据湖
Apache Hudi 是一个流式数据湖平台,将核心仓库和数据库功能直接引入数据湖。Hudi 不满足于将自己称为 Delta 或 Apache Iceberg 之类的开放文件格式,它提供表、事务、更新/删除、高级索引、流式摄取服务、数据聚簇/压缩优化和并发性。Hudi 于 2016 年推出,牢牢扎根于 Hadoop 生态系统,解释了名称背后的含义:Hadoop Upserts Deletes and Incrementals。它是为管理 HDFS 上大型分析数据集的存储而开发的。Hudi 的主要目的是减少流数据摄取过程中的延迟。
从大数据到人工智能
2022/10/28
1.6K0
基于Apache Hudi + MinIO 构建流式数据湖
详解Apache Hudi如何配置各种类型分区
Apache Hudi支持多种分区方式数据集,如多级分区、单分区、时间日期分区、无分区数据集等,用户可根据实际需求选择合适的分区方式,下面来详细了解Hudi如何配置何种类型分区。
ApacheHudi
2021/04/13
1.2K0
Apache Hudi 入门学习总结
学习和使用Hudi近一年了,由于之前忙于工作和学习,没时间总结,现在从头开始总结一下,先从入门开始
小明互联网技术分享社区
2022/10/31
1.5K0
解锁Apache Hudi删除记录新姿势
在0.5.1版本之前,用户若想删除某条记录,可以使用Spark DataSource,并将 DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY设置为 EmptyHoodieRecordPayload.class.getName,便可删除指定记录,在Hudi新发布的0.5.1版本,可不使用上述配置项删除记录,而提供三种方式删除记录:Hudi API,Spark DataSource,DeltaStreamer,下面逐一介绍如何使用。
ApacheHudi
2021/04/13
2K0
写入 Hudi 数据集
这一节我们将介绍使用DeltaStreamer工具从外部源甚至其他Hudi数据集摄取新更改的方法, 以及通过使用Hudi数据源的upserts加快大型Spark作业的方法。 对于此类数据集,我们可以使用各种查询引擎查询它们。
ApacheHudi
2021/04/13
1.5K0
实战|使用Spark Streaming写入Hudi
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
ApacheHudi
2021/04/13
2.3K0
查询hudi数据集
从概念上讲,Hudi物理存储一次数据到DFS上,同时在其上提供三个逻辑视图,如之前所述。 数据集同步到Hive Metastore后,它将提供由Hudi的自定义输入格式支持的Hive外部表。一旦提供了适当的Hudi捆绑包, 就可以通过Hive、Spark和Presto之类的常用查询引擎来查询数据集。
ApacheHudi
2021/04/13
1.8K0
使用spark3操作hudi数据湖初探
本文基于上述组件版本使用spark插入数据到hudi数据湖中。为了确保以下各步骤能够成功完成,请确保hadoop集群正常启动。
从大数据到人工智能
2022/01/19
1.7K0
使用spark3操作hudi数据湖初探
速度!Apache Hudi又双叕被国内顶级云服务提供商集成了!
Apache Hudi 在 HDFS 的数据集上提供了插入更新和增量拉取的流原语。
ApacheHudi
2021/04/13
8640
相关推荐
Spark读取变更Hudi数据集Schema实现分析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验