当我们使用RocketMQ时,RocketMQ-Dashboard是一个非常好用的图形化界面工具
我们首先在RocketMQ-Dashboard上创建一个topic,每个topic下4个队列
每个topic是一类消息的集合,topic下面再细分queue是为了提高消息消费的并发度
「当producer发送topic消息时,应该往topic下的哪个queue来发送呢?」
producer会采用轮询的策略发送
「那么consumer应该消费哪个queue下的消息呢?」
当有一个消费者时当然是消费所有的queue
「如果有多个消费者呢?」
只需要根据各种负载均衡策略将队列分配给消费者即可,如下图是两种负载均衡的方式
你问我这两种负载策略怎么实现的?去看看源码呗,详细过程我就不分析了
「如果消费者数量超过队列的数量会发生什么?」
多出来的消费者将不会消费任何队列
「为什么一个consumer只能消费一个queue呢?」
多个消费者消费一个queue肯定会有并发问题,所以得加锁,这样还不如把topic下的队列数量设置的多一点
「我在运行的过程中可以设置topic下queue的数量吗?」
当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景
「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」
是的,大家一般把这个过程叫做重平衡
下面我们来分享一下详细的细节
消息发送主要有3种方式单向发送(只发送,不管结果),同步发送和异步发送
消息消费的模式有两种方式:
这两种方式都有各自的缺点:
「看起来拉取和推送难以抉择」
然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!
「你猜怎么改的?」
其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。
「对了,这种策略就叫做长轮询」
「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的」
「拉取到消息后是怎么处理的呢?」
PullRequest类的成员变量如下图
当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体
「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏把」
msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的
「什么是流控呢?」
就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图
当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等
接着就是收到响应,处理消息,并键PullRequest再次放入阻塞队列.
「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」
嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的」
当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中
「这样就会造成消息的重复消费」