导读:目前国内公有云上的kafka产品都是基于开源kafka产品二次封装改造的,基本上开源kafka的配置参数都能应用在云上kafka产品里。本文以腾讯云的ckafka产品为例,分别介绍了几个应用场景,每个点都有详细的配置干货。通过这些设置和正确的使用姿势,我们来很好的保证关联业务的稳定性和可靠性。
从生产者的角度来看,向不同的 partition 写入是完全并行的;从消费者的角度来看,并发数完全取决于 partition 的数量(如果 consumer 数量大于 partition 数量,则必有 consumer 闲置)。因此选取合适的分区数量对于发挥 CKafka 实例的性能十分重要。partition 的数量需要根据生产和消费的吞吐来判断。理想情况下,可以通过如下公式来判断分区的数目:
Num = max( T/PT , T/CT ) = T / min( PT ,CT )
其中,Num 代表 partition 数量,T 代表目标吞吐量,PT 代表生产者写入单个 partition 的最大吞吐,CT 代表消费者从单个 partition 消费的最大吞吐。则 partition 数量应该等于 T/PT 和 T/CT 中较大的那一个。
在实际情况中,生产者写入但 partition 的最大吞吐 PT 的影响因素和批处理的规模、压缩算法、确认机制、副本数等有关。消费者从单个 partition 消费的最大吞吐 CT 的影响因素和业务逻辑有关,需要在不同场景下实测得出。
通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。 如果消费者数量是 5,则 partition 的数目也应该是 ≥ 5 的。同时,过多的分区会导致生产吞吐的降低和选举耗时的增加,因此也不建议过多分区。提供如下信息供参考:
· 一个 partition 是可以实现消息的顺序写入的。
· 一个 partition 只能被同一个消费者组的一个消费者进程消费。
· 一个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。
· partition 越多则生产消息失败后, leader 选举的耗时越长。
· offset 的粒度最细是在 partition 级别的,partition 越多,查询 offset 就越耗时。
· partition 的数量是可以动态增加的,只能增加不能减少。但增加会出现消息 rebalance 的情况。
影响生产者写入但 partition 的最大吞吐 PT 的参数:
batch.size=16384
# 生产者会尝试将业务发送到相同的 Partition的消息合包发送到 Broker,batch.size设置合包的大小上限。默认为 16KB。batch.size 设太小会导致吞吐下降,设太大会导致内存使用过多。
acks=1
# Kafka producer 的 ack 有 3 种机制,分别说明如下:
# -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,可以配合 Topic 级别参数 min.insync.replicas 使用。
# 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)
# 1:生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)
# 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
timeout.ms=30000
# timeout.ms控制生产请求在 Broker 等待副本同步满足 acks 设置的条件所等待的最大时间
buffer.memory=33554432
# buffer.memory配置生产者用来缓存消息等待发送到 Broker 的内存。用户要根据生产者所在进程的内存总大小调节
max.block.ms=60000
# max.block.ms是当生产消息的速度比 Sender 线程发送到 Broker 速度快,导致buffer.memory 配置的内存用完时会阻塞生产者 send 操作,该参数设置最大的阻塞时间
linger.ms=1000
# linger.ms是设置消息延迟发送的时间,这样可以等待更多的消息组成 batch 发送。默认为0表示立即发送。当待发送的消息达到batch.size 设置的大小时,不管是否达到 linger.ms设置的时间,请求也会立即发送
max.request.size=1048576
# max.request.size是生产者能够发送的请求包大小上限,默认为1MB。在修改该值时注意不能超过 Broker 配置的包大小上限16MB
compression.type=[none, snappy, lz4]
# compression.type是压缩格式配置,目前 0.9(包含)以下版本不允许使用压缩,0.10(包含)以上不允许使用 GZip 压缩
request.timeout.ms=30000
# request.timeout.ms是客户端发送给 Broker 的请求的超时时间,不能小于 Broker 配置的 replica.lag.time.max.ms,目前该值为10000ms
max.in.flight.requests.per.connection=5
# max.in.flight.requests.per.connectio是客户端在每个连接上最多可发送的最大的未确认请求数,该参数大于1且 retries 大于0时可能导致数据乱序。 希望消息严格有序时,建议客户将该值设置1
retries=3
# retries是请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
retry.backoff.ms=100
# retry.backoff.ms是发送请求失败时到下一次重试请求之间的时间
在我们的Consumer group在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了,消费停止,不及时处理,会导致消息堆积。
根据 Consumer Group 的状态机可知,当 Consumer Group 为 Empty、AwaitSync 或 Stable 状态时,Group 可能会进行 Rebalance。
以下情况可能会发生 Rebalance:
以0.10版本Kafka 的机制为例,Rebalance 过程分析如下:
上述过程会在每次 Rebalance 发生时执行一次。
第一类:因为未能及时发送心跳,导致 Consumer 被踢出Group 而引发的。
因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。设置 session.timeout.ms = 6s。设置 heartbeat.interval.ms = 2s。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些已经挂掉的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
第二类: Consumer 消费时间过长导致的。
像Consumer 消费数据时需要将消息处理之后写入到 MongoDB,这是一个很慢的消费逻辑。如果MongoDB 出现不稳定都会导致 Consumer 程序消费时长的增加。此时max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。
第三类:共用消费组而影响其它topic的消费
所有topic共用几个消费组,经常由于rebalance影响其它的topic消费,可以不同topic使用不同的消费组进行隔离避免相互影响。
总之,你要为你的业务处理逻辑留下充足的时间。这样Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 了。如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。
如何对接使用ckafka,以及ckafka提供的用户管理和ACL权限管理,满足多种环境的访问方式和安全权限的需求。
Ckafka提供了以下接入方式:
1、内网接入方式:购买ckafka实例,即提供内网访问的:ip和port
2、VPC网络接入方式:不是一个VPC下资源访问ckafka,可开通路由接入方式,来创建路由类型为VPC网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
3、支撑网络 接入方式:腾讯云的支撑环境下的资源访问ckafka,可开通路由接入方式,来创建路由类型为支撑网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
4、基础网络接入方式:腾讯云的基础网络下的资源访问ckafka,可开通路由接入方式,来创建路由类型为基础网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
5、公网网络接入方式:要通过外网来访问访问ckafka,可开通路由接入方式,来创建路由类型为公网域名接入、接入方式是SASL_PLAINTEXT 的路由:域名和port
注意说明:PLAINTEXT的ip和port为可直接使用的访问方式,SASL_PLAINTEXT则是Ckafak提供的安全认证机制,需要创建用户名和密码,进行用户认证,才能访问Ckafka。
ACL 访问控制列表(Access Control List):
帮助用户定义一组权限规则,允许/拒绝用户 user 通过 IP 读/写 Topic 资源 resource。
公网网络接入方式的使用说明:
生产者和消费的配置文件需要添加以下配置:
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="instanceId#admin" password="admin";
注意说明:接入方式只影响接入时的验证方式,设置的 ACL 权限则是全局的。
如果您在开通公网访问路由的同时还使用了 PLAINTEXT 方式接入 Kafka,那么之前为 Topic 设置的 ACL 仍然会生效;如果希望 PLAINTEXT 方式的访问不受影响,请为 PLAINTEXT 需要访问的 Topic 添加全部用户的可读写的权限。
生产者将数据发送到消息队列 CKafka 时,数据可能因为网络抖动而丢失,此时消息队列 CKafka 未收到该数据。可能情况:
解决方法
为了尽可能减少生产端消息丢失,您可以通过 buffer.memory 和 batch.size(以字节为单位)调优缓冲区的大小。缓冲区并非越大越好,如果由于某种原因生产者 done 掉了,那么缓冲区存在的数据越多,需要回收的垃圾越多,恢复就会越慢。应该时刻注意生产者的生产消息数情况、平均消息大小等(消息队列 CKafka 监控中有丰富的监控指标)。
即使按照上述配置 ACK,也不能保证数据不丢,例如,当 ISR 中只有 leader 时(ISR 中的成员由于某些情况会增加也会减少,最少时只剩一个 leader),此时会变成 acks = 1的情况。所以需要同时在配合 min.insync.replicas 参数(此参数可以在消息队列 CKafka 控制台 Topic 配置开启高级配置中进行配置),min.insync.replicas 表示在 ISR 中最小副本的个数,默认值是1,当且仅当 acks = -1或者 all 时生效。
建议配置的参数值
此参数值仅供参考,实际数值需要依业务实际情况而定。
message.send.max.retries=3;
retry.backoff.ms=10000;
request.required.acks=-1;
min.insync.replicas=2;
request.required.acks=0;
request.required.acks=1;
解决方法
RecordTooLargeException:消息太大。
生产者的参数max.request.size:这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。如果要修改,这个参数与之对应的broker端的message.max.bytes参数也要修改,message.max.bytes的默认值也是1MB,broker端对应ckafka的topic维度,在控制台的调整是max.message.bytes:
如果将broker端的max.message.bytes参数配置为2MB,而max.request.size参数配置为3MB,那么当我们发送的一条大小为2.5MB的消息时,生产者客户端就会报出如下的异常:org.apache.kafka.common.errors.RecordTooLargeException:The request included a message larger than the max message size the server will accept.
凡事预则立,不预则废。Ckafka作为解耦生产者与消费者的中间件,提供高吞吐性能、高可扩展性的消息队列服务。在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。但也是基于正确的使用姿势上,为了避免一些不当的使用姿势,本人分享的一线经验总结,希望对你有所帮助,如有疑问欢迎在评论进行讨论。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。