源码很长,这里就不仔细看了,其实主要就是初始化了三个组件,然后启动后台定时任务
均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列
拉取消息组件
消费进度组件
pullRequestQueue
中获取consumer的pull请求
启动流程图如下:
当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup
也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。
那么就会有一个问题,比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。
首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。
然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。
主要流程如下:
注意这里会对Consumer
集合做一个排序,为什么要这样做呢?因为每个 consumer 都是在本地负载均衡,所以要排序,否则多个Consumer之间会有冲突。
对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:
offset
需要consumer自己来维护
Push 本质上也是基于消费者主动拉取实现的,只不过名字叫push,意思是 Broker 会尽可能实时的把消息给消费者处理。
RebalanceServic
根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest
实例放入阻塞队列pullRequestQueue
中。PullMessageService
不断地从阻塞队列pullRequestQueue
中获取PullRequest
请求并通过网络通信模块异步发送Pull消息的RPC请求给Broker端。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。PullMessageService
异步拉取到消息后,通过PullCallback
进行回调处理,如果拉取成功,则更新消费进度,putPullRequest
到阻塞队列pullRequestQueue
中,接着立即进行拉取PullCallback
会把拉取到的消息交给Consumerrequest
进行处理,Consumerrequest
会调用消费者业务方实现的consumeMessage()
接口处理具体业务,消费者业务方处理完成后返回ACK给Consumerrequest
,如果消费者ACK返回的失败,则在集群模式下把消息发回 Broker 进行重试(广播模型重试的成本太高),最后更新消费进度offsetTable
PullMessageProcessor
业务处理器收到Pull消息的RPC请求后,通过MessageStore
实例从commitLog
获取消息。如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService
重新尝试和后台线程ReputMessageService
进行二次处理。在 RocketMq 中消费者主动发起pull请求,broker在处理消息拉取请求时,如果没有查询到消息,将不返回消费者任何信息,而是先hold住并且挂起请求,使其不会立即发起下一次拉取请求,会将请求信息pullRequest添加到pullRequestTable
中,等待触发通知消费者的事件。
(pullRequestTable表示待处理的消息拉取请求集合,它的key是Topic+queueId
,value中包含了消费者信息(与该消费者的长连接channel),以及其想要拉取的消息位置,后面需要根据这些信息来将对应的新消息返回给对应的消费者)。
然后在Broker端,通过后台独立线程PullRequestHoldService
遍历所有挂起的请求pullRequestTable
,如果有消息,则返回响应给消费者。
同时,另外一个ReputMessageService
线程不断地构建ConsumeQueue/IndexFile数据,不断的检测是否有新消息产生,如果有新消息,则从pullRequestTable
通过Topic+queueId
的key获取对应hold住的请求pullRequest
,再根据其中的长链接channel进行通信响应。
通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。
资料引用: www.jianshu.com/p/fac642f3c… blog.csdn.net/wb_snail/ar… segmentfault.com/a/119000002…