Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink集成数据湖之实时数据写入iceberg

Flink集成数据湖之实时数据写入iceberg

作者头像
大数据技术与应用实战
修改于 2021-02-07 11:13:12
修改于 2021-02-07 11:13:12
6.4K50
代码可运行
举报
运行总次数:0
代码可运行

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并
  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。
  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办
  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?
  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz
  • 下载 iceberg-flink-runtime-xxx.jar
  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
execution.checkpointing.interval: 10s   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

目前系统提供的catalog有hivecatalog和hadoopcatalog以及自定义catlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE CATALOG iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

执行完之后,显示如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flink SQL> show catalogs;
default_catalog
iceberg

如果不想每次启动sql client都重新执行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
catalogs: # empty list
# A typical catalog definition looks like:
  - name: hive
    type: hive
    hive-conf-dir: /Users/user/work/hive/conf
    default-database: default
  - name: iceberg
    type: iceberg
    warehouse: hdfs://localhost/user/hive2/warehouse
    uri: thrift://localhost:9083
    catalog-type: hive

创建db

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

创建table

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
    id BIGINT COMMENT 'unique id',
    data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE sourceTable (
 userid int,
 f_random_str STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

这时候我们看到有两个表了

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flink SQL> show tables;
iceberg_001
sourcetable

然后执行insert into插入数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

代码版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Flink2Iceberg{

	public static void main(String[] args) throws Exception{
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		env.enableCheckpointing(10000);
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +
		                "  'type'='iceberg',\n" +
		                "  'catalog-type'='hive'," +
		                "  'hive-conf-dir'='/Users/user/work/hive/conf/'" +
		                ")");

		tenv.useCatalog("iceberg");
		tenv.executeSql("CREATE DATABASE iceberg_db");
		tenv.useDatabase("iceberg_db");

		tenv.executeSql("CREATE TABLE sourceTable (\n" +
		                " userid int,\n" +
		                " f_random_str STRING\n" +
		                ") WITH (\n" +
		                " 'connector' = 'datagen',\n" +
		                " 'rows-per-second'='100',\n" +
		                " 'fields.userid.kind'='random',\n" +
		                " 'fields.userid.min'='1',\n" +
		                " 'fields.userid.max'='100',\n" +
		                "'fields.f_random_str.length'='10'\n" +
		                ")");

		tenv.executeSql(
				"insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");
	}
}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
5 条评论
热度
最新
请问如何对数据进行upsert和delete操作呢?
请问如何对数据进行upsert和delete操作呢?
回复回复点赞举报
kafka表写入不了数据,运行的时候直接finished
kafka表写入不了数据,运行的时候直接finished
回复回复点赞举报
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z at org.apache.iceberg.flink.FlinkCatalog.lambda$validateFlinkTable$2(FlinkCatalog.java:430) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.apache.iceberg.flink.FlinkCatalog.va
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z at org.apache.iceberg.flink.FlinkCatalog.lambda$validateFlinkTable$2(FlinkCatalog.java:430) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.apache.iceberg.flink.FlinkCatalog.va
11点赞举报
您好,请问您这个问题解决了吗?创建iceberg表的时候我也是这个异常
您好,请问您这个问题解决了吗?创建iceberg表的时候我也是这个异常
回复回复点赞举报
按 pom.xml 引入的包,scala 版本 2.11,执行会报错
按 pom.xml 引入的包,scala 版本 2.11,执行会报错
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
课后补充----单细胞数据检测变异
单细胞基因包括从单细胞测序数据中得出的生殖系和推测的体细胞SNV。它从单细胞测序数据的单个bam文件开始,这些数据由scRNA-seq、snRNA-seq、利用测序(snATAC-seq)、单细胞DNA-seq等进行转座酶可及染色质的单核测定产生。
追风少年i
2024/09/25
1360
课后补充----单细胞数据检测变异
顶刊方法补充---肿瘤演化与微环境相互作用
追风少年i
2024/11/15
1010
顶刊方法补充---肿瘤演化与微环境相互作用
课前准备----高通量单细胞分析数据集中的体细胞突变检测
体细胞突变在单细胞分辨率上的表征对于研究癌症进化、克隆嵌合和细胞可塑性至关重要。 体细胞突变在单细胞分辨率上的表征对于研究癌症中的遗传异质性和细胞可塑性、非肿瘤组织中的克隆嵌合体以及确定在恶性细胞和表型正常细胞中起作用的突变过程至关重要。单细胞基因组测序为研究单细胞突变提供了最直接的方法。 单细胞基因组测序的一种替代策略包括直接从高通量单细胞测定中检测测序reads中的体细胞突变,如scRNA-seq和scATAC-seq。这种方法的主要优点是可以利用单细胞分析的高通量来绘制细胞谱系的转录或调控程序,而不需要复杂的实验方案来联合分析来自同一细胞的DNA和RNA。然而,由于不同细胞类型的基因表达的可变性、等位基因脱落事件、RNA编辑、有限的覆盖深度和测序产物,突变的检测受到很大限制。因此,现有的算法依赖于检测突变,如单核苷酸变异(SNV)或indel,这些突变以前是通过匹配的bulk或单细胞DNA测序数据确定的。
追风少年i
2024/09/08
1410
课前准备----高通量单细胞分析数据集中的体细胞突变检测
你的单细胞数据集还可以分析体细胞突变
单细胞水平上的体细胞突变特征对于研究癌症进化、克隆镶嵌和细胞可塑性至关重要。传统的单细胞基因组测序方法在可扩展性和基因组丢失率方面存在挑战。另一策略是从scRNA-seq 和 scATAC-seq 检测体细胞突变,这种方法的主要优点是可以将细胞谱系映射到转录或调控程序,但由于不同细胞类型的基因表达差异、等位基因脱落事件、RNA 编辑、有限的覆盖深度和测序伪影,突变的检测受到严重限制。因此,现有算法检测体细胞突变具有局限性。因此,研究团队开发了一种名为SComatic的算法,旨在直接检测 scRNA-seq 和ATAC-seq(可及染色质序列测定)数据集中的体细胞突变,而无需匹配的大量或单细胞DNA测序数据。
生信菜鸟团
2024/07/31
3910
你的单细胞数据集还可以分析体细胞突变
RNA-seq这十年(3万字长文综述)
我们在生信菜鸟团公众号举办的每周文献精选活动,菜鸟团一周文献推荐(No.20)广受好评的是大神级的RNA-Seq综述,题目:RNA sequencing: the teenage years ,所以在我们生信技能树VIP交流群也得到了关注。
生信技能树
2019/08/08
13.8K0
RNA-seq这十年(3万字长文综述)
用WES和RNA-Seq数据提取到的somatic SNVs不一致
其实两者均可用于检测遗传变异,特别是在单核苷酸变异方面(SNVs)。如果大家对RNA-seq数据如何找变异位点的流程不是很清楚,可以看我们生信技能树以前的教程:
生信技能树
2020/09/14
2K0
用WES和RNA-Seq数据提取到的somatic SNVs不一致
课前准备---单细胞VDJ分析导论2
单细胞T细胞和B细胞抗原受体测序数据分析可以潜在地对适应性免疫细胞进行深入评估,从而为了解免疫细胞的发育提供信息,从而跟踪疾病和治疗中的克隆扩增。然而,由于数据的复杂性和潜在的生物学特性,在单细胞水平上分析和解释T细胞和B细胞及其适应性免疫受体谱一直是极具挑战性的。
追风少年i
2024/07/02
3760
课前准备---单细胞VDJ分析导论2
单细胞测序系列(1)--单细胞全基因组测序
仅2018年,他的研究团队就发表了11篇单细胞测序方向文章,获得了单细胞测序领域的接连重要成果。他众多学术成果中,有40余篇论文发表在Cell, Nature, Science, Cell Stem Cell, Nature Genetics, Nature Cell Biology, Cell Research, Genome Research等期刊上。单细胞测序领域的时代前沿性,以及持续的发展力可见一斑。
用户6317549
2019/09/24
5.7K0
单细胞测序系列(1)--单细胞全基因组测序
Nat. Methods | 新算法利用空间转录组数据构建肿瘤的“空间进化图谱”
癌症的进展由体细胞突变和染色质重塑共同驱动,包括核苷酸变异(SNV)、体细胞拷贝数变异(CNA)和大规模结构变异等。对批量肿瘤或分离单细胞中的体细胞突变进行测序可揭示肿瘤内部的遗传异质性,并能重建肿瘤的进化史;肿瘤之间也表现出异质性,并能够在物理空间内发生进展。分析肿瘤在时间和空间上的体细胞进化是癌症研究中的一个关键挑战,常因缺乏空间数据而受阻。
DrugAI
2024/11/23
1350
Nat. Methods | 新算法利用空间转录组数据构建肿瘤的“空间进化图谱”
前瞻 | MorPhiC:描述每个人类基因的分子和细胞功能,人类基因的功能性表征
- 图片说明- 点图显示了基于PubMed搜索的每种人类基因的出版物总数。统计包括在摘要或正文文本中出现基因符号的已发表文章。该图的代码由M. Hirshey提供。
生信菜鸟团
2025/02/20
1420
前瞻 | MorPhiC:描述每个人类基因的分子和细胞功能,人类基因的功能性表征
基于单基因SNV对癌症进行分类 | Nature | BRCA2变异的功能评估与临床分类
- 图片说明- a,6959个单核苷酸变异(SNVs)的原始功能评分分布,按变异类型着色。- b,VarCall模型中所有变异的调整后功能评分分布。- c,每个外显子中基于模型的功能评分分布,按变异类型分类。颜色表示变异类型。- d,柱状图展示了每个功能类别中每种变异类型的百分比。颜色表示功能类别。- e,柱状图展示了14个目标区域中按功能类别划分的SNVs的百分比。颜色表示功能类别。
生信菜鸟团
2025/02/20
1500
基于单基因SNV对癌症进行分类 | Nature | BRCA2变异的功能评估与临床分类
重构人胚胎着床的单细胞转录组与DNA甲基化组图谱
当你的才华还撑不起你的野心时,请潜下心来,脚踏实地,跟着我们慢慢进步。不知不觉在单细胞转录组领域做知识分析也快两年了,通过文献速递这个栏目很幸运聚集了一些小伙伴携手共进,一起成长。
生信技能树jimmy
2020/03/30
8760
scHLAcount || 单细胞转录组HLA等位基因分析
scHLAcount允许我们使用个性化的参考基因组计算HLA I类基因HLA-A、B和C的单细胞转录组序列数据中的分子数;和HLA II类基因DPA1, DPB1, DRA1, DRB1, DQA1, DQB1。可以使用由替代方法确定的提供的HLA类型,也可以使用此工具分析HLA类型,然后根据这些调用进行量化。
生信技能树jimmy
2020/09/14
1.1K0
RNA模型可以帮助发现疾病机制和候选药物
今天为大家介绍的是来自Tehmina Masud, Amit Deshwar, Shreshth Gandhi, Brendan J. Frey团队的一篇论文。精确地对RNA生物学进行建模和预测一直是一个长期存在的挑战,对于变异解释和定制治疗的制定具有重要的临床意义。作者提出了一个RNA生物学的基础模型,名为“BigRNA”,它经过了数千个基因组匹配数据集的训练,可以从DNA序列预测组织特异性的RNA表达、剪接、microRNA位点以及RNA结合蛋白的特异性。
DrugAI
2023/11/13
2620
RNA模型可以帮助发现疾病机制和候选药物
从空间解析转录组学推断等位基因特异性拷贝数畸变和肿瘤系统图谱
追风少年i
2024/11/04
1210
从空间解析转录组学推断等位基因特异性拷贝数畸变和肿瘤系统图谱
肿瘤异质性的空间和基因组综合分析(肿瘤克隆进化)
追风少年i
2024/10/30
1850
肿瘤异质性的空间和基因组综合分析(肿瘤克隆进化)
10X单细胞空间数据分析之SNP检测篇
Cellsnp-lite是在C/ c++中实现的,并执行每个细胞基因分型,supporting both with (mode 1) and without (mode 2) given SNPs。在后一种情况下,杂合snp将被自动检测。Cellsnp-lite适用于基于液滴的(例如10XGenomics数据)和well-based的平台(例如SMART-seq2数据)。
追风少年i
2023/04/29
8800
10X单细胞空间数据分析之SNP检测篇
单细胞测序技术在循环肿瘤细胞检测中的应用
循环肿瘤细胞(CTCs)是起源于上皮来源的原发性或转移性肿瘤并脱落到血液循环系统中的具有高活力和高转移潜能的肿瘤细胞。CTC 是液体活检的重要组成部分之一,为实时监测肿瘤进展提供了一个方法。
生信技能树jimmy
2022/03/14
1.9K0
单细胞测序技术在循环肿瘤细胞检测中的应用
单细胞时代 || NGS技术实现
Single-Cell RNA Sequencing and Its Combination with Protein and DNA Analyses
生信技能树jimmy
2021/03/10
1.7K0
单细胞时代 || NGS技术实现
单细胞转录组的时间序列数据分析
单细胞转录组数据分析在阐述多细胞生物发育与疾病进程方面已经开发了多种新的方法,如比较有名的轨迹推断(TI,trajectory inference)。但是,我们知道,各种轨迹推断方法只是一种利用表达量的排序手段而已,而且严重依赖先验的知识,如根节点的选择。有没有一种技术可以真正的在RNA转录的时候为转录的RNA打上时间的标签呢?
生信技能树jimmy
2021/10/09
1.9K0
推荐阅读
相关推荐
课后补充----单细胞数据检测变异
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档