创建投递任务

最近更新时间:2024-11-12 18:00:22

我的收藏

操作背景

您可以将日志主题的数据投递到腾讯云 CKafka,然后用于您的实时流计算场景。如果您没有购买腾讯云 CKafka 实例,可以考虑使用日志服务(Cloud Log Service,CLS)自带的 Kafka 协议消费功能

前提条件

已开通腾讯云消息队列 (CKafka)。
注意:
如果您的 CKafka 开启了 ACL,请注意 CKafka 的版本限制:CKafka1.X版本,需在1.1.1及以上;CKafka2.X版本,需在2.4.2及以上,其他版本开启 ACL 均会导致投递失败,可通过关闭 ACL 解决投递失败的问题。
当前仅支持实时日志投递,不支持历史日志的投递。
不支持投递至 Ckafka 弹性 Topic。
确保当前操作账号拥有开通投递到 CKafka 的权限。请参见 CLS 访问策略模板

操作步骤

1. 准备和配置 CKafka Topic。在日志主题同地域下,创建一个 CKafka 实例。详情请参见 创建实例。创建一个 Topic。详情请参见 创建 Topic
您需对该Topic的配置做如下修改:
CleanUp.policy:选择 delete。该参数需设置为 delete,否则会投递失败。
max.message.bytes:设置为 ≥ 8MB。该参数 default 值是1MB,当投递 CKafka 的单条 message(一组日志数据)的大小超过1MB时,无法写入 CKafka Topic,导致投递失败。



2. 前往 日志服务控制台,如下图,有两种方式,进入投递任务管理页面。
在左侧导航栏中,单击投递任务,选择地域、日志集和日志主题。



在左侧导航栏中,单击日志主题,选择需要配置投递到 CKafka 任务的日志主题,进入日志主题管理页面。单击投递到 CKafka 页签。



