Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >一次成功的FlinkSQL功能测试及实战演练

一次成功的FlinkSQL功能测试及实战演练

作者头像
大数据真好玩
发布于 2021-05-28 06:37:03
发布于 2021-05-28 06:37:03
2.8K00
代码可运行
举报
文章被收录于专栏:暴走大数据暴走大数据
运行总次数:0
代码可运行
FlinkSQL功能测试及实战演练

前言:Flink在国内的占有率逐步提升的情况下,各项组件的功能与稳定性也得到逐步提升。为了解决目前已有的复杂需求,尝试研究flinksql的特性与功能,作为是否引入该组件的依据。同时尝试将现有需求通过简单demo的形式进行测试。本次测试主要集中在Kafka、mysql、Impala三个组件上,同时将结合官方文档进行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/
1 前期准备
1.1 环境配置

本次研究测试需要用到以下组件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CDH 6.3.2
Flink 1.12.2
mysql 5.7
impala 3.2.0-cdh6.3.2
kafka 2.2.1-cdh6.3.2
1.2 依赖关系

本次测试会将FlinkSql与kafka、mysql、impala等组件进行conn,因此需要以下依赖包:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
flink-connector-kafka_2.11-1.12.2.jar
flink-connector-jdbc_2.11-1.11.2.jar
mysql-connector-java-5.1.47.jar
ImpalaJDBC4.jar
ImpalaJDBC41.jar
flink-sql-connector-kafka_2.11-1.12.2.jar
1.3 重启flink

将上述所需的jar包放入$FLINK_HOME/lib中之后(所有部署flink的服务器都需要放),重启yarn-session

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
yarn-session.sh --detached
sql-client.sh embedded
2 FlinkSql-kafka测试

FlinkSql-kafka相关资料:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
2.1 FlinkSql-kafka常规功能测试

通过FlinkSql将Kafka中的数据映射成一张表

2.1.1 创建常规topic

1、创建topic kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01

2、模拟消费者 kafka-console-consumer --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --from-beginning

3、模拟生产者 kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01

4、删除topic kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181

2.1.2 FlinkSql建表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE t1 (
    name string,
    age BIGINT,
    isStu INT,
    opt STRING,
    optDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'test01',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
    'format' = 'csv'  -- 数据源格式为 csv,
);
select * from t1;

2.1.3 写入数据

往kafka中写入数据,同时查看flinksql中t1表的变化

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
lisi,18,1,2
wangwu,30,2,2

2.1.4 小结

通过kafka数据映射成表这个步骤,可以将数据实时的汇入表中,通过sql再进行后续操作,相对代码编写来说更为简易,同时有问题也相对好排查

2.2 FlinkSql-upsertKafka常规功能测试

upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。

如果有key则update,没有key则insert,如果value的值为空,则表示删除

2.2.1 FlinkSql建upsert表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table t2;
CREATE TABLE t2 (
  name STRING,
  age bigint,
  isStu INT,
  opt STRING,
  optDate TIMESTAMP(3) ,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test02',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
  'key.format' = 'csv',
  'value.format' = 'csv'
);

2.2.2 建立映射关系

将t1表中的数据写入到t2中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO t2 SELECT * FROM t1 ;
select * from t2;

结果如下:

2.2.3 更新数据

继续模拟kafka生产者,写入如下数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
zhangsan,25,1,2
risen,8,8,8
lisi,0,0,

结果如下:

2.2.4小结

通过如上测试,两条更新,一条插入,都已经实现了,

根据官方文档描述,指定key的情况下,当value为空则判断为删除操作

但是假如我插入一条数据到kafka,例如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
lisi,,,

只有key,没有value,t1表就会报如下错误

因为建表的时候有几个类型定义为了Int类型,这里为空它默认为是""空字符串,有点呆,推测如果是json格式这类可以指定数据类型的,才能直接使用。对于csv这种数据类型不确定的,会存在无法推断类型的情况。

鉴于此,为了探究是否真的具备删除操作,我又将上述所有表结构都进行了修改。为了试验简单,我直接修改表结构再次测试。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop TABLE t1;
CREATE TABLE t1 (
    name STRING,
    age STRING,
    isStu STRING,
    opt STRING,
    optDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'test01',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
    'format' = 'csv'  -- 数据源格式为 csv,
);
drop table t2;
CREATE TABLE t2 (
  name STRING,
  age STRING,
  isStu STRING,
  opt STRING,
  optDate TIMESTAMP(3) ,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test02',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
  'key.format' = 'csv',
  'value.format' = 'csv'
);
INSERT INTO t2 SELECT * FROM t1 ;
select * from t2;

