本方案结合腾讯云 Ckafka、流计算 Oceanus、腾讯云数据库 Elasticsearch、腾讯云可观测平台等,通过 Filebeat 实时监控系统日志和应用日志,将监控数据传输到腾讯云 Ckafka,再将 Kafka 中数据接入流计算 Oceanus,经过简单的业务逻辑处理输出到云数据库 Elasticsearch,利用云 Prometheus 监控系统指标,利用云 Grafana 实现对 Oceanus 作业的个性化业务数据监控。
方案架构
前置准备
在使用前,请确保已购买并创建相应的大数据组件。
创建私有网络 VPC
注意:
在构建 Ckafka、Oceanus、Elasticsearch 集群等服务时选择的网络必须保持一致,网络才能互通。
创建 Ckafka 实例
进入 消息队列 CKafka 控制台,在实例列表中单击新建,创建 Ckafka 实例详情请参见 创建实例。购买完成后,再创建 Kafka topic(topic-app-info),详细操作请参见 创建 Topic。
说明:
私有网络和子网选择上一步创建的网络和子网。
Kafka 建议选择最新的版本,和 Filebeat 采集工具兼容性较好。
创建 Oceanus 集群
流计算 Oceanus 服务兼容原生的 Flink 任务。在 流计算 Oceanus 控制台 中集群管理 > 新建集群创建集群,选择地域、可用区、VPC、日志、存储、设置初始密码等。VPC 及子网选择刚创建好的网络,具体创建步骤可参考 创建独享集群。创建完后 Flink 的集群如下:
创建 Elasticsearch 实例
创建腾讯云可观测平台服务
为了展示自定义系统指标,需购买 Promethus 服务。若只需要自定业务指标,可以省略此步骤。
创建独立 Grafana 资源
安装配置 Filebeat
Filebeat 是一款轻量级日志数据采集的工具,通过监控指定位置的文件收集信息。在该 VPC 下给需要监控主机信息和应用信息的 CVM 上安装 Filebeat。
方式一:下载 Filebeat 并安装 Filebeat 下载地址。
方式二:采用 Elasticsearch 管理页面 > beats 管理中提供的 Filebeat。本示例中采用了方式一。
下载到 CVM 中并配置 Filebeat,在 filebeat.yml 文件中提添加如下配置项:
# 监控日志文件配置- type: logenabled: truepaths:- /tmp/test.log#- c:\\programdata\\elasicsearch\\logs\\*
# 监控数据输出项配置output.kafka:version: 2.0.0 # kafka版本号hosts: ["xx.xx.xx.xx:xxxx"] # 请填写实际的IP地址+端口topic: 'topic-app-info' # 请填写实际的topic
注意
示例选用2.4.1的 Ckafka 版本,这里配置 version: 2.0.0。版本对应不上可能出现
ERROR [kafka] kafka/client.go:341 Kafka (topic=topic-app-info): dropping invalid message
错误。方案实现
接下来通过案例介绍如何通过流计算 Oceanus 实现个性化监控。
Filebeat 数据传输
1. 进入到 Filebeat 根目录下,并启动 Filebeat 进行数据采集。示例中采集了 top 命令中显示的 CPU、内存等信息,也可以采集 jar 应用的日志、JVM 使用情况、监听端口等,详情可参考 Filebeat 官网。
# filebeat启动./filebeat -e -c filebeat.yml# 监控系统信息写入test.log文件top -d 10 >>/tmp/test.log
2. 进入 Ckafka 页面,单击消息查询,查询对应 topic 消息,验证是否采集到数据。
filebeat 采集到的数据格式:{"@timestamp": "2021-08-30T10:22:52.888Z","@metadata": {"beat": "filebeat","type": "_doc","version": "7.14.0"},"input": {"type": "log"},"host": {"ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],"mac": ["xx:xx:xx:xx:xx:xx"],"hostname": "xx.xx.xx.xx","architecture": "x86_64","os": {"type": "linux","platform": "centos","version": "7(Core)","family": "redhat","name": "CentOSLinux","kernel": "3.10.0-1062.9.1.el7.x86_64","codename": "Core"},"id": "0ea734564f9a4e2881b866b82d679dfc","name": "xx.xx.xx.xx","containerized": false},"agent": {"name": "xx.xx.xx.xx","type": "filebeat","version": "7.14.0","hostname": "xx.xx.xx.xx","ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9","id": "6b23463c-0654-4f8b-83a9-84ec75721311"},"ecs": {"version": "1.10.0"},"log": {"offset": 2449931,"file": {"path": "/tmp/test.log"}},"message": "(B[m16root0-20000S0.00.00:00.00kworker/1:0H(B[m[39;49m[K"}
SQL 作业编写
在流计算 Oceanus 中,对 Kafka 接入的数据进行加工处理,并存入 Elasticsearch 中。
1. 定义 source
按照 Filebeat 中 json 消息的格式,构造 Flink Source。
CREATE TABLE DataInput (`@timestamp` VARCHAR,`host` ROW<id VARCHAR,ip ARRAY<VARCHAR>>,`log` ROW<`offset` INTEGER,file ROW<path VARCHAR>>,`message` VARCHAR) WITH ('connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector'topic' = 'topic-app-info', -- 替换为您要消费的 Topic'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'oceanus_group2', -- 必选参数, 一定要指定 Group ID-- 定义数据格式 (JSON 格式)'format' = 'json','json.ignore-parse-errors' = 'true', -- 忽略 JSON 结构解析异常'json.fail-on-missing-field' = 'false' -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null);
2. 定义 sink
CREATE TABLE es_output (`id` VARCHAR,`ip` ARRAY<VARCHAR>,`path` VARCHAR,`num` INTEGER,`message` VARCHAR,`createTime` VARCHAR) WITH ('connector.type' = 'elasticsearch', -- 输出到 Elasticsearch'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'.'connector.hosts' = 'http://10.0.0.175:9200', -- Elasticsearch 的连接地址'connector.index' = 'oceanus_test2', -- Elasticsearch 的 Index 名'connector.document-type' = '_doc', -- Elasticsearch 的 Document 类型'connector.username' = 'elastic','connector.password' = 'yourpassword','update-mode' = 'upsert', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式'connector.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)'connector.key-null-literal' = 'n/a', -- 主键为 null 时的替代字符串,默认是 'null''connector.failure-handler' = 'retry-rejected', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)'connector.flush-on-checkpoint' = 'true', -- 可选参数, 快照时不允许批量写入(flush), 默认为 true'connector.bulk-flush.max-actions' = '42', -- 可选参数, 每批次最多的条数'connector.bulk-flush.max-size' = '42 mb', -- 可选参数, 每批次的累计最大大小 (只支持 mb)'connector.bulk-flush.interval' = '60000', -- 可选参数, 批量写入的间隔 (ms)'connector.connection-max-retry-timeout' = '1000', -- 每次请求的最大超时时间 (ms)--'connector.connection-path-prefix' = '/v1' -- 可选字段, 每次请求时附加的路径前缀'format.type' = 'json' -- 输出数据格式, 目前只支持 'json');
3. 业务逻辑
INSERT INTO es_outputSELECThost.id as `id`,host.ip as `ip`,log.file.path as `path`,log.`offset` as `num`,message,`@timestamp` as `createTime`from DataInput;
4. ES 数据查询
在 ES 控制台的 Kibana 页面查询数据,或者进入某台相同子网的 CVM 下,使用以下命令进行查询:
# 查询索引 username:password请替换为实际账号密码curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'{"query": { "match_all": {}},"size": 10}'
系统指标监控
本章节主要实现系统信息监控,对 Flink 作业运行状况进行监控告警。
Prometheus 是一个非常灵活的时序数据库,通常用于监控数据的存储、计算和告警。流计算 Oceanus 建议用户使用腾讯云可观测平台提供的 Prometheus 服务,以免去部署、运维开销;同时它还支持腾讯云的通知模板,可以通过短信、电话、邮件、企业微信机器人等方式,将告警信息轻松触达不同的接收方。
监控配置(Oceanus 作业监控)
除了 Oceanus 控制台自带的监控信息,还可以配置目前已经支持了任务级细粒度监控、作业级监控和集群 Flink 作业列表监控。
1. Oceanus 作业详情页面,单击作业参数,在高级参数处添加如下配置:
pipeline.max-parallelism: 2048metrics.reporters: promgatewaymetrics.reporter.promgateway.host: xx.xx.xx.xx # Prometheus实例地址metrics.reporter.promgateway.port: 9090 # Prometheus实例端口metrics.reporter.promgateway.needBasicAuth: truemetrics.reporter.promgateway.password: xxxxxxxxxxx # Prometheus实例密码metrics.reporter.promgateway.interval: 10 SECONDS
2. 在任一 Oceanus 作业中,单击云监控进入云 Prometheus 实例,点击链接进入 Grafana ,导入 json 文件,详情请参见 接入 Prometheus 自定义监控。
3. 展现出来的 flink 任务监控效果如下,用户也可以单击 Edit 设置不同 Panel 来优化展现效果。
告警配置
1. 进入腾讯腾讯云可观测平台界面,单击 Prometheus 监控,点击已购买的实例进入服务管理页面,然后选择告警策略 > 新建,配置相关信息。具体操作参考 接入 Prometheus 自定义监控。
2. 设置告警通知。选择模板或新建,设置通知模板。
3. 短信通知消息。
业务指标监控
通过 Filebeat 采集到应用业务数据,经过 Oceanus 服务的加工处理已经被存入 ES,可以通过 ES + Grafana 来实现业务数据的监控。
1. Grafana 配置 ES 数据源。进入 Grafana 控制台,进入刚创建的 Grafana 服务,找到外网地址并打开。Grafana 账号为 admin,登录后选择 Configuration > Add Source,搜索
elasticsearch
,填写相关 ES 实例信息,添加数据源。
2. 选择左侧 Dashboards > Manage,单击右上角 New Dashboard,新建面板。
展现效果如下:总数据量写入实时监控
:对写入数据源的总数据量进行监控。 数据来源实时监控
:对来源于某个特定 log 的数据写入量进行监控。字段平均值监控
:对某个字段的平均值进行监控。num字段最大值监控
:对 num 字段的最大值进行监控。
说明
此处仅作为示例,无实际业务。
总结
本方案中对系统监控指标和业务监控指标2种方式都进行尝试。若只需要对业务指标进行监控,可省略 Promethus 相关操作。此外,需要注意的是:
1. Ckafka 的版本和开源版本 Kafka 并没有严格对应,方案中 Ckafka2.4.1 和开源 Filebeat-1.14.1 版本能够调试成功。
2. 腾讯云可观测平台中的 Promethus 服务已经嵌入了 Grafana 监控服务。但不支持自定义数据源,该嵌入的 Grafana 只能接入 Promethus,需使用独立的 Grafana 才能完成 ES 数据接入 Grafana。