流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将为您详细介绍如何使用 Datagen Connector 模拟生成客户视频点击量数据,并利用滚动窗口函数对每分钟内客户的视频点击量进行聚合分析,最后将数据输出到 ClickHouse 的流程。
活动购买链接 1 元购买 Oceanus 集群。
进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,完成 Oceanus 集群的创建。具体可参考 Oceanus 官方文档创建独享集群[2]。
进入 ClickHouse 控制台[3],点击左上角【新建集群】,完成 ClickHouse 集群的创建。具体可参考 ClickHouse 快速入门[4]。
注意:创建 Oceanus 集群和 ClickHouse 集群时所选的 VPC 必须相同。
# 下载 ClickHouse-Client 命令
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86\_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86\_64/clickhouse-common-static-20.7.2.30-2.x86\_64.rpm
2. 安装客户端
rpm -ivh \*.rpm
3. 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
4. 登陆 ClickHouse 集群,建表。
CREATE TABLE default.datagen_to_ck on cluster default_cluster (
win_start TIMESTAMP,
win_end TIMESTAMP,
user_id String,
amount_total Int16,
Sign Int8 )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/datagen_to_ck', '{replica}',Sign)
ORDER BY (win_start,win_end,user_id);
CREATE TABLE random_source (
user_id VARCHAR,
amount INT,
pre_time AS CURRENT_TIMESTAMP,
WATERMARK FOR pre_time AS pre_time - INTERVAL '3' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒产生的数据条数
'fields.user_id.length' = '1', -- 随机字符串的长度
'fields.amount.kind' = 'random', -- 无界的随机数
'fields.amount.min' = '1', -- 随机数的最小值
'fields.amount.max' = '10' -- 随机数的最大值
);
CREATE TABLE clickhouse (
win_start TIMESTAMP(3),
win_end TIMESTAMP(3),
user_id VARCHAR,
amount_total BIGINT,
PRIMARY KEY (win_start,win_end,user_id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://10.0.0.178:8123',
--'username' = 'root', -- 如果ClickHouse集群未配置账号密码可以不指定
--'password' = 'root',
'database-name' = 'default',
'table-name' = 'datagen_to_ck',
'table.collapsing.field' = 'Sign' -- CollapsingMergeTree 类型列字段的名称
);
INSERT INTO clickhouse
SELECT
TUMBLE_START(pre_time,INTERVAL '1' MINUTE) AS win_start,
TUMBLE_END(pre_time,INTERVAL '1' MINUTE) AS win_end,
user_id,
CAST(SUM(amount) AS BIGINT) AS amount_total
FROM random_source
GROUP BY TUMBLE(pre_time,INTERVAL '1' MINUTE),user_id;
点击【作业参数】,在【内置 Connector】选择 flink-connector-clickhouse
,点击【保存】>【发布草稿】运行作业。
新版 Flink 1.13 集群不需要用户自己选择内置 Connector
本示例使用 datagen Connecor 模拟产生随机数据,使用 TUMBLE WINDOW(滚动窗口)统计各用户(user_id)每分钟的视频点击量(amount_total),然后将数据存储在 ClickHouse 中。 更多时间窗口函数示例请参考 Oceanus 官方文档 5。
1 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
2 创建独享集群:https://cloud.tencent.com/document/product/849/48298
3 ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou
4 ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824
5 Oceanus 窗口函数官方文档:https://cloud.tencent.com/document/product/849/18077
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有