文档中心>日志服务>实践教程>投递和消费>使用 Flink 消费 CLS 日志

使用 Flink 消费 CLS 日志

最近更新时间:2024-11-08 15:44:02

我的收藏

操作场景

本文详细描述了如何使用 Flink 实时消费和分析 CLS 中的 Nginx 日志数据,计算 Web 端的 PV/UV 值,并将结果数据实时写入到自建的 MySQL 数据库。
文中使用的组件/应用及版本如下:
技术组件
版本
Nginx
1.22
CLS 日志服务
-
Java OpenJDK
1.8.0_232
Scala
2.11.12
Flink
1.14.5
MySQL
5.7

操作步骤

步骤1:安装腾讯云 Nginx 网关

1. 购买腾讯云主机 CVM,请参见 创建 CVM 实例
2. 安装 Nginx 1.22版本。

步骤2:采集 Nginx 日志到腾讯云 CLS 日志服务

2. CLS 日志服务采集终端 Loglistener 的安装,Loglistener 类似于开源组件 Beats,用来采集日志数据的 Agent。
3. 日志主题开启索引后,可以正常查询到 Nginx 的日志数据。
4. 最后,在 CLS 控制台 开启 kafka 消费,使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。本文就是使用流计算框架 Flink,实时消费 Nginx 日志数据,将实时计算的结果写入到 MySQL。

步骤3:搭建 MySQL 数据库

1. 创建 MySQL 实例,登录数据库:
mysql -h 172.16.1.1 -uroot -p
2. 新建需要使用的 DB 和表,例子中的 DB 名为 flink_nginx,表名为 mysql_dest。
create database if not exists flink_nginx;
use flink_nginx;
create table if not exists mysql_dest(
ts timestamp,
pv bigint,
uv bigint
);

步骤4:部署 Flink

1. 部署 Flink 时,建议使用如下版本,否则可能会安装不成功。
购买腾讯云主机 CVM,资源配置最小8C16G。
2. 安装 Flink 1.14.5 ,并进入 SQL 界面,从 Apache Flink 官网 下载 Flink 二进制代码包并开始安装。
# 解压缩 Flink 二进制包
tar -xf flink-1.14.5-bin-scala_2.11.tgz
cd flink-1.14.5

# 下载 kafka 相关依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.5/flink-connector-kafka_2.11-1.14.5.jar
mv flink-connector-kafka_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar
mv kafka-clients-2.4.1.jar lib

# 下载 MySQL 相关依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.14.5/flink-connector-jdbc_2.11-1.14.5.jar
mv flink-connector-jdbc_2.11-1.14.5.jar lib
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar
mv mysql-connector-java-8.0.11.jar lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-common/1.14.5/flink-table-common-1.14.5.jar
mv flink-table-common-1.14.5.jar lib

# 启动 Flink
bin/start-cluster.sh
bin/sql-client.sh
3. 当出现以下画面则说明安装成功。



网页端口是8081,可以查看 Flink Dashboard。




步骤5:使用 Flink 消费 CLS 日志数据

1. 在 SQL Client 界面中,执行如下 SQL:
-- 消费CLS日志主题数据 ,建数据源表
CREATE TABLE `nginx_source`
(
`remote_user` STRING, -- 日志中字段,客户端名称
`time_local` STRING, -- 日志中字段,服务器本地时间
`body_bytes_sent` BIGINT, -- 日志中字段,发送给客户端的字节数
`http_x_forwarded_for` STRING, -- 日志中字段,当前端有代理服务器时,记录客户端真实 IP 地址的配置
`remote_addr` STRING, -- 日志中字段,客户端 IP 地址
`protocol` STRING, -- 日志中字段,协议类型
`status` INT, -- 日志中字段,HTTP 请求状态码
`url` STRING, -- 日志中字段,URL 地址
`http_referer` STRING, -- 日志中字段,该网页是从哪个页面链接过来的
`http_user_agent` STRING, -- 日志中字段,客户端浏览器信息
`method` STRING, -- 日志中字段,HTTP 请求方法
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- CLS日志主题分区
`ts` AS PROCTIME()
) WITH (
'connector' = 'kafka',--将CLS日志主题当作Kafka Topic来消费
'topic' = 'YourTopic', -- CLS Kafka协议消费控制台给出的主题名称,例如12345-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX
'properties.bootstrap.servers' = 'kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9096', -- CLS Kafka协议消费控制台给出的服务地址,例子中是广州地域的外网消费地址,请按照您的实际情况填写
'properties.group.id' = 'kafka_flink', -- Kafka 消费组名称
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your username" password="your password";',--用户名是日志主题所属的日志集合ID,例如ca5cXXXX-dd2e-4ac0-af12-92d4b677d2c6,密码是用户的secretid#secrectkey组合的字符串,比AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);

--- 建立目标表
CREATE TABLE `mysql_dest`
(
`ts` TIMESTAMP,
`pv` BIGINT,
`uv` BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://11.150.2.1:3306/flink_nginx', -- 注意这边的时区设置
'username'= 'username', -- MYSQL账号
'password'= 'password', -- MYSQL密码
'table-name' = 'mysql_dest' -- MYSQL表名
);

--- 查询 CLS日志主题,对Nginx日志中的PV,UV进行聚合计算,结果写入 MYSQL目标表
INSERT INTO mysql_dest (ts,uv,pv)
SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) start_ts, COUNT(DISTINCT remote_addr) uv,count(*) pv
FROM nginx_source
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
2. 在 Flink 的任务监控页,我们可以看到 Flink 任务的监控数据:



3. 进入 MySql 数据库,可看到计算 PV、UV 的结果数据实时写入: