Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink集成数据湖之实时数据写入iceberg

Flink集成数据湖之实时数据写入iceberg

作者头像
大数据技术与应用实战
修改于 2021-02-07 11:13:12
修改于 2021-02-07 11:13:12
6.4K50
代码可运行
举报
运行总次数:0
代码可运行

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并
  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。
  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办
  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?
  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz
  • 下载 iceberg-flink-runtime-xxx.jar
  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
execution.checkpointing.interval: 10s   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

目前系统提供的catalog有hivecatalog和hadoopcatalog以及自定义catlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE CATALOG iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

执行完之后,显示如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flink SQL> show catalogs;
default_catalog
iceberg

如果不想每次启动sql client都重新执行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
catalogs: # empty list
# A typical catalog definition looks like:
  - name: hive
    type: hive
    hive-conf-dir: /Users/user/work/hive/conf
    default-database: default
  - name: iceberg
    type: iceberg
    warehouse: hdfs://localhost/user/hive2/warehouse
    uri: thrift://localhost:9083
    catalog-type: hive

创建db

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

创建table

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
    id BIGINT COMMENT 'unique id',
    data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE sourceTable (
 userid int,
 f_random_str STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

这时候我们看到有两个表了

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Flink SQL> show tables;
iceberg_001
sourcetable

然后执行insert into插入数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

代码版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Flink2Iceberg{

	public static void main(String[] args) throws Exception{
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		env.enableCheckpointing(10000);
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +
		                "  'type'='iceberg',\n" +
		                "  'catalog-type'='hive'," +
		                "  'hive-conf-dir'='/Users/user/work/hive/conf/'" +
		                ")");

		tenv.useCatalog("iceberg");
		tenv.executeSql("CREATE DATABASE iceberg_db");
		tenv.useDatabase("iceberg_db");

		tenv.executeSql("CREATE TABLE sourceTable (\n" +
		                " userid int,\n" +
		                " f_random_str STRING\n" +
		                ") WITH (\n" +
		                " 'connector' = 'datagen',\n" +
		                " 'rows-per-second'='100',\n" +
		                " 'fields.userid.kind'='random',\n" +
		                " 'fields.userid.min'='1',\n" +
		                " 'fields.userid.max'='100',\n" +
		                "'fields.f_random_str.length'='10'\n" +
		                ")");

		tenv.executeSql(
				"insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");
	}
}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
5 条评论
热度
最新
请问如何对数据进行upsert和delete操作呢?
请问如何对数据进行upsert和delete操作呢?
回复回复点赞举报
kafka表写入不了数据,运行的时候直接finished
kafka表写入不了数据,运行的时候直接finished
回复回复点赞举报
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z at org.apache.iceberg.flink.FlinkCatalog.lambda$validateFlinkTable$2(FlinkCatalog.java:430) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.apache.iceberg.flink.FlinkCatalog.va
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.table.api.TableColumn.isGenerated()Z at org.apache.iceberg.flink.FlinkCatalog.lambda$validateFlinkTable$2(FlinkCatalog.java:430) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.apache.iceberg.flink.FlinkCatalog.va
11点赞举报
您好,请问您这个问题解决了吗?创建iceberg表的时候我也是这个异常
您好,请问您这个问题解决了吗?创建iceberg表的时候我也是这个异常
回复回复点赞举报
按 pom.xml 引入的包,scala 版本 2.11,执行会报错
按 pom.xml 引入的包,scala 版本 2.11,执行会报错
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
数据湖YYDS! Flink+IceBerg实时数据湖实践
互联网技术发展的当下,数据是各大公司最宝贵的资源之一已经是不争的事实。收据的收集、存储和分析已经成为科技公司最重要的技术组成部分。大数据领域经过近十年的高速发展,无论是实时计算还是离线计算、无论是数据仓库还是数据中台,都已经深入各大公司的各个业务。
王知无-import_bigdata
2021/10/13
4.5K0
Flink集成Iceberg小小实战
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
大数据真好玩
2021/07/30
6.1K1
Flink结合Kafka实时写入Iceberg实践笔记
环境:本地测试环境 JDK1.8 、Flink 1.11.2 、Hadoop3.0.0 、Hive2.1.1
大数据真好玩
2021/03/27
1.9K0
Flink集成iceberg在生产环境中的实践
目前我们的大数据系统里,主要承接的业务是部门内的一些业务日志数据的统计、分析等,比如网关日志数据,服务器监控数据,k8s容器的相关日志数据,app的打点日志等。主要的流任务是flink任务是消费kafka的数据,经过各种处理之后通过flink sql或者flink jar实时写入hive,由于业务对数据的实时性要求比较高,希望数据能尽快的展示出来,所以我们很多的flink任务的checkpoint设置为1分钟,而数据格式采用的是orc格式,所以不可避免的出现了一个在大数据处理领域非常常见但是很麻烦的问题,即hdfs小文件问题。
大数据技术与应用实战
2021/01/08
5.8K5
数据湖(十八):Flink与Iceberg整合SQL API操作
Flink SQL 在操作Iceberg时,对应的版本为Flink 1.11.x 与Iceberg0.11.1版本,目前,Flink1.14.2版本与Iceberg0.12.1版本对于SQL API 来说兼容有问题,所以这里使用Flink1.11.6版本与Iceberg0.11.1版本来演示Flink SQL API 操作Iceberg。
Lansonli
2022/07/22
1.3K0
数据湖(十八):Flink与Iceberg整合SQL API操作
基于Flink1.14 + Iceberg0.13构建实时数据湖实战
Iceberg默认支持Hadoop Catalog。如果需要使用Hive Catalog,需要将flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar放到Flink集群所有服务器的lib目录下,然后重启Flink
王知无-import_bigdata
2022/06/05
1.7K0
Flink + Iceberg 在去哪儿的实时数仓实践
摘要:本文介绍去哪儿数据平台在使用 Flink + Iceberg 0.11 的一些实践。内容包括:
大数据技术架构
2021/07/05
1.1K0
Flink + Iceberg 在去哪儿的实时数仓实践
数据湖(十九):SQL API 读取Kafka数据实时写入Iceberg表
我们可以看到控制台上有对应实时数据输出,查看对应的Icberg HDFS目录,数据写入成功。
Lansonli
2022/07/24
9150
数据湖(十九):SQL API 读取Kafka数据实时写入Iceberg表
Iceberg+Amoro+Cloudeon体验云原生数据湖
之前的文章有介绍过怎么在Kubernetes上快速搭建大数据基础环境,这里就不重复介绍了。安装完后,可以看到如下图各个基础服务都启动完成。
CloudEon开源
2023/09/08
6340
Iceberg+Amoro+Cloudeon体验云原生数据湖
Dinky 扩展 iceberg 的实践分享
摘要:本文介绍了 Dinky 实时计算平台扩展 iceberg 的实践分享。内容包括:
文末丶
2022/09/02
1.8K1
Dinky 扩展 iceberg 的实践分享
Flink SQL 知其所以然(三十):Explain、Show、Load、Set 子句
大家好,我是老羊,今天我们来学习 Flink SQL 中的的 Explain、Show、Load、Set 共 4 个子句。
公众号:大数据羊说
2022/12/16
6650
Apache Iceberg技术调研&在各大公司的实践应用大总结
作者在实际工作中调研了Iceberg的一些优缺点和在各大厂的应用,总结在下面。希望能给大家带来一些启示。
大数据真好玩
2021/07/07
4.4K0
Flink CDC同步MySQL分库分表数据到Iceberg数据湖实践
Flink CDC: 捕获数据库完整的变更日志记录增、删、改等所有数据. Flink在1.11版本开始引入了Flink CDC功能,并且同时支持Table & SQL两种形式。Flink SQL CDC是以SQL的形式编写实时任务,并对CDC数据进行实时解析同步。相比于传统的数据同步方案,该方案在实时性、易用性等方面有了极大的改善。
awwewwbbb
2022/04/27
2.5K1
Flink CDC同步MySQL分库分表数据到Iceberg数据湖实践
flink sql实时计算当天pv写入mysql
今天我们主要来讲一个很简单但是很常见的需求,实时计算出网站当天的pv值,然后将结果实时更新到mysql数据库,以供前端查询显示。
大数据技术与应用实战
2020/09/24
3.3K0
flink sql实时计算当天pv写入mysql
数据湖(十六):Structured Streaming实时写入Iceberg
目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。
Lansonli
2022/07/11
9190
数据湖(十六):Structured Streaming实时写入Iceberg
Flink集成Iceberg在同程艺龙的实践
过去几年,数据仓库和数据湖方案在快速演进和弥补自身缺陷的同时,二者之间的边界也逐渐淡化。云原生的新一代数据架构不再遵循数据湖或数据仓库的单一经典架构,而是在一定程度上结合二者的优势重新构建。在云厂商和开源技术方案的共同推动之下,2021 年我们将会看到更多“湖仓一体”的实际落地案例。InfoQ 希望通过选题的方式对数据湖和数仓融合架构在不同企业的落地情况、实践过程、改进优化方案等内容进行呈现。本文将分享同程艺龙将 Flink 与 Iceberg 深度集成的落地经验和思考。
深度学习与Python
2023/04/01
4740
Flink集成Iceberg在同程艺龙的实践
正面超越Spark | 几大特性垫定Flink1.12流计算领域真正大规模生产可用(下)
我们书接上文,我们在之前的文章《正面超越Spark | 几大特性垫定Flink1.12流计算领域真正大规模生产可用(上)》详细描述了Flink的生产级别Flink on K8s高可用方案和DataStream API 对批执行模式的支持。
王知无-import_bigdata
2021/02/05
6420
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.3K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
基于 Flink+Iceberg 构建企业级实时数据湖
Apache Flink 是大数据领域非常流行的流批统一的计算引擎,数据湖是顺应云时代发展潮流的新型技术架构。那么当 Apache Flink 遇见数据湖时,会碰撞出什么样的火花呢?本次分享主要包括以下核心内容:
Spark学习技巧
2021/03/05
2.3K0
基于 Flink+Iceberg 构建企业级实时数据湖
Flink教程-使用sql将流式数据写入文件系统
flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法写入数据,支持的写入格式包括json、csv、avro、parquet、orc。
大数据技术与应用实战
2020/09/15
2.7K0
推荐阅读
相关推荐
数据湖YYDS! Flink+IceBerg实时数据湖实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验