消费 Demo-流处理

最近更新时间:2026-03-11 16:35:12

我的收藏
本文介绍使用腾讯云 Oceanus、Flink 两种流处理框架,自定义消费日志。

腾讯云 Oceanus

1. 打开 Oceanus 控制台,新建依赖,上传 flink-connector-cls-1.0.0.jar 包。

2. 新建 SQL 作业。注意 Flink 版本要选择 Flink-1.16,如下图所示。

3. 配置作业参数,选择步骤1中上传的 jar 包依赖,并确认,其它作业参数可以按需调整。

4. 
编辑建表语句。

CREATE TABLE `cls_source_table` (
`key1` STRING,
`key2` STRING,
`key3` STRING,
`key4` STRING,
`__TIMESTAMP__` BIGINT
) WITH (
'connector' = 'cls',
'region' = '地域,如 ap-guangzhou',
'logset_id' = '日志集 ID,仅支持一个日志集',
'topic_ids' = '日志主题 ID,多个主题请使用','隔开',
'access_id' = '您的Secret_id,请前往 腾讯云访问管理 CAM 获取。',
'access_key' = '您的Secret_key,请前往 腾讯云访问管理 CAM 获取。',
'consumer_group_name' = '消费者组名称,如flink-connector-cls-5bXXXXb2',
'offset_start_time' = 'begin',
'internal' = 'true',
'format' = 'json',
'scan.parallelism' = '3'
);
参数说明如下:
表字段要与 cls 侧的日志字段匹配方能生效,其中 __TIMESTAMP__ 为日志时间,可以按需使用,其它元数据字段如 __FILENAME__ 和 __SOURCE__ 以及 __TAG__ 等暂未加入。
connector 为必填项,需为 cls。
更多信息请参见 自定义消费者参数说明
注意:
目前 connector 本身支持基于 checkpoint 进行作业恢复。需要在每次停止时触发创建快照流程。
后续基于快照恢复后,可以基于 checkpoint 的点位继续消费,保证 Exactly Once 语义。如果异常故障宕掉场景下,checkpoint 未正常触发,后续基于之前的 checkpoint 进行恢复时可能会有数据重复,只保证 At Least Once 语义。

自建 Flink

1. 安装 Java 8版本 OpenJDK。(CentOS/RHEL 系统)
1.1 为避免冲突,请先检查系统是否已安装 Java。
java -version
若未安装,执行以下命令安装 OpenJDK 8。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
1.2 验证安装结果(关键步骤,确认版本正确)。
java -version
javac -version # 需确保 javac 也安装成功(devel 包包含编译器,Flink 需要)
2. 以 Flink-1.16.3版本为例,下载安装启动 Flink 集群。
2.1 下载 Flink,并配置权限。
# 1. 下载 Flink 1.16.3(scala 2.12版本,主流版本)
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz

# 2. 解压到指定目录(推荐/usr/local/)
sudo tar zxvf flink-1.16.3-bin-scala_2.12.tgz -C /usr/local/

# 3. 重命名目录(简化后续操作)
sudo mv /usr/local/flink-1.16.3 /usr/local/flink

# 4. 赋予普通用户操作权限(避免后续启动权限问题)
sudo chown -R $USER:$USER /usr/local/flink
2.2 启动 Flink 集群。
# 1. 进入 Flink 安装目录
cd /usr/local/flink

# 2. 启动集群(单节点:1个 JobManager + 1个 TaskManager)
./bin/start-cluster.sh
2.3 校验启动状态(核心步骤)。
方式1:启动后可以通过 JDK 自带的 jps 命令查看 Java 进程,可检查是否已启动 TaskManager 进程。
jps
正常输出应包含:
- StandaloneSessionClusterEntrypoint(JobManager 进程)
- TaskManagerRunner(TaskManager 进程)
- Jps(自身进程)

方式2:访问 Web UI(验证集群可访问)。
浏览器访问:http://服务器IP:8081,能看到 Flink 控制台,且 Task Managers 数量为1,则表示集群启动成功。
3. 启动 SQL client 来建表,进入 Flink SQL 命令行,编辑建表语句
3.1 进入 Flink 安装目录(确保路径正确)。
cd /usr/local/flink
3.2 启动 SQL Client(交互式命令行工具)。
./bin/sql-client.sh
启动成功后将展示如下内容:

4. 退出 Flink 集群。
./bin/stop-cluster.sh
5. 进入 Flink Web UI 查看任务执行情况:http://localhost:8081/#/overview