Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >TDSQL-subscribe-connector最佳实践(上)

TDSQL-subscribe-connector最佳实践(上)

原创
作者头像
姚琦
修改于 2022-05-17 07:29:05
修改于 2022-05-17 07:29:05
9440
举报

本文介绍了如何在 Oceanus 平台使用 tdsql-subscribe-connector 1 ,从 TDSQL-MySQL 订阅任务 2 创建,到 Oceanus 作业创建、最终数据验证,实现全流程的操作指导。需要注意的是,本文默认已经创建 TDSQL-MySQL 实例和 Oceanus 集群,并且二者在同一 VPC 下或者不同 VPC 下但网络已经打通。

1-tdsql-subscribe-connector数据流程图.png
1-tdsql-subscribe-connector数据流程图.png

上述流程图简要说明了使用 tdsql-subscribe-connector 时,整个数据流向情况。TDSQL 的 binlog 数据,会通过订阅任务发送到 Kafka(这里的 Kafka 已经包含在订阅任务中,无需重新创建实例),然后 Oceanus 可以通过 tdsql-subscribe-connector 接入 Kafka 的数据,由于 Kafka 中的消息格式比较特殊,无法用常规 Kafka Connector 接入。

创建订阅任务

创建订阅任务可以参考 数据传输服务 TDSQL MySQL 数据订阅 3 ,在订阅任务创建过程中,需要选择订阅的对象,可以选择不同数据库下的不同表,或者同一数据库下的不同表,当订阅多个表的 binlog 时,多个表中的任意一个的数据变更都会发送到 Kafka ,前提是多个表的 Schema 信息必须是相同的。

例如,以下订阅任务中,就指定了同一个库下的多张表:

2-订阅任务-多表.png
2-订阅任务-多表.png

创建 Oceanus SQL 作业

创建 SQL 作业

目前 tdsql-subscribe-connector 仅支持在 SQL 作业中使用,JAR 作业暂时不支持;

流计算 Oceanus 控制台 4 的 作业管理 > 新建作业 中新建 SQL 作业,选择在新建的集群中新建作业。然后在作业的 开发调试 > 作业参数 中添加必要的 connector,tdsql-subscribe-connector 目前需要手动上传到依赖管理中,然后在作业参数里引用该 JAR 包,Connector 的 JAR 包文件可以联系腾讯云 Oceanus 团队获取;

3-SQL作业参数.png
3-SQL作业参数.png

创建 Source 端

代码语言:sql
AI代码解释
复制
CREATE TABLE `DataInput` (

      `id` INT,

      `name` VARCHAR

) WITH (

    'connector' = 'tdsql-subscribe',   -- 注意选择对应的内置  Connecto

    'topic' = 'topic-subs-xxx-tdsqlshard-xxx',  -- 替换为订阅任务消费的 Topic

    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种

    'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:321',  -- 替换为您的订阅任务 Kafka 连接地址

    'properties.group.id' = 'consumer-grp-subs-xxx-group_2', 

    'format' = 'protobuf', -- 只能是protobuf格式

    'properties.security.protocol'='SASL_PLAINTEXT', -- 认证协议

    'properties.sasl.mechanism'='SCRAM-SHA-512', -- 认证方式

    'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx!";' --用户名和密码

);

正常情况下,以上的 Source 端参数,出了字段定义外,WITH 参数中需要根据具体订阅任务填写;这里列出 Source 端的相关配置项在订阅任务的具体位置:

  • topic 数据订阅 > 查看订阅详情 > 订阅信息
  • properties.bootstrap.servers 数据订阅 > 查看订阅详情 > 订阅信息
  • properties.group.id 数据订阅 > 查看订阅详情 > 消费管理
  • properties.sasl.jaas.config 只需要替换 username 和 password 数据订阅 > 查看订阅详情 > 消费管理

创建 Sink 端

代码语言:sql
AI代码解释
复制
-- Logger Sink 可以将输出数据打印到 TaskManager 的日志中  
-- 程序包下载地址:https://github.com/tencentyun/flink-hello-world/releases  
-- 需要先在【程序包管理】中上传该程序包,然后在【作业参数】中引用它  
-- 参见 https://cloud.tencent.com/document/product/849/58713

CREATE TABLE logger_sink_table ( 

    id INT PRIMARY KEY NOT ENFORCED,  

    name STRING 

) WITH ( 

    'connector' = 'logger',

    'print-identifier' = 'DebugData'

);

为了验证方便,这里 Sink 端采用了 Logger 5 ,可以把数据打印到日志文件中,在使用 Logger Connector 前,同样需要下载相关的 JAR ,上传到依赖管理,然后在作业参数中引用;

同时,为了更好地验证日志中数据打印情况,推荐使用 CLS ,可以更方便地在作业控制台查看作业运行日志;

4-作业参数-CLS.png
4-作业参数-CLS.png

算子操作

代码语言:sql
AI代码解释
复制
INSERT INTO logger_sink_table SELECT * FROM DataInput;

最后,把 Source 端数据插入到 Sink 端;

结果验证

完成 SQL 作业开发后,发布草稿 > 运行作业 ,然后可以在 Source 表中修改或者新增一些数据:

代码语言:sql
AI代码解释
复制
UPDATE `source_table11` SET `name`='test' WHERE `id`=300001;

