首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Oceanus 实践-消费 CMQ 主题模型数据源

Oceanus 实践-消费 CMQ 主题模型数据源

作者头像
吴云涛
修改于 2021-09-30 08:48:53
修改于 2021-09-30 08:48:53
1.7K0
举报

实时即未来,最近在腾讯云流计算 Oceanus 进行Flink实时计算服务,以下为 Flink 消费腾讯云 CMQ 的数据实践。原文自Raigor,已获得授权,分享给大家~

Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。

1. 环境搭建

1.1 创建 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过VPC,日志,存储这些组件,需要先进行创建。

创建完后的集群如下:

Oceanus 集群
Oceanus 集群

1.2 新建 CMQ 主题

在 CMQ 控制台的【主题订阅】-> 【新建】主题,输入主题名,其他保持默认值即可。新建的主题如下:

CMQ 主题
CMQ 主题

1.3 新建 CMQ 队列

在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:

CMQ 队列
CMQ 队列

1.4 新建 CMQ 主题订阅

在 CMQ 主题列表页,点击主题操作列的【订阅】链接,进入【订阅者】列表,新建订阅,输入订阅名,终端类型选择 Queue 队列服务,订阅地址选择cs2的队列,其他保持默认值。新建的订阅者如下:

CMQ 主题订阅
CMQ 主题订阅

2. 作业创建

2.1 创建 SQL 作业

在 Oceanus 控制台【作业管理】->【新建作业】-> SQL作业,选择刚刚新建的集群创建作业。然后在作业的【开发调试】->【作业参数】里面添加必要的connector cmq-1.1.1。

SQL 作业
SQL 作业

2.2 创建数据源表和目的表

在作业的【开发调试】->【插入模板】选择 CMQ 读取 & 写入的模板,并添加。修改参数queue、secret-id、secret-key。

注意:强烈建议使用具有最小权限的secret-id和secret-key,并注意保密,防止泄漏带来的安全风险。

CMQ 读取 & 写入
CMQ 读取 & 写入
代码语言:txt
AI代码解释
复制
CREATE TABLE `CMQSourceTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'cs2',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

CREATE TABLE `CMQSinkTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'sink_queue',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

insert into CMQSinkTable select * from CMQSourceTable;

2.3 算子操作

这里只做最简单的数据插入。

代码语言:txt
AI代码解释
复制
insert into CMQSinkTable select *from CMQSourceTable;

3. 验证总结

在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。