依然没有在t2表中删除掉该条记录,该功能需要进一步探索,以后在跟进。

2.3 FlinkSql-upsertKafka关于kafka中数据过期测试

2.3.1 创建10分钟策略的topic

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka-topics  --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01   --config log.retention.minutes=10
kafka-console-producer  --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01
kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181
kafka-console-consumer  --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic output --from-beginning
kafka-topics --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --describe

2.3.2 创建flinksql的表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE t1 (
    name string,
    age BIGINT,
    isStu INT,
    opt STRING,
    optDate TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR optDate as optDate - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'test01',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
    'format' = 'csv'  -- 数据源格式为 csv,
);
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE t2 (
  name STRING,
  age bigint,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'output',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
  'key.format' = 'csv',
  'value.format' = 'csv'
);
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO t2
SELECT
name,
max(age)
FROM t1
GROUP BY name;

2.3.3 写入数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
zhangsan,18,1,insert
lisi,20,2,update
wangwu,30,1,delete

2.3.4 等待策略过期

flink映射的kafka数据因为数据删除,导致t1表里为空

但是t2是基于t1的汇总表,在t1被清空的情况下,t2依旧存在

3 FlinkSql-JDBC

FlinkSql-JDBC相关资料:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
3.1 FlinkSql-JDBC-Mysql常规功能测试

3.1.1 mysql建表并写入数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
create table test.test01(name varchar(10),age int, primary key (name));
INSERT INTO test.test01(name, age)VALUES('zhangsan', 20);
INSERT INTO test.test01(name, age)VALUES('lisi', 30);
INSERT INTO test.test01(name, age)VALUES('wangwu', 18);

3.1.2 flinkSql建表

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table mysqlTest ;
create table mysqlTest (
name string,
age int,
PRIMARY KEY (name) NOT ENFORCED
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.5.187:3306/test',
 'username' = 'root',
 'password' = '123456',
 'table-name' = 'test01'

);
select * from mysqlTest;

3.1.3 flinksql写入/更新数据到mysql

3.1.3.1 写入

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO mysqlTest(name, age)VALUES('risen', 88);

在flink表与mysql表中,都多了该条记录

3.1.3.2 更新

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO mysqlTest (name, age) VALUES('zhangsan', 50);

3.1.3.3 删除

官方文档对delete简单提了一下,但是在实际中并没有

JDBC连接器允许使用JDBC驱动程序从任何关系数据库读取数据或将数据写入任何关系数据库。本文档介绍了如何设置JDBC连接器以对关系数据库运行SQL查询。

如果在DDL上定义了主键,则JDBC接收器将在upsert模式下运行以与外部系统交换UPDATE / DELETE消息,否则,它将在附加模式下运行,并且不支持使用UPDATE / DELETE消息。

尝试删除:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DELETE FROM mysqlTest where name='zhangsan';
INSERT INTO mysqlTest (name, age) VALUES('zhangsan', null);
3.1.4 小结

flinkSql连接mysql,增删改查,增加与查询很容易实现,但是修改一定要在建表的时候,指定主键才可以实现upsert,删除目前好像没办法实现

3.2 FlinkSql-JDBC-Impala常规测试
3.2.1 Impala创建kudu表
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table kudu_test.kuduTest;
CREAT TABLE kudu_test.kuduTest
(
    name string,
    age BIGINT,
    isStu INT,
    opt STRING,
    PRIMARY KEY(name)
)STORED AS KUDU;
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES('zhangsan', 20,1,'1');
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES('lisi', 30,1,'1');
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES('wangwu', 18,1,'1');
3.2.2 flinkSql建表
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table impalaTest ;
create table impalaTest (
name string,
age int,
PRIMARY KEY (name) NOT ENFORCED
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:impala://192.168.5.185:21050/kudu_test',
 'username' = 'root',
 'password' = '123456',
 'table-name' = 'kuduTest',
 'driver'='com.cloudera.impala.jdbc4.Driver'

);
select * from impalaTest;

呃,不支持impala

3.2.3 小结

目前暂不支持通过JDBC连接Impala

4 总结

1、Flinksql支持kafka、mysql,且已经支持upsert功能,但是在测试delete的时候,发现都无法直接实现,但是可以通过汇总一次,在逻辑上实现。在尝试将flinksql连接impala的时候报错,目前暂不支持,但是可以考虑通过将数据写入kafka,最后impala来消费来实现。

2、在大数据场景中,每条数据都是有价值的。当某天有"统计删除了多少条数据"的需求时,物理删除掉的数据再也无法捞回,导致需求无法实现。所以建议不删除任何数据,以保留数据状态的形式,实现逻辑上的删除,即不统计当前状态为"删除"的数据。

