首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Upsert Kafka Connector - 让实时统计更简单

Upsert Kafka Connector - 让实时统计更简单

作者头像
王知无-import_bigdata
发布于 2021-03-25 13:15:04
发布于 2021-03-25 13:15:04
4.2K00
代码可运行
举报
运行总次数:0
代码可运行

在某些场景中,例如读取 compacted topic 或者输出(更新)聚合结果的时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。为了实现该功能,社区为 Kafka 专门新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。

要使用 upsert-kafka connector,必须在创建表时定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。

一、Upsert Kafka Connector是什么?

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

如果是更新,则同一个key会存储多条数据,但在读取该表数据时,只保留最后一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 消息写入(key被打上墓碑标记,表示对应 key 的消息被删除)。

Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

upsert-kafka connector相关参数

connector

必选。指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'。

topic 必选。用于读取和写入的 Kafka topic 名称。

properties.bootstrap.servers 必选。以逗号分隔的 Kafka brokers 列表。

key.format 必选。用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv'、'json'、'avro'。

value.format 必选。用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv'、'json'、'avro'。

properties 可选。该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。但是,某些选项,例如'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

value.fields-include 可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。

key.fields-prefix 可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY。

二、使用步骤

1.引入库

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        <!-- Flink kafka connector: kafka版本大于1.0.0可以直接使用通用的连接器 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
            <scope>provided</scope>
        </dependency>

2.SQL计算

示例:实时地统计网页PV和UV的总量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 创建kafka数据源表(json格式)
-- 'format.type' = 'json',                   -- required: specify the format type
-- 'format.fail-on-missing-field' = 'true',  -- optional: flag whether to fail if a field is missing or not,'false' by default
-- 'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing;

