背景
TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和最佳实践,以及常见问题。
调优实践
常见问题
生产者问题
1. 使用 CKafka 生产消息时,出现错误
Message contents does not match its CRC
。err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
插件默认启用了 gzip 压缩,在 target 上加上参数
compression=none
关闭压缩即可。target: kafka://ip1:port1?compression=none
2. 生产时同一用户需要有序,如何配置?
客户端增加参数
partitioner
,可选 random(默认),roundrobin,hash(按 key 分区)。target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. 如何异步生产?
客户端增加参数
async=1
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. 如何使用异步生产写数据回调?
需要在代码中重写异步生产写数据的成功/失败的回调函数,例如:
import ("git.code.oa.com/vicenteli/trpc-database/kafka")func init() {// 重写默认的异步生产写数据错误回调kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {// do something if async producer occurred error.}// 重写默认的异步生产写数据成功回调kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {// do something if async producer succeed.}}
消费者问题
1. 如果消费时 Handle 返回了非 nil 会发生什么?
会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
2. 使用 ckafka 消费消息时,出现错误
client has run out of available brokers to talk to
。kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=0.10.2.0
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
4. 消费消息时,提示
The provider group protocol type is incompatible with the other members
。kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
同一消费者组的客户端重分组策略不一样,可修改参数
strategy
,可选:sticky(默认),range,roundrobin。address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. 如何注入自定义配置(远端配置)?
如果需要代码中指定配置,先在
trpc_go.yaml
中配置 fake_address
,然后配合 kafka.RegisterAddrConfig
方法注入,trpc_go.yaml
配置如下:address: fake_address
在服务启动前,注入自定义配置:
func main() {s := trpc.NewServer()// 使用自定义 addr,需在启动 server 前注入cfg := kafka.GetDefaultConfig()cfg.Brokers = []string{"127.0.0.1:9092"}cfg.Topics = []string{"test_topic"}kafka.RegisterAddrConfig("fake_address", cfg)kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})s.Serve()}
6. 如何获取底层 sarama 的上下文信息?
通过kafka
.
GetRawSaramaContext
可以获取底层 saramaConsumerGroupSession
和ConsumerGroupClaim
。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。// RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim// 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为type RawSaramaContext struct {Session sarama.ConsumerGroupSessionClaim sarama.ConsumerGroupClaim}
使用实例:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())}// ...return nil}