Loading [MathJax]/jax/input/TeX/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >flink etl

flink etl

作者头像
yiduwangkai
发布于 2021-07-14 05:55:24
发布于 2021-07-14 05:55:24
1.2K00
代码可运行
举报
文章被收录于专栏:大数据进阶大数据进阶
运行总次数:0
代码可运行

一.Regular Joins(双流join)

这种 join 方式需要去保留两个流的状态,持续性地保留并且不会去做清除。两边的数据对于对方的流都是所有可见的,所以数据就需要持续性的存在state里面,那么 state 又不能存的过大,因此这个场景的只适合有界数据流或者结合ttl state配合使用。它的语法可以看一下,比较像离线批处理的 SQL

left join,right join,full join, inner join

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE NOC (
  agent_id STRING,
  codename STRING
)
WITH (
  'connector' = 'kafka'
);
 
CREATE TABLE RealNames (
  agent_id STRING,
  name     STRING
)
WITH (
  'connector' = 'kafka'
);
 
SELECT
    name,
    codename
FROM NOC
INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;

二.Interval Joins(区间 join)

加入了一个时间窗口的限定,要求在两个流做 join 的时候,其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的 join key 相同才能够完成 join。加入了时间窗口的限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的 State。Interval join 是同时支持 processing time 和 even time去定义时间的。如果使用的是 processing time,Flink 内部会使用系统时间去划分窗口,并且去做相关的 state 清理。如果使用 even time 就会利用 Watermark 的机制去划分窗口,并且做 State 清理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE orders (
  id INT,
  order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
)
WITH (
  'connector' = 'kafka'
);
 
 
CREATE TABLE shipments (
  id INT,
  order_id INT,
  shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
)
WITH (
 'connector' = 'kafka'
);
 
SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;

三.Temporal Table Join

Interval Joins 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);
 
CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);
 
SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

四.Lookup Joins(维表join)

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。默认情况下,lookup cache 是未启用的, 你可以设置 lookup.cache.max-rows and lookup.cache.ttl 参数来启用。 lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);
 
CREATE TEMPORARY TABLE mysql_behavior_conf (
   id int
  ,code STRING
  ,map_val STRING
  ,update_time TIMESTAMP(3)
--   ,primary key (id) not enforced
--   ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
   'connector' = 'jdbc'
   ,'url' = 'jdbc:mysql://localhost:3306/venn'
   ,'table-name' = 'lookup_join_config'
   ,'username' = 'root'
   ,'password' = '123456'
   ,'lookup.cache.max-rows' = '1000'
   ,'lookup.cache.ttl' = '1 minute' -- 缓存时间,即使一直在访问也会删除
);
 
SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.map_val, a.ts
FROM user_log a
  left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
  ON a.behavior = c.code
where a.behavior is not null;

五.Lateral Table Join (横向join)

lateral Table Join基本相当于SQL Server的CROSS APPLY,功能上要强于CROSS APPLY

1.表和表关联

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE People (
    id           INT,
    city         STRING,
    state        STRING,
    arrival_time TIMESTAMP(3),
    WATERMARK FOR arrival_time AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka'
);
 
CREATE TEMPORARY VIEW CurrentPopulation AS
SELECT
    city,
    state,
    COUNT(*) as population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
)
WHERE rownum = 1
GROUP BY city, state;
 
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM CurrentPopulation) States,
    LATERAL (
        SELECT city, population
        FROM CurrentPopulation
        WHERE state = States.state
        ORDER BY population DESC
        LIMIT 2
);

