首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

使用多线程增加kafka消费能力

前提:本例适合那些没有顺序要求的消息主题。

kafka通过一系列优化,写入和读取速度能够达到数万条/秒。通过增加分区数量,能够通过部署多个消费者增加并行消费能力。但还是有很多情况下,某些业务的执行速度实在是太慢,这个时候我们就要用到多线程去消费,提高应用机器的利用率,而不是一味的给kafka增加压力。

使用Spring创建一个kafka消费者是非常简单的。我们选择的方式是继承kafka的,然后实现它的方法即可。

参考:

https://github.com/apache/kafka/blob/2.1/examples/src/main/java/kafka/examples/Consumer.java

多线程消费某个分区的数据

即然是使用多线程,我们就需要新建一个线程池。

我们创建了一个最大容量为20的线程池,其中有两个参数需要注意一下。(参考

《JAVA多线程使用场景和注意事项简版》

)。

我们使用了了零容量的,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除因为阻塞队列丢消息的可能。

然后使用了饱和策略,使得多线程处理不过来的时候,能够阻塞在kafka的消费线程上。

然后,我们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,我们都手工的commit一次,表明这条消息我已经处理了。由于是线程池认领了这些任务,顺序性是无法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。

不过这暂时不重要,首先让它并行化运行就好。

可惜的是,当我们运行程序,直接抛出了异常,无法进行下去。

程序直接说了:

显然,kafka的消费端不是线程安全的,它拒绝你这么调用它的api。kafka的初衷是好的,想要避免一些并发环境的问题,但我确实需要使用多线程处理。

kafka消费者通过比较调用者的线程id来判断是否是由外部线程发起请求。

得,只能将函数放在线程外面了,先提交ack、再执行任务。

加入管道

我们获取的消息,可能在真正被执行之前,会进行一些过滤,比如一些空值或者特定条件的判断。虽然可以直接放在消费者线程里运行,但显的特别的乱,可以加入一个生产者消费者模型(你可以认为这是画蛇添足)。这里采用的是阻塞队列依然是,它充当了管道的功能。

我们把任务放入管道后,立马commit。如果线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。然后,我们单独启动了一个线程,用来接收这些数据,然后提交到这部分的代码看起来大概这样。

应用能够启动了,消费速度贼快。

参数配置

kafka的参数非常的多,我们比较关心的有以下几个参数。

调用一次poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出的值(默认5分钟),造成消费者的离线。在耗时非常大的消费中,是需要特别注意的。

是否开启自动提交(offset)如果开启,consumer已经消费的offset信息将会间歇性的提交到kafka中(持久保存)

当开启offset自动提交时,提交请求的时间频率由参数控制。

consumer会话超时时长,如果在此时间内,server尚未接收到consumer任何请求(包括心跳检测),那么server将会判定此consumer离线。此值越大,server等待consumer失效、rebalance时间就越长。

在本例中,我们的参数简单的设置如下,主要调整了每次获取的条数和检测时间。其他的都是默认。

消息保证

仔细的同学可能会看到,我们的代码依然不是完全安全的。这是由于我们提前提交了ack导致的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,很可能会丢失,对于一致性要求非常高的应用,我们要从两个手段上进行保证。

使用关闭钩子

第一种就是考虑kill -15的情况。这种方式比较简单,只要覆盖ShutdownableThread的shutdown方法即可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。

使用日志处理

应用oom,或者直接kill -9了,事情就变得麻烦起来。

维护一个单独的日志文件(或者本地db),在commit之前写入一条日志,然后在真正执行完毕之后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,重新执行。

想要效率,还想要可靠,是得下点苦力气的。

借助redis处理

这种方式与日志方式类似,但由于redis的效率很高(可达数万),而且方便,是优于日志方式的。

可以使用Hash结构,提交任务的同时写入Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。

在系统启动时,首先检测一下redis中是否有异常数据。如果有,首先处理这些数据,然后正常消费。

End

多线程是为了增加效率,redis等是为了增加可靠性。业务代码是非常好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增加它的效率、照顾它的边界。

以程序员的角度来说,最有竞争力的代码都是为了照顾小概率发生的边界异常。

kafka在吞吐量和可靠性方面,有各种的权衡,很多都是鱼和熊掌的关系。不必纠结于它本身,我们可以借助外部的工具,获取更大的收益。在这种情况下,redis当机与应用同时当机的概率还是比较小的。5个9的消息保证是可以做到的,剩下的那点不完美问题消息,你为什么不从日志里找呢?

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190322A0I39K00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券