介绍
CLS(Cloud Log Service)日志服务,可以作为 Oceanus 的数据源(Source),用户可以将 CLS 日志主题,通过 CLS Kafka协议消费 功能,将日志导出到 Oceanus,参与后续流计算。
版本说明
Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持 |
使用范围
CLS 支持用作数据源表(Source)。
CLS日志主题
建立 CLS Source 表前,需要创建CLS日志主题,具体步骤如下:
1. 打开 CLS 日志主题 页面中选择日志主题 > 创建日志主题,创建日志主题。
2. 日志主题创建成功后,在列表页单击新建日志主题的日志主题名称/ID 进入到详情页面。
3. 在详情页,切换到Kafka协议消费页签,打开Kafka协议消费的功能。
开启后的状态如下:
若有不清楚的地方,可以参考 Kafka 协议消费。Oceanus消费CLS日志
在Oceanus控制台新建作业。
-- 建表语句如下CREATE TABLE `nginx_source` (-- 日志中字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATAFROM'partition' VIRTUAL,-- kafka分区`ts` TIMESTAMP(3) METADATAFROM'timestamp') WITH ('connector' = 'kafka',-- cls kafka协议消费控制台给出的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',-- 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',-- 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true',-- 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6-- 密码是用户的SecretId#SecretKey组合的字符串,比如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');