2.函数表关联

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
SELECT
data, name, age
FROM
userTab,
LATERAL TABLE(splitTVF(data)) AS T(name, age)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.6K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
公众号:大数据羊说
2022/07/07
2.6K0
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
Flink SQL on Zeppelin - 打造自己的可视化Flink SQL开发平台
目前开发Flink的方式有很多,一般来说都是开发同学写JAVA/SCALA/PYTHON项目,然后提交到集群上运行。这种做法较为灵活,因为你在代码里面可以写任务东西,什么维表JOIN、参数调优,都能很轻松的搞定。但是对开发同学的要求较高,有一定的学习成本。比如有些同学擅长JAVA,有些擅长PYTHON,而在我们的项目开发过程中,是不会允许多种语言共存的,一般来说都是选择JAVA作为我们的开发语言,那么,对于擅长PYTHON的同学来说,再从头开始攀爬JAVA这座大山,而且还得短期能够熟练使用,无疑是难上加难。
王知无-import_bigdata
2021/03/15
5.4K0
Flink SQL on Zeppelin - 打造自己的可视化Flink SQL开发平台
Apache-Flink深度解析-Temporal-Table-JOIN
在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:
王知无-import_bigdata
2019/04/24
4.6K0
Apache-Flink深度解析-Temporal-Table-JOIN
【Flink】第十一篇:join 之 interval join
上一篇 【Flink】第十篇:join 之 regular join 验证了Flink SQL中的regular join的一些设计逻辑。
章鱼carl
2022/03/31
5.3K0
【Flink】第十一篇:join 之 interval join
Flink SQL 双表 JOIN 介绍与原理简析
Flink 作为流式数据处理框架的领跑者,在吞吐量、时延、准确型、容错性等方面都有优异的表现。在 API 方面,它为用户提供了较底层的 DataStream API,也推出了 Table API 和 SQL 等编程接口。特别来看,SQL 以其易用、易迁移的特点,深受广大用户的欢迎。
KyleMeow
2022/03/22
7.7K2
Flink1.18新特性生产环境应用的重点解读!
Flink 1.18已经于近期发布了。在这个新版本中新增了很多新的功能和特性。在这些特性中,有一些是生产环境非常重要的能力,大家在使用过程中可以重点参考和了解其中的原理。
王知无-import_bigdata
2023/10/30
1.6K1
Flink1.18新特性生产环境应用的重点解读!
Flink on Hive构建流批一体数仓
Flink使用HiveCatalog可以通过批或者流的方式来处理Hive中的表。这就意味着Flink既可以作为Hive的一个批处理引擎,也可以通过流处理的方式来读写Hive中的表,从而为实时数仓的应用和流批一体的落地实践奠定了坚实的基础。本文将以Flink1.12为例,介绍Flink集成Hive的另外一个非常重要的方面——Hive维表JOIN(Temporal Table Join)与Flink读写Hive表的方式。以下是全文,希望本文对你有所帮助。
大数据老哥
2021/02/04
4.5K0
Flink on Hive构建流批一体数仓
通过 Flink SQL 使用 Hive 表丰富流
流处理是通过在数据运动时对数据应用逻辑来创造商业价值。很多时候,这涉及组合数据源以丰富数据流。Flink SQL 执行此操作并将您应用于数据的任何函数的结果定向到接收器中。业务用例,例如欺诈检测、广告印象跟踪、医疗保健数据丰富、增加财务支出信息、GPS 设备数据丰富或个性化客户通信,都是使用Hive表来丰富数据流的很好的例子。 因此,Hive 表与 Flink SQL 有两种常见的用例:
大数据杂货铺
2022/12/02
1.4K0
通过 Flink SQL 使用 Hive 表丰富流
Flink SQL 知其所以然(二十六):万字详述 Flink SQL 4 种时间窗口语义!(收藏)
大家好我是老羊,由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走。思路如下:
公众号:大数据羊说
2022/07/07
3.9K1
Flink SQL 知其所以然(二十六):万字详述 Flink SQL 4 种时间窗口语义!(收藏)
Flink SQL中的Join操作
Flink SQL 支持对动态表进行复杂灵活的连接操作。 有几种不同类型的连接来解决可能需要的各种语义查询。
从大数据到人工智能
2022/02/24
5.6K0
Flink 实践教程:进阶11-SQL 关联:Regular Join
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2022/03/28
1.1K5
Flink 实践教程:进阶11-SQL 关联:Regular Join
Flink SQL 双表 JOIN 介绍与原理简析
作者:董伟柯,腾讯 CSIG 高级工程师 综述 Flink 作为流式数据处理框架的领跑者,在吞吐量、时延、准确型、容错性等方面都有优异的表现。在 API 方面,它为用户提供了较底层的 DataStream API,也推出了 Table API 和 SQL 等编程接口。特别来看,SQL 以其易用、易迁移的特点,深受广大用户的欢迎。 在常见的数据分析场景中,JOIN(关联)操作是一项很有挑战性的工作,因为它涉及到左右两个表(流)的状态匹配,对内存的压力较大;而相比恒定的批数据而言,流数据更加难以预测,例如数据可
腾讯云大数据
2022/04/18
1.1K0
Flink SQL 双表 JOIN 介绍与原理简析
flink sql实战案例
断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。
chimchim
2022/11/13
1.1K0
flink sql实战案例
【Flink】第三篇:维表Join之版本表(2)
上一篇“【Flink】第二篇:维表Join之版本表”写的有些仓促,最后的结论部分总结的不够精炼,本篇对其进行进一步总结,并给出Demo的输出示例,目的在于探索Flink SQL 版本表join的一些设计规则。
章鱼carl
2022/03/31
1.3K0
Flink1.12新特性之Flink SQL时态表小总结
Flink 1.12正式发布后,带来了很多新的特性,本文重点学习和总结一下Flink 1.11和 Flink1.12中时态表的使用和自己的一个小总结,文章如有问题,请大家留言交流讨论,我会及时改正。
大数据真好玩
2021/09/18
1.2K0
实战自定义Flink SQL Connector( Flink 1.11 & Redis)
Flink SQL之所以简洁易用而功能强大,其中一个重要因素就是其拥有丰富的Connector(连接器)组件。Connector是Flink与外部系统交互的载体,并分为负责读取的Source和负责写入的Sink两大类。不过,Flink SQL内置的Connector有可能无法cover实际业务中的种种需求,需要我们自行定制。好在社区已经提供了一套标准化、易于扩展的体系,用户只要按照规范面向接口编程,就能轻松打造自己的Connector。本文就在现有Bahir Flink项目的基础上逐步实现一个SQL化的Redis Connector。
小晨说数据
2022/03/10
3.8K0
实战自定义Flink SQL Connector( Flink 1.11 & Redis)
Flink SQL Client综合实战
在《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;
程序员欣宸
2021/04/21
1.3K0
Flink SQL Client综合实战
Flink SQL 知其所以然(二十四):SQL DDL!
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/07/07
1.5K0
【Flink】第十篇:join 之 regular join
每篇会讨论一种Flink SQL的join方式,其实已经在之前写过两篇用upsert-kafka做temporal join的文章,但是限于当时对于Flink SQL、CDC、撤回语义等的认知水平有限,并且时间仓促,写的不尽如人意。
章鱼carl
2022/03/31
4.6K0
【Flink】第十篇:join 之 regular join
相关推荐
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
更多 >
交个朋友
加入[数据] 腾讯云技术交流站
获取数据实战干货 共享技术经验心得
加入数据技术工作实战群
获取实战干货 交流技术经验
加入[数据库] 腾讯云官方技术交流站
数据库问题秒解答 分享实践经验
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档