INSERT INTO `source_table11` (`id`, `name`) VALUES (6000000, 'test');

DELETE FROM source_table11 WHERE id = 6000000

观察 taskmanager 的日志,可以看到 logger 打印出对应的 RowData 信息:

代码语言:text
AI代码解释
复制
DebugData-toString: +U(300001,test)

DebugData-toString: +I(6000000,test)

DebugData-toString: -D(6000000,test)

注意事项

  1. TDSQL-MySQL 和 Oceanus 的 VPC 需要连通或者使用同一 VPC;
  2. 使用 tdsql-subscribe-connector 前,需要构建数据订阅任务;
  3. tdsql-subscribe-connector 目前只支持增量阶段,没有全量阶段;
  4. 当订阅任务指定了多个表时,多个表的 Schema 需要保持一致;

参考链接

1 tdsql-subscribe-connector: https://cloud.tencent.com/document/product/849/71448

2 订阅任务: https://cloud.tencent.com/document/product/571/68060

3 数据传输服务 TDSQL MySQL 数据订阅: https://cloud.tencent.com/document/product/571/68060

4 流计算 Oceanus 控制台: https://console.cloud.tencent.com/oceanus

5 Logger: https://cloud.tencent.com/document/product/849/58713

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink 实践教程-进阶(2):复杂格式数据抽取
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过数据抽取、平铺转换后存入 MySQL 中。 前置准备 创建流计算 Oceanus
腾讯云大数据
2021/12/06
8250
Flink 实践教程:进阶3-窗口操作
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/12/21
6961
Flink 实践教程:进阶3-窗口操作
Flink 实践教程:进阶11-SQL 关联:Regular Join
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2022/03/28
1K5
Flink 实践教程:进阶11-SQL 关联:Regular Join
Flink 实践教程:入门(3):读取 MySQL 数据
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何取 MySQL 数据,经过流计算 Oceanus 实时计算引擎分析,输出数据到日志(Logger Sink)当中。 前置准备 创建 流计算
腾讯云大数据
2021/11/01
1.2K0
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/12
1.6K0
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
Flink 实践教程:进阶4-窗口 TOP N
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/12/21
1.1K1
Flink 实践教程:进阶4-窗口 TOP N
Flink 实践教程:进阶7-基础运维
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2022/01/20
2.7K1
Flink 实践教程:进阶7-基础运维
Flink 实践教程:入门(2):写入 Elasticsearch
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。 前置准备 创建
腾讯云大数据
2021/11/01
6230
Oceanus 实践-从0到1接入 CKafka SQL 作业
流计算 Oceanus 是位于云端的流式数据汇聚、计算服务。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。
于乐
2021/08/25
8740
Flink 实践教程:进阶5-乱序调整
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/12/28
7202
Flink 实践教程:进阶5-乱序调整
Flink 实践教程:入门3-读取 MySQL 数据
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/03
2K0
Flink 实践教程:入门3-读取 MySQL 数据
Flink 实践教程:进阶2-复杂格式数据抽取
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/12/04
1.2K0
Flink 实践教程:进阶2-复杂格式数据抽取
Oceanus实践-从0到1开发PG SQL作业
在Oceanus控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
吴云涛
2021/07/21
8460
Oceanus实践-从0到1开发PG SQL作业
Flink 实践教程-入门(4):读取 MySQL 数据写入到 ES
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何使用 MySQL 接入数据,经过流计算 Oceanus 对数据进行处理分析(示例中采用小写转换函数对name字段进行了小写转换),最终将处
腾讯云大数据
2021/11/09
1.4K0
Flink 实践教程-入门(5):写入 ClickHouse
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何使用 Datagen Connector 模拟生成客户视频点击量数据,并利用滚动窗口函数对每分钟内客户的视频点击量进行聚合分析,最后将
腾讯云大数据
2021/11/09
9140
实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控
---- 作者:吴云涛,腾讯 CSIG 高级工程师 本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告警信息,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。 一、解决方案描述 (一)概述 本方案结合腾讯云 CKafka、流计算 Oceanus (Flink)、 Elasticsearch、Promethe
腾讯技术工程官方号
2021/10/14
2.4K0
HiveCatalog 介绍与使用
作者:苏文鹏,腾讯 CSIG 工程师 一、背景 Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和 ETL 场景的 SQL 引擎,同样它也是一个数据管理平台,可用于发现、定义和演化数据。Flink 与 Hive 的集成包含两个层面: 一是利用了 Hive 的 Metastore 作为持久化的 Catalog,用户可通过 HiveCatalog 将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,用户可以使用 HiveCatalog 将其
腾讯云大数据
2022/07/20
1.3K0
HiveCatalog 介绍与使用
Oceanus 实践-从0到1开发PG SQL作业
实时即未来,最近在腾讯云流计算 Oceanus 进行 Flink 实时计算服务,分享给大家~
吴云涛
2021/08/09
1.1K0
Oceanus 实践-从0到1开发PG SQL作业
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/06
1.5K1
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
Flink 实践教程:入门5-写入 ClickHouse
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/06
2K0
Flink 实践教程:入门5-写入 ClickHouse
推荐阅读
相关推荐
Flink 实践教程-进阶(2):复杂格式数据抽取
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档