5 实战演练
5.1 场景及需求

通过简单demo实现:

1、维度表更新 2、实时统计指标

5.1.1 源数据

姓名,年龄,身份,在校状态(1:存在:2:不存在)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
zhangsan,20,1,1
lisi,18,1,2
wangwu,30,2,2

5.1.2 维度表

身份维度表

代码语言:javascript
代码运行次数:0
运行
复制
5.2 流程设计
5.3 代码实现
5.3.1 创建mysql维度表并插入数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
create table test.dim_identity(name varchar(10),identity int);
INSERT INTO test.dim_identity(name, identity)VALUES('学生', 1);
INSERT INTO test.dim_identity(name, identity)VALUES('老师', 2);
5.3.2 将mysql维度表映射成FlinkSql中的表
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table dim_identity ;
create table dim_identity (
name string,
identity int
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.5.187:3306/test',
 'username' = 'root',
 'password' = '123456',
 'table-name' = 'dim_identity'
);
select * from dim_identity;
5.3.3 创建映射kafka的源数据表
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE ods_kafka (
    name string,
    age BIGINT,
    identity INT,
    status STRING,
    insertDate TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'stuLog',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
    'format' = 'csv'  -- 数据源格式为 csv,
);
select * from ods_kafka;
5.3.4 根据源数据表汇总成用户状态表

采用upsert的方式,以最新一条数据作为用户的状态

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table tds_user_status;
CREATE TABLE tds_user_status (
  name STRING,
  age bigint,
  identity INT,
  status STRING,
  insertDate TIMESTAMP(3) ,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'tdsResult',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
  'key.format' = 'csv',
  'value.format' = 'csv'
);
INSERT INTO tds_user_status SELECT * FROM ods_kafka ;
select * from tds_user_status;
5.3.5 统计指标

统计以下指标:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
当前在校的老师数量
当前总共多少学生
学生占总数的比例
当前状态为在校占总数的比例

建FlinkSql表接收指标

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
drop table rpt_result;
CREATE TABLE rpt_result (
  inStuTeatherNum int,
  StudentNum int,
  StudengRate FLOAT,
  inStuRate FLOAT,
  countDate TIMESTAMP(3) METADATA FROM 'timestamp',
  PRIMARY KEY (countDate) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'rptResult',
  'properties.bootstrap.servers' = '192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092',  -- kafka broker 地址
  'key.format' = 'csv',
  'value.format' = 'csv'
);