发送主题消息
发送主题消息
接收队列消息
接收队列消息

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Oceanus 实践-消费 CMQ 主题模型数据源
Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。
Raigor
2021/08/06
1K0
Oceanus 实践-消费 CMQ 主题模型数据源
基于流计算 Oceanus 和 Elasticsearch 构建日志分析系统
实时即未来,最近在腾讯云流计算 Oceanus(Flink)进行实时计算服务,以下为MySQL 到 Flink 进行处理分析,再存储到ES的实践。分享给大家~
吴云涛
2021/08/09
1.1K0
基于流计算 Oceanus 和 Elasticsearch 构建日志分析系统
Oceanus实践-从0到1开发MySQL-cdc到ES SQL作业
实时即未来,最近在腾讯云Oceanus进行实时计算服务,以下为mysql到flink到ES实践。分享给大家~
吴云涛
2021/08/04
9910
Oceanus实践-从0到1开发MySQL-cdc到ES SQL作业
Oceanus实践-从0到1开发PG SQL作业
在Oceanus控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
吴云涛
2021/07/21
8730
Oceanus实践-从0到1开发PG SQL作业
消息队列 CMQ 七大功能实践案例
本文先简单介绍 CMQ 底层的架构实现,然后着重结合CMQ的功能特点来介绍 CMQ 的实践案例,让大家快速理解和上手 CMQ 的开发。
serena
2018/01/15
4.3K0
消息队列 CMQ 七大功能实践案例
Flink 最佳实践:TDSQL Connector 的使用(上)
作者:姚琦,腾讯 CSIG 工程师 本文介绍了如何在 Oceanus 平台使用 tdsql-subscribe-connector [1] ,从 TDSQL-MySQL 订阅任务 [2] 创建,到 Oceanus 作业创建、最终数据验证,实现全流程的操作指导。需要注意的是,本文默认已经创建 TDSQL-MySQL 实例和 Oceanus 集群,并且二者在同一 VPC 下或者不同 VPC 下但网络已经打通。 上述流程图简要说明了使用 tdsql-subscribe-connector 时,整个数据流向情况。
腾讯云大数据
2022/04/22
1K0
Flink 最佳实践:TDSQL Connector 的使用(上)
CKafka系列学习文章 - 对比RabbitMQ、RocketMQ、TDMQ-CMQ、kafka和Ckafka(二)
导语:上一章我们聊到了:什么是消息队列,为什么要用消息队列,有那些消息队列?下来我们聊聊什么样的消息队列适合我们公司。
发哥说消息队列
2019/08/22
5.2K0
Oceanus 实践-从0到1开发ClickHouse SQL作业
在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
吴云涛
2021/08/09
9050
Oceanus 实践-从0到1开发ClickHouse SQL作业
Oceanus 实践-从0到1开发PG SQL作业
实时即未来,最近在腾讯云流计算 Oceanus 进行 Flink 实时计算服务,分享给大家~
吴云涛
2021/08/09
1.1K0
Oceanus 实践-从0到1开发PG SQL作业
如何借助 Layer 实现云函数快速打包轻松部署
由于云函数在创建或更新时,需要将函数的业务代码,和依赖库一同打包上传,因此在本地开发时,也经常是将依赖库和业务代码放置在一个文件夹下。
腾讯云serverless团队
2020/05/20
2.2K0
Flink 实践教程-入门(4):读取 MySQL 数据写入到 ES
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何使用 MySQL 接入数据,经过流计算 Oceanus 对数据进行处理分析(示例中采用小写转换函数对name字段进行了小写转换),最终将处
腾讯云大数据
2021/11/09
1.4K0
Flink 实践教程:进阶11-SQL 关联:Regular Join
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2022/03/28
1.1K5
Flink 实践教程:进阶11-SQL 关联:Regular Join
最佳实践:MySQL CDC 同步数据到 ES
作者:于乐,腾讯 CSIG 工程师 一、 方案描述 1.1 概述 在线教育是一种利用大数据、人工智能等新型互联网技术与传统教育行业相结合的新型教育方式。发展在线教育可以更好的构建网络化、数字化、个性化、终生化的教育体系,有利于构建“人人皆学、处处能学、实时可学”的学习型社会。 本文针对某知名在线教育平台在腾讯云流计算 Oceanus 的业务案例,介绍了其中可能存在的一些性能问题,并针对这种问题进行了参数调优相关的介绍。 1.2 方案架构 某知名在线教育平台在流计算 Oceanus 上主要有两个业务应用场景
腾讯云大数据
2022/06/24
4K0
最佳实践:MySQL CDC 同步数据到 ES
视频直播:基于流计算 Oceanus(Flink) 的实时大屏分析
本方案结合腾讯云 CKafka、流计算 Oceanus、私有网络 VPC、商业智能分析BI等,对视频直播行业数字化运营进行实时可视化分析。分析指标包含观看直播人员的地区分布、各级别会员统计、各模块打赏礼物情况、在线人数等。
吴云涛
2021/08/17
2.7K0
视频直播:基于流计算 Oceanus(Flink) 的实时大屏分析
Flink 实践教程:入门(2):写入 Elasticsearch
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。 前置准备 创建
腾讯云大数据
2021/11/01
6490
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/06
1.6K1
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/11/12
1.7K0
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
实时监控:基于流计算 Oceanus(Flink) 实现系统和应用级实时监控
本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告警信息,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。
吴云涛
2021/09/09
6.7K3
实时监控:基于流计算 Oceanus(Flink) 实现系统和应用级实时监控
Flink 实践教程:进阶4-窗口 TOP N
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
吴云涛
2021/12/21
1.1K1
Flink 实践教程:进阶4-窗口 TOP N
基于流计算 Oceanus 和 Elasticsearch Service 实现实时监控系统
本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU 和内存等资源消耗数据,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。
于乐
2021/11/18
1.2K0
基于流计算 Oceanus 和 Elasticsearch Service 实现实时监控系统
推荐阅读
相关推荐
Oceanus 实践-消费 CMQ 主题模型数据源
更多 >
交个朋友
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档