简介
Apache Kafka 连接器用于连接 Kafka 集群和腾讯云数据连接器集群,可消费 Kafka 集群的消息并作为集成流的 trigger,也可将集成流的消息生产发布到 Kafka 集群中。
Apache Kafka 是一个分布式发布-订阅消息传递系统,kafka 有以下一些基本概念:
Producer:消息生产者,即向 kafka broker 发布消息的客户端。
Consumer:消息消费者,负责消费 Kafka 服务器上的消息。
Topic:主题,用于建立 Producer 和 Consumer 之间的订阅关系。
Partition:消息分区,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列,partition 中的每条消息都会被分配一个有序的 ID(offset)。
Broker:一台 kafka 服务器即为一个 broker,一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic。
Consumer Group:消费者分组,用于归组同类消费者,每个 consumer 属于一个特定的 consumer group,多个消费者可以共同消息一个 Topic 下的消息,每个消费者消费其中的部分消息,这些消费者就组成一个分组,拥有同一个分组名称。
Offset:消息在 partition 中的偏移量,每一条消息在 partition 都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。
Producer、Consumer、Broker 关系如下图:
Broker 保存消息数据,和 topic、partition 关系示意图如下:
同一 topic 的各 partition 逻辑关系如下图,kafka 仅保证同一 partition 下的消息有序。
连接器配置
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
集群地址 | string | Apache Kafka 集群地址,配置的格式:ip:port,ip:port | 是 | - |
集群 Kafka 版本 | enum | 选择 Kafka 集群的版本号 | 否 | 1.1 |
SASL 安全认证模式 | enum | 选择连接到 Kafka 集群时的安全认证模式 | 否 | PlainText |
SASL 用户名 | string | SASL/Plain 和 SASL/SCRAM 安全认证模式下的用户名 | 否 | - |
SASL 密码 | string | SASL/Plain 和 SASL/SCRAM 安全认证模式下的密码 | 否 | - |
SASL/SCRAM 加密类型 | enum | SASL/SCRAM 安全认证模式下的加密类型 | 否 | - |
使能 TLS 安全传输协议 | bool | 是否使用 TLS 加密和 Kafka 集群间的连接 | 否 | false |
TLS 客户端证书 | file | 可选,使用提供的证书对连接进行加密,仅当使能 TLS 安全传输协议设置为 True 才可配置 | 否 | - |
TLS 客户端Key | file | 可选,使用提供的证书对连接进行加密,需和客户端证书同时提供,仅当使能 TLS 安全传输协议设置为 True 才可配置 | 否 | - |
TLS 服务端证书 | file | 可选,使用提供的证书对连接进行加密,仅当使能TLS安全传输协议设置为 True 才可配置 | 否 | - |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
消费者组 | string | 消费时的所属的消费者组名称 | 是 | - |
主题 | string | 订阅的主题列表,支持正则表达式 | 是 | - |
消费者数量 | int | 消费者组内的消费者数量,必须小于主题的 partition 数量 | 否 | 1 |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
消息确认模式 | enum | 消费消息后,在 Kafka 上确认 Offset 的模式,有两种: 消费后直到触发的流成功结束后才确认 Offset 消费后立即确认 Offset | 否 | 流运行成功后确认 |
隔离级别 | enum | Kafka 的事务隔离级别,可选 Read Uncommitted 和 Read committed | 否 | Read Uncommitted |
重试阻塞时间 | int | 重试阻塞时间(毫秒),避免在失败场景下以紧密循环的方式重复发送请求 | 否 | 2000 |
请求超时时间 | int | 请求超时时间(毫秒) | 否 | 30000 |
会话超时时间 | int | 会话超时时间(毫秒) | 否 | 10000 |
心跳时间 | int | 心跳时间(毫秒) | 否 | 3000 |
Rebalance 检测频率 | int | Rebalance 检测频率(毫秒),Kafka 消费者组重平衡检测的间隔时间 | 否 | 60000 |
Rebalance 策略 | enum | Rebalance 策略,按区间划分(Range)和轮询(RoundRobin)两种策略 | 否 | Range |
Rebalance 最大重试次数 | int | Rebalance 最大重试次数 | 否 | 4 |
Rebalance 重试阻塞时间 | int | Rebalance 重试阻塞时间(毫秒) | 否 | 2000 |
最小拉取数据量 | int | 最小拉取数据量(字节) | 否 | 1 |
默认拉取数据量 | int | 默认拉取数据量(字节) ,默认为1048756(1024*1024) | 否 | 1048576 |
最大拉取数据量 | int | 最大拉取数据量(兆),设置为0时无限制 | 否 | 0 |
拉取阻塞时间 | int | 拉取阻塞时间(毫秒) | 否 | 250 |
提交频率 | int | 提交频率(毫秒) | 否 | 1000 |
初始偏移量 | enum | 初始偏移量,取值为 Offset Newest 和 Offset Latest | 否 | Offset Newest |
偏移量保留的时间 | int | 偏移量保留的时间(秒),0表示不过期 | 否 | 0 |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
最大消息限制 | int | 最大消息限制(字节),单条消息允许发送的最大数据量 | 否 | 1000000 |
确认模式 | enum | 确认模式,可选值为:None、LeaderOnly、All。其中 None 为不需确认,LeaderOnly 仅需 Leader 确认,All 需要 Leader 及所有 follower 都确认 | 否 | LeaderOnly |
请求超时时间 | int | 请求超时时间 | 否 | 10000 |
幂等性使能开关 | bool | 幂等性使能开关,设置为 true 时,打开 kafka 幂等性功能,实现 Exactly-Once 语义 | 否 | false |
消息刷新到磁盘的数据量 | int | 消息刷新到磁盘的数据量(字节),设置为0时不设置该参数 | 否 | 0 |
消息刷新到磁盘的消息数量 | int | 消息刷新到磁盘的消息数量,设置为0时不设置该参数 | 否 | 0 |
消息刷新到磁盘的间隔时间 | int | 消息刷新到磁盘的间隔时间(毫秒),设置为0时不设置该参数 | 否 | 0 |
消息刷新的最大消息量 | int | 消息刷新的最大消息量,设置为0时不设置该参数 | 否 | 0 |
操作配置
Apache Kafka 连接器包含 Consumer 和 Publish 两种操作。
Consumer 操作
输入参数
None,Consumer 操作没有输入参数。
输出
Consumer 操作执行成功后,输出结果会保存在 message 消息体的 payload;执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性 | 值 |
payload | 执行成功后,payload 为 byte 数组 |
error | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息 |
attribute | 执行成功后,attribute 为 dict,属性列表见下表 |
variable | 继承上个组件的 variable 信息 |
atrrbites 属性列表:
atrrbites 属性 | 数据类型 | 描述 |
topic | string | 消息所属的 topic |
partition | int32 | 消息的 partition |
offset | int64 | 消息的 offset |
例如:执行成功后,message payload 值为 byte 数组,message attribute 值如下:
{"topic": "test-topic","partition": 1,"offset": 4}
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
案例
1. 按照 Kafka 集群信息,填写 kafka 连接器配置。
2. 新建流,将 kafka consumer 作为 trigger 节点。
3. 添加其他业务逻辑,处理消费到的数据,例如:增加 amqp publish,将消费到的消息发布到 rabbitMQ。
4. 若配置正确,流发布后,可以看到 kafka 中的流被正确消费,且被发布到 rabbitMQ 中。
Publish 操作
输入参数
通用配置
参数 | 数据类型 | 说明 | 是否必填 | 默认值 |
要写入的 Kafka 主题名称 | string | 写入的 kafka 主题 | 是 | - |
要写入的 Kafka 消息内容 | string或byte | 待发布的 Kafka 消息内容 | 是 | - |
高级配置
参数 | 数据类型 | 说明 | 是否必填 | 默认值 |
目标 partition | string | 指定写入的 partition | 否 | - |
key | string | key 相同的 message 会写入到同一 partition | 否 | - |
header | string | header的 JSON 字符串 | 否 | - |
输出
Publish 操作执行成功后,输出属性会保存在 message 消息体的 attribute;执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性 | 值 |
payload | 继承上个组件的 payload 信息 |
error | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息 |
attribute | 执行成功后,attribute 为 dict,属性列表见下 |
variable | 继承上个组件的 variable 信息 |
atrrbites 属性列表:
atrrbites 属性 | 数据类型 | 描述 |
topic | string | 消息所属的 topic |
partition | int32 | 消息的 partition |
offset | int64 | 消息的 offset |
例如:执行成功后,message attribute 值如下:
{"topic": "test-topic","partition": 1,"offset": 4}
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
案例
1. 按照 Kafka 集群信息,填写 kafka 连接器配置。
2. 新建流,添加 trigger,例如:HTTP listener。
3. 在流中增加 kafka publish 操作,填写消息内容,将消息发布到 Kafka 集群中。
4. 发布流,触发流后,若连接参数及消息参数配置正确,消息将成功投递到所配置的 Kafka 集群的对应队列中。