前传:分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
本文主要基于 RocketMQ 4.0.x 正式版
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
MQ 提供了两类消费者:
Push
开头,实际在实现时,使用 Pull
方式实现。通过 Pull
不断不断不断轮询 Broker
获取消息。当不存在新消息时, Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker
主动 Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling
)。本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
RebalanceService
:均衡消息队列服务,负责分配当前 Consumer
可消费的消息队列( MessageQueue
)。当有新的 Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从 Broker
拉取消息,并提交消费任务到 ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
: Consumer
消费进度管理,负责从 Broker
获取消费进度,同步消费进度到 Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对 Namesrv
, Broker
的 API调用,提供给 Producer
、 Consumer
使用。1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
Topic
。Consumer
信息到 Broker
。1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
Topic
和 订阅表达式 创建订阅数据1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
Consumer
可消费的消息队列( MessageQueue
)。
MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:
详细解析见:MQClientInstance#doRebalance(...)。
第25行
等待超时,每 20s 调用一次。PushConsumer
启动时,调用 rebalanceService#wakeup(...)
触发。Broker
通知 Consumer
加入 或 移除时, Consumer
响应通知,调用 rebalanceService#wakeup(...)
触发。1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
Client
包含的 consumerTable
( Consumer
集合 ),执行消息队列分配。consumerTable
只包含 Consumer
自己。?有大大对这个疑问有解答的,烦请解答下。MQConsumerInner#doRebalance(...)
进行队列分配。 DefaultMQPushConsumerImpl
、 DefaultMQPullConsumerImpl
分别对该接口方法进行了实现。 DefaultMQPushConsumerImpl#doRebalance(...)
详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }
RebalanceImpl#doRebalance(...)
进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。1: /**
2: * 执行分配消息队列
3: *
4: * @param isOrder 是否顺序消息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每个 topic 的消息队列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未订阅的topic对应的消息队列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未订阅的消息队列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }
#doRebalance(...)
说明 :执行分配消息队列。subscriptionInner
),分配每一个 Topic
的消息队列。Topic
的消息队列。#truncateMessageQueueNotMyTopic(...)
说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合( subscriptionInner
),对应消息队列移除在该方法。1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 获取 topic 对应的 队列 和 consumer信息
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根据 队列分配策略 分配消息队列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新消息队列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /**
81: * 当负载均衡时,更新 消息处理队列
82: * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
83: * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
84: *
85: * @param topic Topic
86: * @param mqSet 负载均衡结果后的消息队列数组
87: * @param isOrder 是否顺序
88: * @return 是否变更
89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待读:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的队列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 发起消息拉取请求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }
#rebalanceByTopic(...)
说明 :分配 Topic
的消息队列。Topic
对应的消息队列和消费者们,并对其进行排序。因为各 Consumer
是在本地分配消息队列,排序后才能保证各 Consumer
顺序一致。AllocateMessageQueueStrategy
) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。Topic
对应的消息队列。BROADCASTING
) 下,分配 Topic
对应的所有消息队列。CLUSTERING
) 下,分配 Topic
对应的部分消息队列。#updateProcessQueueTableInRebalance(...)
说明 :当分配队列时,更新 Topic
对应的消息队列,并返回是否有变更。顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。当前时间-最后一次拉取消息时间>120s
( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。mqSet
) 的 消息处理队列( processQueueTable
)。mqSet
) 新增的消息队列。1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步队列的消费进度,并移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 顺序消费
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...)1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }
PushConsumer
不断不断不断拉取消息的起点。1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }
PullMessageService
异步执行,非阻塞。详细解析见:PullMessageService。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
[0,mod)
: mqAll.size()/cidAll.size()+1
。前面 mod
个 Consumer
平分余数,多获得 1 个消息队列。[mod,cidAll.size())
: mqAll.size()/cidAll.size()
。index
:当前 Consumer
在消费集群里是第几个。这里就是为什么需要对传入的 cidAll
参数必须进行排序的原因。如果不排序, Consumer
在本地计算出来的 index
无法一致,影响计算结果。mod
:余数,即多少消息队列无法平均分配。averageSize
:代码可以简化成 (mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())
。startIndex
: Consumer
分配消息队列开始位置。range
:分配队列数量。之所以要 Math#min(...)
的原因:当 mqAll.size()<=cidAll.size()
时,最后几个 Consumer
分配不到消息队列。固定消息队列长度为4。
Consumer * 2 可以整除 | Consumer * 3 不可整除 | Consumer * 5 无法都分配 | |
---|---|---|---|
消息队列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息队列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息队列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息队列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
对应的消息队列。Broker
对应的消息队列。AllocateMessageQueueAveragely
略有不同,其是将多余的结尾部分分配给前 rem
个 Consumer
。Consumer
和 Broker
分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
PushConsumer
读取消费进度有三种选项:CONSUME_FROM_LAST_OFFSET
:第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
:第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
:第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。[PullConsumer]
RebalancePullImpl#computePullFromWhere(...)暂时跳过。?
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
拉取消息,并提交消费任务到 ConsumeMessageService
。#executePullRequestLater(...)
:第 26 至 40 行 : 提交延迟拉取消息请求。#executePullRequestImmediately(...)
:第 42 至 53 行 :提交立即拉取消息请求。#executeTaskLater(...)
:第 55 至 63 行 :提交延迟任务。#pullMessage(...)
:第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。#run(...)
:第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue
),进行消息拉取。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
#pullMessage(...)
说明 :拉取消息。Broker
处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。Consumer
未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。Consumer
处于暂停中,不进行消息拉取,提交延迟拉取消息请求。Consumer
为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。Topic
对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。Consumer
本地的订阅信息( SubscriptionData
),而不使用 Broker
里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。PullCallback
:拉取消息回调:FOUND
) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到 ConsumeMessageService
。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率( pullInterval
),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。NO_NEW_MSG
) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见: #correctTagsOffset(...)
。 * 第 148 行 :提交立即拉取消息请求。NO_MATCHED_MSG
)。逻辑同 NO_NEW_MSG
。OFFSET_ILLEGAL
)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为 dropped
。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到 Broker
。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???#correctTagsOffset(...)
:更正消费进度。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
信息( Broker
地址、是否为从节点)。Broker
信息不存在,则抛出异常。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
编号。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
信息( Broker
地址、是否为从节点)。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
编号的映射。tagCode
匹配合适消息。Broker
编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker
编号,会使用更新后的 Broker
编号。tagCode
匹配合适消息。tagCode
匹配消息。Hook
。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
如果用最简单粗暴的方式描述 PullConsumer
拉取消息的过程,那就是如下的代码:
while (true) {
if (不满足拉取消息) {
Thread.sleep(间隔);
continue;
}
主动拉取消息();
}
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
。如果消息数超过批量消费上限,会不会是BUG。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Topic
为原始 Topic
。例如:原始 Topic
为 TopicTest
,重试时 Topic
为 %RETRY%please_rename_unique_group_name_4
,经过该方法, Topic
设置回 TopicTest
。Hook
。Hook
。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
ackIndex
值。 consumeRequest.msgs[0-ackIndex]
为消费成功,需要进行 ack
确认。CONSUME_SUCCESS
: ackIndex=context.getAckIndex()
。RECONSUME_LATER
: ackIndex=-1
。Broker
。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。Broker
失败的消息,直接提交延迟重新消费。Broker
成功,结果因为例如网络异常,导致 Consumer
以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。BROADCASTING
:广播模式,无论是否消费失败,不发回消息到 Broker
,只打印日志。CLUSTERING
:集群模式,消费失败的消息发回到 Broker
。Broker
成功】的消息,并更新最新消费进度。Broker
成功】的消息?见第 56 行。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Consumer
发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。Consumer
内置默认 Producer
发送消息。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
RemoteBrokerOffsetStore
: Consumer
集群模式 下,使用远程 Broker
消费进度。LocalFileOffsetStore
: Consumer
广播模式下,使用本地 文件
消费进度。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Offset
存储序列化。Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":3,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:1471,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:1470
}
}
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
获取。读取消费进度类型:
READ_FROM_MEMORY
:从内存读取。READ_FROM_STORE
:从存储( Broker
或 文件
)读取。MEMORY_FIRST_THEN_STORE
:优先从内存读取,读取不到,从存储读取。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
文件
读取消费进度。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
读取消费进度。该方法 RemoteBrokerOffsetStore
与 LocalFileOffsetStore
实现相同。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
Broker
,并移除非指定消息队列。// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
?可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。 感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。?真的有丢丢长。
本文转载自「芋道源码」 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有