操作背景
您可以将日志主题的数据投递到腾讯云 CKafka,然后用于您的实时流计算场景。如果您没有购买腾讯云 CKafka 实例,可以考虑使用日志服务(Cloud Log Service,CLS)自带的 Kafka 协议消费功能。
前提条件
已开通腾讯云消息队列 (CKafka)。
注意:
如果您的 CKafka 开启了 ACL,请注意 CKafka 的版本限制:CKafka1.X版本,需在1.1.1及以上;CKafka2.X版本,需在2.4.2及以上,其他版本开启 ACL 均会导致投递失败,可通过关闭 ACL 解决投递失败的问题。
当前仅支持实时日志投递,不支持历史日志的投递。
不支持投递至 Ckafka 弹性 Topic。
确保当前操作账号拥有开通投递到 CKafka 的权限。请参见 CLS 访问策略模板。
操作步骤
您需对该Topic的配置做如下修改:
CleanUp.policy:选择 delete。该参数需设置为 delete,否则会投递失败。
max.message.bytes:设置为 ≥ 8MB。该参数 default 值是1MB,当投递 CKafka 的单条 message(一组日志数据)的大小超过1MB时,无法写入 CKafka Topic,导致投递失败。
2. 前往 日志服务控制台,如下图,有两种方式,进入投递任务管理页面。
在左侧导航栏中,单击投递任务,选择地域、日志集和日志主题。
在左侧导航栏中,单击日志主题,选择需要配置投递到 CKafka 任务的日志主题,进入日志主题管理页面。单击投递到 CKafka 页签。
3. 单击右侧的编辑,开启投递到 CKafka 开关,选择相应的 CKafka 实例以及对应的 Topic,开始投递。您可以选择以原始内容或者 JSON 投递日志。
3.1 以原始内容投递。
原始内容投递配置项说明如下:
配置项 | 解释说明 | 规则 | 是否必填 |
目标 Ckafka Topic 归属 | 投递 CLS 日志至当前主账号的 CKafka Topic。 投递 CLS 日志至其他主账号的 CKafka,例如 A 账号在 CLS 将日志投递至 B 账号的 Ckafka Topic,需要 B 账号在 CAM(访问管理)侧配置访问 角色,配置完成之后,由 A 账号将角色 ARN 和外部 ID 填写到 CLS 控制台,方可进行跨账号投递。配置角色的步骤如下: 1. 新建角色。账号 B 登录 CAM 角色管理页面。 1.1 创建访问策略,策略名称例如:cross_shipper,策略语法参考如下: 说明: 示例中的授权按照最小权限的原则,resource 配置为仅可以投递至广州地域的 CKafka:ckafka-12abcde3,请您按照实际情况进行授权。
1.2 新建角色,选择腾讯云账户角色载体,云账号类型选择其他主账号,然后输入 A 账号的 ID,例如100012345678,勾选开启校验并配置外部 ID,例如:Hello123。 1.3 配置角色策略,配置角色的访问策略,选择第一步配置好的访问策略cross_shipper(示例)。 1.4 保存该角色,例如:uinA_writeCLS_to_CKafka。 2. 为角色配置载体。在 CAM 角色列表中找到 uinA_writeCLS_to_CKafka(示例),单击该角色,选中下方的角色载体 > 管理载体 > 添加产品服务 > 选中日志服务,然后单击更新。 可以看到当前角色的载体是两个:一个是 A 账号,另一个是 cls.cloud.tencent.com(CLS 日志服务)。 3. A 账号登录 CLS,填入角色 ARN 和外部 ID。 该两项信息需 B 账号来提供: B 账号在 CAM 角色列表中找到角色 uinA_writeCLS_to_CKafka(示例),单击可查看该角色的 RoleArn,例如 qcs::cam::uin/100001112345:roleName/uinA_writeCLS_to_CKafka。 在角色载体中可看到外部 ID,例如 Hello123。 注意: 填写角色 ARN,外部 ID 时,注意不要输入多余的空格,会导致权限校验失败。 跨账号投递会在 A 账号下,产生日志主题的读流量费。 | 当前主账号/其他主账号 | 否 |
CKafka 实例 | 与当前日志主题同地域的 CKafka Topic 作为投递目标。 跨账号投递场景下,需用户手动填写 Ckafka 实例 ID 和 Topic 名称。 | 列表选择 | 必填 |
投递数据格式 | 选项【原始内容】,投递用户的原始日志。 | 列表选择 | 必填 |
数据压缩格式 | 支持 Snappy\\LZ4。 | 列表选择 | 必填 |
投递日志预览 | 预览您投递的日志数据。 | - | - |
3.2 以 JSON 投递。
JSON 投递配置项说明如下:
配置项 | 解释说明 | 规则 | 是否必填 |
CKafka 实例 | 与当前日志主题同地域的 CKafka Topic 作为投递目标 | 列表选择 | 必填 |
投递数据格式 | 选项【JSON】,以 JSON 的数据格式投递日志 | 列表选择 | 必填 |
JSON 格式的转义/不转义 | 转义,将 JSON 第一层节点的值转为 String,如果您的第一层节点的值是 Struct,在下游入库或者计算时,需要提前将该Struct 转为 String,可以选这个选项。示例: 日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} 投递到 CKafka :{"a":"aa","b":"{\\"b1\\":\\"b1b1\\", \\"c1\\":\\"c1c1\\"}"} 不转义,不对您的 JSON 结构和层级做修改,日志格式和采集侧保持一致。示例: 日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} 投递到 CKafka:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} 注意: 当 JSON 的第一层节点中包含有数值时,投递后会自动转为 int、float。 日志原文:{"a":123, "b":"123", "c":"-123", "d":"123.45", "e":{"e1":123,"f1":"123"}} 投递到 CKafka:{"a":123,"b":123,"c":-123,"d":123.45,"e":{"e1":123,"f1":"123"}} | 列表选择 | 必填 |
__TAG__元信息 | 将__TAG__元信息平铺或者不平铺,请按照您的实际业务场景选择。 示例:__TAG__元信息:{"__TAG__":{"fieldA":200, "fieldB":"text"}} 平铺:{"__TAG__.fieldA":200,"__TAG__.fieldB":"text"} 不平铺:{"__TAG__":{"fieldA":200, "fieldB":"text"}} | | |
数据压缩格式 | 支持 Snappy\\ LZ4。 | 列表选择 | 必填 |
投递日志预览 | 预览您投递的日志数据。 | - | - |
4. 单击确定,启动投递到 CKafka。
注意:
常见问题
提示没有读写 CKafka Topic 的权限,怎么办?
如果您直接使用 API 接口投递数据到 CKafka,可能会存在读写 CKafka Topic 的权限问题。因为,如果您在控制台使用该功能,系统会引导您完成相关授权;如果您直接调用 API 投递,则需要手动授权。具体的排查和解决方案请参见 投递任务角色授权。