3. 单击右侧的编辑,开启投递到 CKafka 开关,选择相应的 CKafka 实例以及对应的 Topic,开始投递。您可以选择以原始内容或者 JSON 投递日志。
3.1 以原始内容投递。
原始内容投递配置项说明如下:
配置项
解释说明
规则
是否必填
目标 Ckafka Topic 归属
当前主账号
其他主账号
投递 CLS 日志至当前主账号的 CKafka Topic。
投递 CLS 日志至其他主账号的 CKafka,例如 A 账号在 CLS 将日志投递至 B 账号的 Ckafka Topic,需要 B 账号在 CAM(访问管理)侧配置访问 角色,配置完成之后,由 A 账号将角色 ARN 和外部 ID 填写到 CLS 控制台,方可进行跨账号投递。配置角色的步骤如下:
1. 新建角色。账号 B 登录 CAM 角色管理页面
1.1 创建访问策略,策略名称例如:cross_shipper,策略语法参考如下:
说明:
示例中的授权按照最小权限的原则,resource 配置为仅可以投递至广州地域的 CKafka:ckafka-12abcde3,请您按照实际情况进行授权。
{
"statement": [
{
"action": [
"cam:GetRole"
],
"effect": "allow",
"resource": [
"*"
]
},
{
"action": [
"ckafka:ListRoute",
"ckafka:AddRoute",
"ckafka:DescribeInstanceAttributes",
"ckafka:AuthorizeToken",
"ckafka:CreateToken",
"ckafka:DescribeTopicAttributes"
],
"effect": "allow",
"resource": [
"qcs::ckafka:ap-guangzhou:uin/100001234567:ckafkaId/ckafka-12abcde3"
]//resource 配置为仅可以投递至广州地域的 CKafka:ckafka-12abcde3,请您按照实际情况进行授权
}
],
"version": "2.0"
}
1.2 新建角色,选择腾讯云账户角色载体,云账号类型选择其他主账号,然后输入 A 账号的 ID,例如100012345678,勾选开启校验并配置外部 ID,例如:Hello123
1.3 配置角色策略,配置角色的访问策略,选择第一步配置好的访问策略cross_shipper(示例)。
1.4 保存该角色,例如:uinA_writeCLS_to_CKafka。
2. 为角色配置载体。在 CAM 角色列表中找到 uinA_writeCLS_to_CKafka(示例),单击该角色,选中下方的角色载体 > 管理载体 > 添加产品服务 > 选中日志服务,然后单击更新
可以看到当前角色的载体是两个:一个是 A 账号,另一个是 cls.cloud.tencent.com(CLS 日志服务)。
3. A 账号登录 CLS,填入角色 ARN 和外部 ID
该两项信息需 B 账号来提供
B 账号在 CAM 角色列表中找到角色 uinA_writeCLS_to_CKafka(示例),单击可查看该角色的 RoleArn,例如 qcs::cam::uin/100001112345:roleName/uinA_writeCLS_to_CKafka。
在角色载体中可看到外部 ID,例如 Hello123。
注意:
填写角色 ARN,外部 ID 时,注意不要输入多余的空格,会导致权限校验失败。
跨账号投递会在 A 账号下,产生日志主题的读流量费。
当前主账号/其他主账号
CKafka 实例
与当前日志主题同地域的 CKafka Topic 作为投递目标。
跨账号投递场景下,需用户手动填写 Ckafka 实例 ID 和 Topic 名称。
列表选择
必填
投递数据格式
选项【原始内容】,投递用户的原始日志。
列表选择
必填
数据压缩格式
支持 Snappy\\LZ4。
列表选择
必填
投递日志预览
预览您投递的日志数据。
-
-
3.2 以 JSON 投递。
JSON 投递配置项说明如下:
配置项
解释说明
规则
是否必填
CKafka 实例
与当前日志主题同地域的 CKafka Topic 作为投递目标
列表选择
必填
投递数据格式
选项【JSON】,以 JSON 的数据格式投递日志
列表选择
必填
JSON 格式的转义/不转义
转义,将 JSON 第一层节点的值转为 String,如果您的第一层节点的值是 Struct,在下游入库或者计算时,需要提前将该Struct 转为 String,可以选这个选项。示例:
日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}}
投递到 CKafka :{"a":"aa","b":"{\\"b1\\":\\"b1b1\\", \\"c1\\":\\"c1c1\\"}"}
不转义,不对您的 JSON 结构和层级做修改,日志格式和采集侧保持一致。示例:
日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}}
投递到 CKafka:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}}
注意:
当 JSON 的第一层节点中包含有数值时,投递后会自动转为 intfloat
日志原文:{"a":123, "b":"123", "c":"-123", "d":"123.45", "e":{"e1":123,"f1":"123"}}
投递到 CKafka:{"a":123,"b":123,"c":-123,"d":123.45,"e":{"e1":123,"f1":"123"}}
列表选择
必填
__TAG__元信息
将__TAG__元信息平铺或者不平铺,请按照您的实际业务场景选择。
示例:__TAG__元信息:{"__TAG__":{"fieldA":200, "fieldB":"text"}}
平铺:{"__TAG__.fieldA":200,"__TAG__.fieldB":"text"}
不平铺:{"__TAG__":{"fieldA":200, "fieldB":"text"}}


数据压缩格式
支持 Snappy\\ LZ4。
列表选择
必填
投递日志预览
预览您投递的日志数据。
-
-
4. 单击确定,启动投递到 CKafka。
注意:
如需在投递至 CKafka 前对日志进行清洗加工过滤,请参见使用 数据加工 操作。

常见问题

提示没有读写 CKafka Topic 的权限,怎么办?

如果您直接使用 API 接口投递数据到 CKafka,可能会存在读写 CKafka Topic 的权限问题。因为,如果您在控制台使用该功能,系统会引导您完成相关授权;如果您直接调用 API 投递,则需要手动授权。具体的排查和解决方案请参见 投递任务角色授权