CREATE TABLE source_ods_fact_user_ippv (
    user_id      STRING,
    client_ip    STRING,     
    client_info  STRING,     
    pagecode     STRING,     
    access_time  TIMESTAMP,  
    dt           STRING,     
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- 定义watermark
) WITH (
   'connector' = 'kafka',
    'topic' = 'user_ippv',
    'scan.startup.mode' = 'earliest-offset',
    'properties.group.id' = 'group1',
    'properties.bootstrap.servers' = 'xxx:9092', 
    'format' = 'json', 
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- 创建kafka upsert结果表且指定组合主键为:do_date,do_min
CREATE TABLE result_total_pvuv_min (
    do_date     STRING,     -- 统计日期
    do_min      STRING,      -- 统计分钟
    pv          BIGINT,     -- 点击量
    uv          BIGINT,     -- 一天内同个访客多次访问仅计算一个UV
    currenttime TIMESTAMP,  -- 当前时间
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min',
  'properties.bootstrap.servers' = 'xxx:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);
-- 创建视图
CREATE VIEW view_total_pvuv_min AS
SELECT
     dt AS do_date,                    -- 时间分区
     count (client_ip) AS pv,          -- 客户端的IP
     count (DISTINCT client_ip) AS uv, -- 客户端去重
     max(access_time) AS access_time   -- 请求的时间
FROM
    source_ods_fact_user_ippv
GROUP BY dt;


-- 将每分钟的pv/uv统计结果写入kafka upsert表
INSERT INTO result_total_pvuv_min
SELECT
  do_date,   
  cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime
from
  view_total_pvuv_min;

该处使用示例数据和验证结果如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafak 数据源:
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-23 11:32:55","dt":"2021-01-08"}
{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031",   "access_time":"2021-01-23 11:32:59","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-23 11:33:24","dt":"2021-01-08"}
{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001",   "access_time":"2021-01-23 11:33:30","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:34:24","dt":"2021-01-08"}


实时统计的结果表(TOPIC:result_total_pvuv_min){"do_date":"2021-01-08","do_min":"11:32","pv":1,"uv":1,"currenttime":"2021-01-23 08:22:06.431"}
{"do_date":"2021-01-08","do_min":"11:32","pv":2,"uv":1,"currenttime":"2021-01-23 08:22:06.526"}
{"do_date":"2021-01-08","do_min":"11:32","pv":3,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
{"do_date":"2021-01-08","do_min":"11:33","pv":4,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
{"do_date":"2021-01-08","do_min":"11:33","pv":5,"uv":3,"currenttime":"2021-01-23 08:22:06.528"}
{"do_date":"2021-01-08","do_min":"11:34","pv":6,"uv":3,"currenttime":"2021-01-23 08:22:06.529"}


----------------分割线--------------------

重测试输入如下示例数据:
{"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:10:24","dt":"2021-01-22"}
{"user_id":"11","client_ip":"192.168.12.2","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:10:24","dt":"2021-01-22"}
{"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:11:24","dt":"2021-01-22"}
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:12:14","dt":"2021-01-22"}


打印待更新结果:
+----+--------------------------------+--------------------------------+----------------------+----------------------+-----------------------+
| op |                        do_date |                         do_min |                   pv |                   uv |           currenttime |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-----------------------+
| +I |                     2021-01-22 |                          10:10 |                    1 |                    1 | 2021-01-23T08:33:2... |
| -U |                     2021-01-22 |                          10:10 |                    1 |                    1 | 2021-01-23T08:33:2... |
| +U |                     2021-01-22 |                          11:10 |                    2 |                    2 | 2021-01-23T08:33:2... |
| -U |                     2021-01-22 |                          11:10 |                    2 |                    2 | 2021-01-23T08:33:2... |
| +U |                     2021-01-22 |                          11:10 |                    3 |                    2 | 2021-01-23T08:33:2... |
| -U |                     2021-01-22 |                          11:10 |                    3 |                    2 | 2021-01-23T08:33:3... |
| +U |                     2021-01-22 |                          11:12 |                    4 |                    3 | 2021-01-23T08:33:3... |

3. Kafka -> FLINK -> TIDB

Flink on TIDB 在当前已经有小红书、贝壳金服等在使用,作为一个支持upsert的实时数据同步方案具备一定的可行性。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
select version(); -- 5.7.25-TiDB-v4.0.8
drop table if exists result_user_behavior;
CREATE TABLE `result_user_behavior` (
  `user_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `client_ip` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `client_info` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `page_code` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `access_time` TIMESTAMP COLLATE utf8mb4_general_ci DEFAULT NULL,
  `dt`varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 支持upsert的一种可行数据同步方案
tenv.executeSql("CREATE TABLE source_kafka_user_behavior (\n" +
          "    user_id      INT,\n" +
          "    client_ip    STRING,     \n" +
          "    client_info  STRING,     \n" +
          "    page_code     STRING,     \n" +
          "    access_time  TIMESTAMP,  \n" +
          "    dt           STRING,     \n" +
          "    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND \n" +
          ") WITH (\n" +
          "   'connector' = 'kafka',\n" +
          "    'topic' = 'user_ippv',\n" +
          "    'scan.startup.mode' = 'latest-offset',\n" +
          "    'properties.group.id' = 'test-group1',\n" +
          "    'properties.bootstrap.servers' = 'xx:9092', \n" +
          "    'format' = 'json', \n" +
          "    'json.fail-on-missing-field' = 'false',\n" +
          "    'json.ignore-parse-errors' = 'true'\n" +
          ")").print();

tenv.executeSql("CREATE TABLE sink_upsert_tidb (\n" +
      "    user_id      INT,\n" +
      "    client_ip    STRING,     \n" +
      "    client_info  STRING,     \n" +
      "    page_code     STRING,     \n" +
      "    access_time  TIMESTAMP,  \n" +
      "    dt           STRING,     \n" +
      "    PRIMARY KEY (user_id) NOT ENFORCED" +
      ") WITH (\n" +
      "  'connector' = 'jdbc',\n" +
      "  'url' = 'jdbc:mysql://xxx:4000/bi',\n" +
      "  'username' = 'bi_rw',\n" +
      "  'password' = 'xxx',\n" +
      "  'table-name' = 'result_user_behavior'\n" +
     ")");


tenv.executeSql("insert into sink_upsert_tidb" +
         " select " +
         "    user_id     ,\n" +
         "    client_ip   ,     \n" +
         "    client_info ,     \n" +
         "    page_code    ,     \n" +
         "    access_time ,  \n" +
         "    dt               \n" +
         "from source_kafka_user_behavior").print();

测试输入:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
测试数据:
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1002","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1003","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
{"user_id":"11"} -- 值全部置空
{"user_id":"11","client_ip":"192.168.12.4","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}
{"user_id":"12","client_ip":"192.168.12.5","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}

Tidb查询结果示例:

总结

这里演示了使用kaka作为source和sink的使用示例,其中我们把从kafka source中消费的数据进行视图查询的时候则显示以上更新结果,每一条以统计日期和统计分钟作为联合主键的数据插入都会被解析为+I(插入)-U(标记待删除值) +U (更新新值),这样在最新的result_total_pvuv_min 的kafka upsert 结果表中就是最新的数据。

当前kafka-upsert connector 适用于Flink-1.12的版本,作为一个数据聚合的中转对于很多业务场景有一定的普适性,比如kafka upsert结果表还可以作为维表join, 或者通过flink sink 到HDFS, iceberg table等进行离线分析。

如果想真正实时,Flink+Tidb就是一个很好的解决方案。虽然Tidb存储和计算不分离,但是能使用加机器解决的问题,性能都不是事,况且Tidb完全兼容MySQL语法,非常适合MySQL平迁,而且支持事务,和使用MySQL没有什么特别大的区别,

官方已出TiSpark查询引擎,虽还未实测性能,但想必会比MySQL 引擎查询的效率要高。我司也开始着手Tidb的使用,目前的实时的任务是基于微批的形式处理,还不能算是完全的实时,后面随着对其的了解原来越完善,完全实时化则指日可待。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.5K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
指标统计:基于流计算 Oceanus (Flink) 实现实时UVPV统计
最近梳理了一下如何用 Flink 来实现实时的 UV、PV 指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用 Flink SQL 来 实现这些指标的统计会更加便捷。
吴云涛
2021/09/10
3.3K1
指标统计:基于流计算 Oceanus (Flink) 实现实时UVPV统计
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。
公众号:大数据羊说
2022/04/04
3.5K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
流数据湖平台Apache Paimon(二)集成 Flink 引擎
Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。
Maynor
2023/07/31
3.4K0
流数据湖平台Apache Paimon(二)集成 Flink 引擎
flink sql实战案例
断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。
chimchim
2022/11/13
1.1K0
flink sql实战案例
项目实践|基于Flink的用户行为日志分析系统
用户行为日志分析是实时数据处理很常见的一个应用场景,比如常见的PV、UV统计。本文将基于Flink从0到1构建一个用户行为日志分析系统,包括架构设计与代码实现。本文分享将完整呈现日志分析系统的数据处理链路,通过本文,你可以了解到:
Spark学习技巧
2020/09/08
2.4K0
项目实践|基于Flink的用户行为日志分析系统
指标统计:基于流计算Oceanus(Flink) 实现实时UVPV统计
导语 | 最近梳理了一下如何用Flink来实现实时的UV、PV指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用Flink SQL来实现这些指标的统计会更加便捷。 一、解决方案描述 (一)概述 本方案结合本地自建Kafka集群、腾讯云流计算Oceanus(Flink)、云数据库Redis对博客、购物等网站UV、PV指标进行实时可视化分析。分析指标包含网站的独立访客数量(UV)、产品的点击量(PV)、转化率(转化率=成交次数/点击量)等。 相关概念介绍: UV(Unique
腾讯云开发者
2021/10/22
1.2K0
K8S部署Kafka集群 - 部署笔记
Kafka和zookeeper是两种典型的有状态的应用集群服务。首先kafka和zookeeper都需要存储盘来保存有状态信息;其次kafka和zookeeper每一个实例都需要有对应的实例Id (Kafka需broker.id, zookeeper需要my.id) 来作为集群内部每个成员的标识,集群内节点之间进行内部通信时需要用到这些标识。
洗尽了浮华
2021/03/03
6.1K0
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
公众号:大数据羊说
2022/07/07
2.6K0
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
技术干货|如何利用 ChunJun 实现数据实时同步?
实时同步是 ChunJun 的⼀个重要特性,指在数据同步过程中,数据源与⽬标系统之间的数据传输和更新⼏乎在同⼀时间进⾏。
袋鼠云数栈
2023/04/24
2.3K0
Flink SQL 知其所以然(二十六):万字详述 Flink SQL 4 种时间窗口语义!(收藏)
大家好我是老羊,由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走。思路如下:
公众号:大数据羊说
2022/07/07
3.7K1
Flink SQL 知其所以然(二十六):万字详述 Flink SQL 4 种时间窗口语义!(收藏)
指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计
作者:吴云涛,腾讯 CSIG 高级工程师 导语 | 最近梳理了一下如何用 Flink 来实现实时的 UV、PV 指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用 Flink SQL 来 实现这些指标的统计会更加便捷。 一 解决方案描述 1.1 概述 本方案结合本地自建 Kafka 集群、腾讯云流计算 Oceanus(Flink)、云数据库 Redis 对博客、购物等网站 UV、PV 指标进行实时可视化分析。分析指标包含网站的独立访客数量(UV )、产品的点击量(PV)、转化率(
腾讯云大数据
2021/10/21
1.1K0
如何用SQL实现用户行为漏斗分析
1 每日活跃设备明细 dwd_start_log--->dws_uv_detail_day
大数据学习与分享
2022/05/19
2.3K0
如何用SQL实现用户行为漏斗分析
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
北京理工大学硕士毕业,2015 年加入阿里巴巴,参与阿里巴巴实时计算引擎 JStorm 的开发与设计。2016 年开始从事阿里新一代实时计算引擎 Blink SQL 的开发与优化,并活跃于 Flink 社区,于2017年2月成为ApacheFlink Committer,是国内早期 Flink Committer 之一。目前主要专注于分布式处理和实时计算,热爱开源,热爱分享。
小晨说数据
2019/09/19
5.4K0
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
Doris数据模型
列可以分为两大类:Key 和 Value。从业务角度看,Key 和 Value 可以分别对应维度列和指标列。
程裕强
2021/09/08
1.9K0
一次成功的FlinkSQL功能测试及实战演练
前言:Flink在国内的占有率逐步提升的情况下,各项组件的功能与稳定性也得到逐步提升。为了解决目前已有的复杂需求,尝试研究flinksql的特性与功能,作为是否引入该组件的依据。同时尝试将现有需求通过简单demo的形式进行测试。本次测试主要集中在Kafka、mysql、Impala三个组件上,同时将结合官方文档进行:
大数据真好玩
2021/05/28
2.8K0
一次成功的FlinkSQL功能测试及实战演练
有赞实时计算 Flink 1.13 升级实践
随着有赞实时计算业务场景全部以Flink SQL的方式接入,对有赞现有的引擎版本—Flink 1.10的SQL能力提出了越来越多无法满足的需求以及可以优化的功能点。目前有赞的Flink SQL是在Yarn上运行,但是在公司应用容器化的背景下,可以统一使用公司K8S资源池,同时考虑到任务之间的隔离性以及任务的弹性调度,Flink SQL任务K8S化是必须进行的,所以我们也希望通过这次升级直接利社区的on K8S能力,直接将FlinkSQL集群迁移到K8S上。特别是社区在Flink 1.13中on Native K8S能力的支持完善,为了紧跟社区同时提升有赞实时计算引擎的能力,经过一些列调研,我们决定将有赞实时计算引擎由Flink 1.10升级到Flink 1.13.2。
有赞coder
2021/12/20
1.6K0
有赞实时计算 Flink 1.13 升级实践
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
本篇演示安装配置 Kafka connect 插件实现 MySQL 到 Hbase 的实时数据同步。依赖环境见本专栏前面文章。相关软件版本如下:
用户1148526
2024/03/21
6450
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
用 Apache NiFi、Kafka和 Flink SQL 做股票智能分析
本文是关于如何在实时分析中使用云原生应用程序对股票数据进行连续 SQL 操作的教程。
吴云涛
2021/11/15
4.1K0
用 Apache NiFi、Kafka和 Flink SQL 做股票智能分析
Flink SQL Kafka Connector
Kafka Connector 提供了从 Kafka topic 中消费和写入数据的能力。
smartsi
2022/02/22
5.8K0
Flink SQL Kafka Connector
推荐阅读
相关推荐
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档