开始统计:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO rpt_result 
SELECT 
sum(case when t2.name = '老师' and t1.status =1 then 1 else 0 end ) inStuTeatherNum
,sum(case when t2.name = '学生' then 1 else 0 end ) StudentNum
,sum(case when t2.name = '学生' then 1 else 0 end )/sum(1) StudengRate
,sum(case when t1.status = 1 then 1 else 0 end )/sum(1) inStuRate
FROM tds_user_status t1 
left join dim_identity t2
on t1.identity=t2.identity
;
select * from rpt_result
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
flink sql实战案例
断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。
chimchim
2022/11/13
1.1K0
flink sql实战案例
FlinkSQL实时计算Demo
Flink实时消费业务数据Demo Debezium监控MySQL用FlinkSQL实时消费 1、环境准备 ## 各组件版本 MySQL:5.7.21-log ## 开启binlog kafka_2.11-2.4.1 ## Kafka Flink:1.12.0 ## Flink_1.12.0官方推荐使用Kafka_2.4.1 Zookeeper:3.4.6 ## 所需组件下载地址 ## kafka_2.11-2.4.1.tgz 链接:https://pan.baidu.com/s/1-YUvHj8B10VG
用户8483969
2021/04/09
3.2K0
FlinkSQL 平台
由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x
shengjk1
2021/01/26
1.2K0
2021年大数据Flink(三十七):​​​​​​​Table与SQL ​​​​​​案例四
Apache Flink 1.12 Documentation: Table API & SQL
Lansonli
2021/10/11
3190
干货 | 五千字长文带你快速入门FlinkSQL
最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家带来关于FlinkSQL的教程,之后还会更新一些大数据实时数仓的内容,和一些热门的组件使用!希望小伙伴们能点个关注,第一时间关注技术干货!
大数据梦想家
2021/01/27
2.1K0
干货 | 五千字长文带你快速入门FlinkSQL
【Flink】第三篇:维表Join之版本表(2)
上一篇“【Flink】第二篇:维表Join之版本表”写的有些仓促,最后的结论部分总结的不够精炼,本篇对其进行进一步总结,并给出Demo的输出示例,目的在于探索Flink SQL 版本表join的一些设计规则。
章鱼carl
2022/03/31
1.2K0
快速了解Flink SQL Sink
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。
大数据老哥
2021/02/04
3.3K0
快速了解Flink SQL Sink
Cloudera流分析中引入FlinkSQL
由Apache Flink提供支持的Cloudera Streaming Analytics的1.2.0.0版本提供了广泛的新功能 ,包括通过Apache Atlas 支持血缘和元数据跟踪,支持连接到Apache Kudu 以及期待已久的FlinkSQL API 的第一次迭代。
大数据杂货铺
2020/07/14
6670
触宝科技基于Apache Hudi的流批一体架构实践
当前公司的大数据实时链路如下图,数据源是MySQL数据库,然后通过Binlog Query的方式消费或者直接客户端采集到Kafka,最终通过基于Spark/Flink实现的批流一体计算引擎处理,最后输出到下游对应的存储。
ApacheHudi
2021/07/05
1.2K0
触宝科技基于Apache Hudi的流批一体架构实践
【基于Flink的城市交通实时监控平台】需求四:车辆违法路径跟踪-使用FlinkSQL在Dlink写入HBase
通过Kafka发送模拟实时车辆JSON信息给Flink接收,FlinkSQL将实时车辆JSON信息与t_violation_list表中已经捕捉到的违规违章车辆做连接对比,选择出通过当前路段的违章记录车辆,将其存入HBase中。
火之高兴
2024/07/25
2020
【基于Flink的城市交通实时监控平台】需求四:车辆违法路径跟踪-使用FlinkSQL在Dlink写入HBase
【Flink】第二篇:维表Join之版本表
在数仓ETL中,事实表和维度表在维度码值之上做join、或者若干表之间进行join做数据打宽十分常见。数仓中的join本质上是以空间换时间,范式降低,以便后续olap数据分析之用。但是看似简单的join操作,一旦在Flink的流式语义中实现,做到实时Join就不是一件轻松的事了!
章鱼carl
2022/03/31
1.5K0
Flink kafka sink to RDBS 测试Demo
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。
小石头
2022/11/10
1.3K0
用户投稿 | Dinky 从保存点恢复 FlinkSQL 作业
导读:本文来自用户投稿,介绍了 Dinky 如何通过 SavePoint 来恢复 FlinkSQL 作业。
文末丶
2023/10/24
8650
用户投稿 | Dinky 从保存点恢复 FlinkSQL 作业
Flink + Debezium CDC 实现原理及代码实战
Debezium 是一个分布式平台,它将现有的数据库转换为事件流,应用程序消费事件流,就可以知道数据库中的每一个行级更改,并立即做出响应。
kk大数据
2020/12/29
8.6K0
Flink + Debezium CDC 实现原理及代码实战
Flink SQL 核心概念剖析与编程案例实战
本次,我们从 0 开始逐步剖析 Flink SQL 的来龙去脉以及核心概念,并附带完整的示例程序,希望对大家有帮助!
kk大数据
2021/01/12
7360
Flink SQL 核心概念剖析与编程案例实战
Upsert Kafka Connector - 让实时统计更简单
在某些场景中,例如读取 compacted topic 或者输出(更新)聚合结果的时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。为了实现该功能,社区为 Kafka 专门新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。
王知无-import_bigdata
2021/03/25
4.2K0
Flink新增特性 | CDC(Change Data Capture) 原理和实践应用
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。
王知无-import_bigdata
2020/12/08
4K0
Flink新增特性 | CDC(Change Data Capture) 原理和实践应用
Flink 1.11:更好用的流批一体 SQL 引擎
许多的数据科学家,分析师和 BI 用户依赖交互式 SQL 查询分析数据。Flink SQL 是 Flink 的核心模块之一。作为一个分布式的 SQL 查询引擎。Flink SQL 提供了各种异构数据源的联合查询。开发者可以很方便地在一个程序中通过 SQL 编写复杂的分析查询。通过 CBO 优化器、列式存储、和代码生成技术,Flink SQL 拥有非常高的查询效率。同时借助于 Flink runtime 良好的容错和扩展性,Flink SQL 可以轻松处理海量数据。
数据社
2020/07/14
1.8K0
Flink SQL DDL 和 窗口函数实战
2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位。
kk大数据
2019/12/19
5.3K0
Flink SQL DDL 和 窗口函数实战
Dinky 扩展 iceberg 的实践分享
摘要:本文介绍了 Dinky 实时计算平台扩展 iceberg 的实践分享。内容包括:
文末丶
2022/09/02
1.8K1
Dinky 扩展 iceberg 的实践分享
推荐阅读
相关推荐
flink sql实战案例
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验