本文介绍使用腾讯云 Oceanus、Flink 两种流处理框架,自定义消费日志。
腾讯云 Oceanus
1. 打开 Oceanus 控制台,新建依赖,上传 flink-connector-cls-1.0.0.jar 包。

2. 新建 SQL 作业。注意 Flink 版本要选择 Flink-1.16,如下图所示。

3. 配置作业参数,选择步骤1中上传的 jar 包依赖,并确认,其它作业参数可以按需调整。

4.
编辑建表语句。
CREATE TABLE `cls_source_table` (`key1` STRING,`key2` STRING,`key3` STRING,`key4` STRING,`__TIMESTAMP__` BIGINT) WITH ('connector' = 'cls','region' = '地域,如 ap-guangzhou','logset_id' = '日志集 ID,仅支持一个日志集','topic_ids' = '日志主题 ID,多个主题请使用','隔开','access_id' = '您的Secret_id,请前往 腾讯云访问管理 CAM 获取。','access_key' = '您的Secret_key,请前往 腾讯云访问管理 CAM 获取。','consumer_group_name' = '消费者组名称,如flink-connector-cls-5bXXXXb2','offset_start_time' = 'begin','internal' = 'true','format' = 'json','scan.parallelism' = '3');
参数说明如下:
表字段要与 cls 侧的日志字段匹配方能生效,其中 __TIMESTAMP__ 为日志时间,可以按需使用,其它元数据字段如 __FILENAME__ 和 __SOURCE__ 以及 __TAG__ 等暂未加入。
connector 为必填项,需为 cls。
更多信息请参见 自定义消费者参数说明。
注意:
目前 connector 本身支持基于 checkpoint 进行作业恢复。需要在每次停止时触发创建快照流程。
后续基于快照恢复后,可以基于 checkpoint 的点位继续消费,保证 Exactly Once 语义。如果异常故障宕掉场景下,checkpoint 未正常触发,后续基于之前的 checkpoint 进行恢复时可能会有数据重复,只保证 At Least Once 语义。
自建 Flink
1. 安装 Java 8版本 OpenJDK。(CentOS/RHEL 系统)
1.1 为避免冲突,请先检查系统是否已安装 Java。
java -version
若未安装,执行以下命令安装 OpenJDK 8。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
1.2 验证安装结果(关键步骤,确认版本正确)。
java -versionjavac -version # 需确保 javac 也安装成功(devel 包包含编译器,Flink 需要)
2. 以 Flink-1.16.3版本为例,下载安装启动 Flink 集群。
2.1 下载 Flink,并配置权限。
# 1. 下载 Flink 1.16.3(scala 2.12版本,主流版本)wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz# 2. 解压到指定目录(推荐/usr/local/)sudo tar zxvf flink-1.16.3-bin-scala_2.12.tgz -C /usr/local/# 3. 重命名目录(简化后续操作)sudo mv /usr/local/flink-1.16.3 /usr/local/flink# 4. 赋予普通用户操作权限(避免后续启动权限问题)sudo chown -R $USER:$USER /usr/local/flink
2.2 启动 Flink 集群。
# 1. 进入 Flink 安装目录cd /usr/local/flink# 2. 启动集群(单节点:1个 JobManager + 1个 TaskManager)./bin/start-cluster.sh
2.3 校验启动状态(核心步骤)。
方式1:启动后可以通过 JDK 自带的 jps 命令查看 Java 进程,可检查是否已启动 TaskManager 进程。
jps
正常输出应包含:
- StandaloneSessionClusterEntrypoint(JobManager 进程)- TaskManagerRunner(TaskManager 进程)- Jps(自身进程)
方式2:访问 Web UI(验证集群可访问)。
浏览器访问:
http://服务器IP:8081,能看到 Flink 控制台,且 Task Managers 数量为1,则表示集群启动成功。3. 启动 SQL client 来建表,进入 Flink SQL 命令行,编辑建表语句。
3.1 进入 Flink 安装目录(确保路径正确)。
cd /usr/local/flink
3.2 启动 SQL Client(交互式命令行工具)。
./bin/sql-client.sh
启动成功后将展示如下内容:

4. 退出 Flink 集群。
./bin/stop-cluster.sh
5. 进入 Flink Web UI 查看任务执行情况:
http://localhost:8081/#/overview。