Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

作者头像
程序猿DD
发布于 2018-02-01 11:28:13
发布于 2018-02-01 11:28:13
2.5K00
代码可运行
举报
文章被收录于专栏:程序猿DD程序猿DD
运行总次数:0
代码可运行

前传:分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述
  • 2、Consumer
  • 3、PushConsumer 一览
  • 4、PushConsumer 订阅
  • 5、PushConsumer 消息队列分配
  • 6、PushConsumer 消费进度读取
  • 7、PushConsumer 拉取消息
  • 8、PushConsumer 消费消息
  • 9、PushConsumer 发回消费失败消息
  • 10、Consumer 消费进度
  • 11、结尾

1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:
    • 在大多数场景下使用。
    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时, Broker挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
  • PullConsumer

本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费 本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费 本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断不断不断Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStoreConsumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 NamesrvBrokerAPI调用,提供给 ProducerConsumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 创建订阅数据
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 通过心跳同步Consumer信息到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }
  • 说明 :订阅 Topic
  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
  2:     String subString) throws Exception {
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 处理订阅表达式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\\|\\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25: 
 26:     return subscriptionData;
 27: }
  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }
  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

RebalanceService

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public class RebalanceService extends ServiceThread {
  2: 
  3:     /**
  4:      * 等待间隔,单位:毫秒
  5:      */
  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval", "20000"));
  9: 
 10:     private final Logger log = ClientLogger.getLog();
 11:     /**
 12:      * MQClient对象
 13:      */
 14:     private final MQClientInstance mqClientFactory;
 15: 
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19: 
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23: 
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28: 
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31: 
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }
  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。
  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发: 详细解析见:MQClientInstance#doRebalance(...)。
    • 第25行 等待超时,每 20s 调用一次。
    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。
    • Broker 通知 Consumer 加入 或 移除时, Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

MQClientInstance#doRebalance(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }
  • 说明 :遍历当前 Client 包含的 consumerTable( Consumer集合 ),执行消息队列分配。
  • 疑问:目前代码调试下来, consumerTable 只包含 Consumer 自己。?有大大对这个疑问有解答的,烦请解答下。
  • 第 6 行 :调用 MQConsumerInner#doRebalance(...) 进行队列分配。 DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分别对该接口方法进行了实现。 DefaultMQPushConsumerImpl#doRebalance(...)详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。

DefaultMQPushConsumerImpl#doRebalance(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void doRebalance() {
  2:     if (!this.pause) {
  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  4:     }
  5: }
  • 说明:执行消息队列分配。
  • 第 3 行 :调用 RebalanceImpl#doRebalance(...) 进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。

RebalanceImpl#doRebalance(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: /**
  2:  * 执行分配消息队列
  3:  *
  4:  * @param isOrder 是否顺序消息
  5:  */
  6: public void doRebalance(final boolean isOrder) {
  7:     // 分配每个 topic 的消息队列
  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  9:     if (subTable != null) {
 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 11:             final String topic = entry.getKey();
 12:             try {
 13:                 this.rebalanceByTopic(topic, isOrder);
 14:             } catch (Throwable e) {
 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 16:                     log.warn("rebalanceByTopic Exception", e);
 17:                 }
 18:             }
 19:         }
 20:     }
 21:     // 移除未订阅的topic对应的消息队列
 22:     this.truncateMessageQueueNotMyTopic();
 23: }
 24: 
 25: /**
 26:  * 移除未订阅的消息队列
 27:  */
 28: private void truncateMessageQueueNotMyTopic() {
 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
 31:         if (!subTable.containsKey(mq.getTopic())) {
 32: 
 33:             ProcessQueue pq = this.processQueueTable.remove(mq);
 34:             if (pq != null) {
 35:                 pq.setDropped(true);
 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
 37:             }
 38:         }
 39:     }
 40: }
  • #doRebalance(...) 说明 :执行分配消息队列。
    • 第 7 至 20 行 :循环订阅主题集合( subscriptionInner ),分配每一个 Topic 的消息队列。
    • 第 22 行 :移除未订阅的 Topic 的消息队列。
  • #truncateMessageQueueNotMyTopic(...) 说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。

RebalanceImpl#rebalanceByTopic(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2:     switch (messageModel) {
  3:         case BROADCASTING: {
  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  5:             if (mqSet != null) {
  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  7:                 if (changed) {
  8:                     this.messageQueueChanged(topic, mqSet, mqSet);
  9:                     log.info("messageQueueChanged {} {} {} {}", //
 10:                         consumerGroup, //
 11:                         topic, //
 12:                         mqSet, //
 13:                         mqSet);
 14:                 }
 15:             } else {
 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 17:             }
 18:             break;
 19:         }
 20:         case CLUSTERING: {
 21:             // 获取 topic 对应的 队列 和 consumer信息
 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 24:             if (null == mqSet) {
 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 27:                 }
 28:             }
 29: 
 30:             if (null == cidAll) {
 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
 32:             }
 33: 
 34:             if (mqSet != null && cidAll != null) {
 35:                 // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
 36:                 List<MessageQueue> mqAll = new ArrayList<>();
 37:                 mqAll.addAll(mqSet);
 38: 
 39:                 Collections.sort(mqAll);
 40:                 Collections.sort(cidAll);
 41: 
 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 43: 
 44:                 // 根据 队列分配策略 分配消息队列
 45:                 List<MessageQueue> allocateResult;
 46:                 try {
 47:                     allocateResult = strategy.allocate(//
 48:                         this.consumerGroup, //
 49:                         this.mQClientFactory.getClientId(), //
 50:                         mqAll, //
 51:                         cidAll);
 52:                 } catch (Throwable e) {
 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
 54:                         e);
 55:                     return;
 56:                 }
 57: 
 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();
 59:                 if (allocateResult != null) {
 60:                     allocateResultSet.addAll(allocateResult);
 61:                 }
 62: 
 63:                 // 更新消息队列
 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
 65:                 if (changed) {
 66:                     log.info(
 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
 69:                         allocateResultSet.size(), allocateResultSet);
 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
 71:                 }
 72:             }
 73:             break;
 74:         }
 75:         default:
 76:             break;
 77:     }
 78: }
 79: 
 80: /**
 81:  * 当负载均衡时,更新 消息处理队列
 82:  * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 83:  * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
 84:  *
 85:  * @param topic Topic
 86:  * @param mqSet 负载均衡结果后的消息队列数组
 87:  * @param isOrder 是否顺序
 88:  * @return 是否变更
 89:  */
 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
 91:     boolean changed = false;
 92: 
 93:     // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 95:     while (it.hasNext()) { // TODO 待读:
 96:         Entry<MessageQueue, ProcessQueue> next = it.next();
 97:         MessageQueue mq = next.getKey();
 98:         ProcessQueue pq = next.getValue();
 99: 
100:         if (mq.getTopic().equals(topic)) {
101:             if (!mqSet.contains(mq)) { // 不包含的队列
102:                 pq.setDropped(true);
103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104:                     it.remove();
105:                     changed = true;
106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107:                 }
108:             } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理
109:                 switch (this.consumeType()) {
110:                     case CONSUME_ACTIVELY:
111:                         break;
112:                     case CONSUME_PASSIVELY:
113:                         pq.setDropped(true);
114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115:                             it.remove();
116:                             changed = true;
117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118:                                 consumerGroup, mq);
119:                         }
120:                         break;
121:                     default:
122:                         break;
123:                 }
124:             }
125:         }
126:     }
127: 
128:     // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息请求数组
130:     for (MessageQueue mq : mqSet) {
131:         if (!this.processQueueTable.containsKey(mq)) {
132:             if (isOrder && !this.lock(mq)) {
133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134:                 continue;
135:             }
136: 
137:             this.removeDirtyOffset(mq);
138:             ProcessQueue pq = new ProcessQueue();
139:             long nextOffset = this.computePullFromWhere(mq);
140:             if (nextOffset >= 0) {
141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142:                 if (pre != null) {
143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144:                 } else {
145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146:                     PullRequest pullRequest = new PullRequest();
147:                     pullRequest.setConsumerGroup(consumerGroup);
148:                     pullRequest.setNextOffset(nextOffset);
149:                     pullRequest.setMessageQueue(mq);
150:                     pullRequest.setProcessQueue(pq);
151:                     pullRequestList.add(pullRequest);
152:                     changed = true;
153:                 }
154:             } else {
155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156:             }
157:         }
158:     }
159: 
160:     // 发起消息拉取请求
161:     this.dispatchPullRequest(pullRequestList);
162: 
163:     return changed;
164: }
  • #rebalanceByTopic(...) 说明 :分配 Topic 的消息队列。
    • 第 21 至 40 行 :获取 Topic 对应的消息队列和消费者们,并对其进行排序。因为各 Consumer 是在本地分配消息队列,排序后才能保证各 Consumer 顺序一致。
    • 第 42 至 61 行 :根据 队列分配策略( AllocateMessageQueueStrategy ) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。
    • 第 63 至 72 行 :更新 Topic 对应的消息队列。
    • 第 3 至 19 行 :广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列。
    • 第 20 至 74 行 :集群模式( CLUSTERING ) 下,分配 Topic 对应的部分消息队列。
  • #updateProcessQueueTableInRebalance(...) 说明 :当分配队列时,更新 Topic 对应的消息队列,并返回是否有变更。
    • 第 132 至 135 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 137 行 :移除消息队列的消费进度。
    • 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
    • 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求
    • 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
    • 第 108 至 120 行 :队列拉取超时,即 当前时间-最后一次拉取消息时间>120s ( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。
    • 第 93 至 126 行 :移除不存在于分配的消息队列( mqSet ) 的 消息处理队列( processQueueTable )。
    • 第 128 至 158 行 :增加 分配的消息队列( mqSet ) 新增的消息队列。
    • 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。

RebalanceImpl#removeUnnecessaryMessageQueue(...)

RebalancePushImpl#removeUnnecessaryMessageQueue(...)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     // 同步队列的消费进度,并移除之。
  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
  5:     // TODO 顺序消费
  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
  8:         try {
  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 10:                 try {
 11:                     return this.unlockDelay(mq, pq);
 12:                 } finally {
 13:                     pq.getLockConsume().unlock();
 14:                 }
 15:             } else {
 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 17:                     mq, //
 18:                     pq.getTryUnlockTimes());
 19: 
 20:                 pq.incTryUnlockTimes();
 21:             }
 22:         } catch (Exception e) {
 23:             log.error("removeUnnecessaryMessageQueue Exception", e);
 24:         }
 25: 
 26:         return false;
 27:     }
 28:     return true;
 29: }
  • 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
  • 第 2 至 4 行 :同步队列的消费进度,并移除之。
  • 第 5 至 27 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
  4:     return true;
  5: }
  • 说明 :移除不需要的消息队列相关的信息,并返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(...)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
  2:     for (PullRequest pullRequest : pullRequestList) {
  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
  5:     }
  6: }
  • 说明 :发起消息拉取请求。该调用是 PushConsumer不断不断不断拉取消息的起点
DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1: public void executePullRequestImmediately(final PullRequest pullRequest) {
  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
  3: }
  • 说明 :提交拉取请求。提交后, PullMessageService 异步执行非阻塞。详细解析见:PullMessageService。

AllocateMessageQueueStrategy

AllocateMessageQueueAveragely

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配队列策略。
  • 第 7 至 25 行 :参数校验。
  • 第 26 至 36 行 :平均分配消息队列。
    • [0,mod)mqAll.size()/cidAll.size()+1。前面 modConsumer 平分余数,多获得 1 个消息队列。
    • [mod,cidAll.size())mqAll.size()/cidAll.size()
    • 第 27 行 : index :当前 Consumer 在消费集群里是第几个。这里就是为什么需要对传入的 cidAll 参数必须进行排序的原因。如果不排序, Consumer 在本地计算出来的 index 无法一致,影响计算结果。
    • 第 28 行 : mod :余数,即多少消息队列无法平均分配。
    • 第 29 至 31 行 : averageSize :代码可以简化成 (mod>0&&index<mod?mqAll.size()/cidAll.size()+1:mqAll.size()/cidAll.size())
    • 第 32 行 : startIndexConsumer 分配消息队列开始位置。
    • 第 33 行 : range :分配队列数量。之所以要 Math#min(...) 的原因:当 mqAll.size()<=cidAll.size() 时,最后几个 Consumer 分配不到消息队列。
    • 第 34 至 36 行 :生成分配消息队列结果。
  • 举个例子:

固定消息队列长度为4

Consumer * 2 可以整除

Consumer * 3 不可整除

Consumer * 5 无法都分配

消息队列[0]

Consumer[0]

Consumer[0]

Consumer[0]

消息队列[1]

Consumer[0]

Consumer[0]

Consumer[1]

消息队列[2]

Consumer[1]

Consumer[1]

Consumer[2]

消息队列[3]

Consumer[1]

Consumer[2]

Consumer[3]

AllocateMessageQueueByMachineRoom

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :平均分配可消费的 Broker 对应的消息队列。
  • 第 7 至 15 行 :参数校验。
  • 第 16 至 23 行 :计算可消费的 Broker 对应的消息队列。
  • 第 25 至 34 行 :平均分配消息队列。该平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是将多余的结尾部分分配给前 remConsumer
  • 疑问:使用该分配策略时, ConsumerBroker 分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。
AllocateMessageQueueAveragelyByCircle

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :环状分配消息队列。
AllocateMessageQueueByConfig

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :分配配置的消息队列。
  • 疑问 :该分配策略的使用场景。

5、PushConsumer 消费进度读取

RebalancePushImpl#computePullFromWhere(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列开始消费位置。
  • PushConsumer 读取消费进度有三种选项:
    • CONSUME_FROM_LAST_OFFSET :第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_FIRST_OFFSET :第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费
    • CONSUME_FROM_TIMESTAMP :第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费

[PullConsumer] RebalancePullImpl#computePullFromWhere(...)

暂时跳过。?

6、PushConsumer 拉取消息

PullMessageService

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService
  • #executePullRequestLater(...) :第 26 至 40 行 : 提交延迟拉取消息请求。
  • #executePullRequestImmediately(...) :第 42 至 53 行 :提交立即拉取消息请求。
  • #executeTaskLater(...) :第 55 至 63 行 :提交延迟任务
  • #pullMessage(...) :第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。
  • #run(...) :第 84 至 101 行 :循环拉取消息请求队列( pullRequestQueue ),进行消息拉取。

DefaultMQPushConsumerImpl#pullMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • #pullMessage(...) 说明 :拉取消息。
    • 执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
    • 当发起请求产生异常时,提交延迟拉取消息请求。对应 Broker 处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。
    • 第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
    • 第 9 行 :设置消息处理队列最后拉取消息时间。
    • 第 11 至 18 行 : Consumer 未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。
    • 第 20 至 25 行 : Consumer 处于暂停中,不进行消息拉取,提交延迟拉取消息请求。
    • 第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
    • 第 39 至 49 行 : Consumer并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。
    • 第 50 至 70 行 : 顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 72 至 78 行 : Topic 对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。
    • 第 222 至 224 行 :判断请求是否使用 Consumer 本地的订阅信息( SubscriptionData ),而不使用 Broker 里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。
    • 第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
    • 第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
    • 第 237 至 255 行 :
  • PullCallback :拉取消息回调:
  • 第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
  • 第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息( FOUND ) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到 ConsumeMessageService。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率( pullInterval ),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。
    • 第 140 至 149 行 :没有新消息( NO_NEW_MSG ) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见: #correctTagsOffset(...)。 * 第 148 行 :提交立即拉取消息请求。
    • 第 150 至 159 行 :有新消息但是不匹配( NO_MATCHED_MSG )。逻辑同 NO_NEW_MSG
    • 第 160 至 189 行 :拉取请求的消息队列位置不合法 ( OFFSET_ILLEGAL)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为 dropped。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到 Broker。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???
  • 第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
  • #correctTagsOffset(...) :更正消费进度。
    • 第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。

PullAPIWrapper#pullKernelImpl(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明?。
  • 第 34 至 43 行 :获取 Broker 信息( Broker 地址、是否为从节点)。
    • #recalculatePullFromWhichNode(...)
    • #MQClientInstance#findBrokerAddressInSubscribe(...)
  • 第 45 至 78 行 :请求拉取消息
  • 第 81 行 :当 Broker 信息不存在,则抛出异常。
PullAPIWrapper#recalculatePullFromWhichNode(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :计算消息队列拉取消息对应的 Broker 编号。
MQClientInstance#findBrokerAddressInSubscribe(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :获取 Broker 信息( Broker 地址、是否为从节点)。

PullAPIWrapper#processPullResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理拉取结果。
    • 更新消息队列拉取消息 Broker 编号的映射。
    • 解析消息,并根据订阅信息消息 tagCode匹配合适消息。
  • 第 16 行 :更新消息队列拉取消息 Broker 编号的映射。下次拉取消息时,如果未设置默认拉取的 Broker 编号,会使用更新后的 Broker 编号。
  • 第 18 至 55 行 :解析消息,并根据订阅信息消息 tagCode 匹配合适消息。
    • 第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
    • 第 24 至 35 行 :根据订阅信息 tagCode 匹配消息。
    • 第 37 至 43 行 : Hook
    • 第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
    • 第 54 行 :设置消息队列。
  • 第 58 行 :清空消息二进制数组。

ProcessQueue#putMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

总结

如果用最简单粗暴的方式描述 PullConsumer 拉取消息的过程,那就是如下的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
while (true) {
    if (不满足拉取消息) {
        Thread.sleep(间隔);
        continue;
    }
    主动拉取消息();
}

6、PushConsumer 消费消息

ConsumeMessageConcurrentlyService 提交消费请求

ConsumeMessageConcurrentlyService#submitConsumeRequest(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交立即消费请求。
  • 第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
  • 第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
    • 第 25 至 33 行 :计算当前拆分请求包含的消息。
    • 第 35 至 38 行 :提交拆分消费请求。
    • 第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :提交延迟消费请求。
  • 第 34 行 :直接调用 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。如果消息数超过批量消费上限,会不会是BUG

ConsumeRequest

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :消费请求。提交请求执行消费。
  • 第 24 至 28 行 :废弃处理队列不进行消费。
  • 第 34 至 44 行 :Hook。
  • 第 51 行 :当消息为重试消息,设置 Topic为原始 Topic。例如:原始 TopicTopicTest,重试时 Topic%RETRY%please_rename_unique_group_name_4,经过该方法, Topic 设置回 TopicTest
  • 第 53 至 58 行 :设置开始消费时间。
  • 第 61 行 :进行消费
  • 第 71 至 85 行 :解析消费返回结果类型
  • 第 87 至 90 行 : Hook
  • 第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
  • 第 101 至 106 行 : Hook
  • 第 108 至 110 行 :统计。
  • 第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。

ConsumeMessageConcurrentlyService#processConsumeResult(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :处理消费结果。
  • 第 8 至 10 行 :消费请求消息未空时,直接返回。
  • 第 12 至 32 行 :计算 ackIndex 值。 consumeRequest.msgs[0-ackIndex]为消费成功,需要进行 ack 确认。
    • 第 14 至 23 行 : CONSUME_SUCCESSackIndex=context.getAckIndex()
    • 第 24 至 29 行 : RECONSUME_LATERackIndex=-1
  • 第34 至 63 行 :处理消费失败的消息。
    • 第 43 至 52 行 :发回消费失败的消息到 Broker。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。
    • 第 54 至 59 行 :发回 Broker 失败的消息,直接提交延迟重新消费。
    • 如果发回 Broker 成功,结果因为例如网络异常,导致 Consumer以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。
    • 第 36 至 41 行 : BROADCASTING :广播模式,无论是否消费失败,不发回消息到 Broker,只打印日志。
    • 第 42 至 60 行 : CLUSTERING :集群模式,消费失败的消息发回到 Broker
  • 第 65 至 69 行 :移除【消费成功】【消费失败但发回 Broker成功】的消息,并更新最新消费进度。
    • 为什么会有【消费失败但发回 Broker成功】的消息?见第 56 行
    • ProcessQueue#removeMessage(...)

ProcessQueue#removeMessage(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

ConsumeMessageConcurrentlyService#cleanExpireMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时清理过期消息,默认周期:15min。

ProcessQueue#cleanExpiredMsg(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :移除过期消息。
  • 第 2 至 5 行 :顺序消费时,直接返回。
  • 第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
  • 第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
  • 第 29 行 :发回超时消息到 Broker
  • 第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。

7、PushConsumer 发回消费失败消息

DefaultMQPushConsumerImpl#sendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :发回消息。
  • 第 4 至 8 行 : Consumer 发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。
  • 第 10 至 25 行 :发生异常时, Consumer 内置默认 Producer 发送消息。
    • ?疑问:什么样的情况下会发生异常呢?

MQClientAPIImpl#consumerSendMessageBack(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

8、Consumer 消费进度

OffsetStore

  • RemoteBrokerOffsetStoreConsumer 集群模式 下,使用远程 Broker 消费进度。
  • LocalFileOffsetStoreConsumer 广播模式下,使用本地 文件 消费进度。

OffsetStore#load(...)

LocalFileOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :从本地文件加载消费进度到内存。
OffsetSerializeWrapper

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :本地 Offset 存储序列化。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1471,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1470
    }
}
RemoteBrokerOffsetStore#load(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :不进行加载,实际读取消费进度时,从 Broker 获取。

OffsetStore#readOffset(...)

读取消费进度类型:

  • READ_FROM_MEMORY :从内存读取。
  • READ_FROM_STORE :从存储( Broker文件 )读取。
  • MEMORY_FIRST_THEN_STORE :优先从内存读取,读取不到,从存储读取。
LocalFileOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 文件 读取消费进度。
RemoteBrokerOffsetStore#readOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 第 16 行 :从 Broker 读取消费进度。

OffsetStore#updateOffset(...)

该方法 RemoteBrokerOffsetStoreLocalFileOffsetStore 实现相同。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

OffsetStore#persistAll(...)

LocalFileOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化消费进度。将消费进度写入文件
RemoteBrokerOffsetStore#persistAll(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :持久化指定消息队列数组的消费进度到 Broker,并移除非指定消息队列。
MQClientInstance#persistAllConsumerOffset(...)

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

  • 说明 :定时进行持久化,默认周期:5000ms。
  • 重要说明 :
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
    • 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。

9、结尾

?可能是本系列最长的一篇文章,如有表达错误和不清晰,请多多见谅。 感谢对本系列的阅读、收藏、点赞、分享,特别是翻到结尾。?真的有丢丢长。

本文转载自「芋道源码」 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿DD 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RocketMQ 源码分析 —— Message 拉取与消费(下)
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
芋道源码
2020/05/07
1.7K0
RocketMQ 源码分析 —— Message 拉取与消费(下)
分布式消息队列 RocketMQ 源码分析 —— Message 顺序发送与消费
本文主要基于 RocketMQ 4.0.x 正式版 1. 概述 2. Producer 顺序发送 3. Consumer 严格顺序消费 3.1 获得(锁定)消息队列 3.2 移除消息队列 3.3 消费
程序猿DD
2018/03/26
1.6K0
分布式消息队列 RocketMQ 源码分析 —— Message 顺序发送与消费
RocketMQ之消费者启动与消费流程
RocketMQ是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer充当名字路由服务,整体架构图如下所示:
2020labs小助手
2022/07/12
1.1K0
RocketMQ(六):Consumer Rebalanc原理解析(运行流程、触发时机、导致的问题)
这里推荐一篇Java语法糖的文章:Java 语法糖:让开发更丝滑的“幕后操作” 文章列举常用的Java语法糖并分析优劣点,让我们的开发更加丝滑~
菜菜的后端私房菜
2024/11/11
2550
关于RocketMQ消息拉取与重平衡的一些问题探讨
其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结。
张乘辉
2019/10/15
2.1K0
关于RocketMQ消息拉取与重平衡的一些问题探讨
全网最深入的RocketMQ Consumer 学习笔记
并发消费是默认的处理方法,一个消费者使用线程池技术,可以并发消费多条消息,提升机器的资源利用率。默认配置是 20 个线程,所以一台机器默认情况下,同一瞬间可以消费 20 个消息。关注公众后码猿技术专栏获取更多面试资源。
Bug开发工程师
2021/03/03
2.6K0
全网最深入的RocketMQ Consumer 学习笔记
RocketMQ为什么要保证订阅关系的一致性?
前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下:
张乘辉
2019/07/30
1.9K0
深入理解RocketMq普通消息和顺序消息使用,原理,优化
最近一直再做一些系统上的压测,并对一些问题做了优化,从这些里面收获了一些很多好的优化经验,后续的文章都会以这方面为主。
用户5397975
2019/11/28
3.5K0
深入理解RocketMq普通消息和顺序消息使用,原理,优化
RocketMQ 源码分析 —— Message 拉取与消费(上)
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!
芋道源码
2020/04/29
1.2K0
RocketMQ消费者启动流程
(1)当broker启动的时候,会把broker的地址端口、broker上的主题信息、主题队列信息发送到nameserver(如图中1) (2)消费者Client启动的时候会去nameserver拿toipc、topic队列以及对应的broker信息,拿到以后把信息存储到本地(如图中2) (3)消费者会给所有的broker发送心跳,并且附带自己的消费者组信息和ClientID信息,此时broker中就有消费者组对应的ClientID集合(如图中3) (4)消费者启动后会reblance,有订阅的主题队列列表,并且通过broker可以拿到消费者组的ClientID集合,两个集合做rebalance,就可以拿到当前消费者对应消费的主题队列 (5) 消费者知道自己消费的主题队列,就可以根据队列信息通过Netty发送消息
CBeann
2023/12/25
1740
RocketMQ消费者启动流程
消息中间件—RocketMQ消息消费(二)(push模式实现)
摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。 由于RocketMQ系列的技术分享存在一定的连续性,因此希望读者能回顾下往期RocketMQ分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一)
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(二)(push模式实现)
RocketMQ专题2:三种常用生产消费方式(顺序、广播、定时)以及顺序消费源码探究
​ 在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:
SecondWorld
2018/10/08
1.9K0
分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
本文主要基于 RocketMQ 4.0.x 正式版 1、概述 2、ConsumeQueue 结构 3、ConsumeQueue 存储 DefaultMessageStore#doDispatch(...) ConsumeQueue#putMessagePositionInfoWrapper(...) ReputMessageService FlushConsumeQueueService 4、Broker 提供[拉取消息]接口 PullMessageRequestHeader PullMessageProc
程序猿DD
2018/02/01
9710
分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
RocketMQ(一):推拉消费模型客户端实践
现在的的互联网系统中,mq已经必备基础设施了,我们已明显感觉它的必要性与强大。然而,它的本质是啥?存储转发系统罢了!
烂猪皮
2021/01/28
1.3K0
RocketMQ(一):推拉消费模型客户端实践
一文带你理解 RocketMQ 广播模式实现机制
集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,然后 Group 中的三个 Consumer 分别消费了一条消息:
jinjunzhu
2022/09/23
6950
一文带你理解 RocketMQ 广播模式实现机制
消息中间件—RocketMQ消息消费(一)
文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ中Pull和Push的两种消费方式”、“RocketMQ中消费者(Push模式)的启动流程”和“RocketMQ中Pull和Push两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于RocketMQ分布式消息队列的前几篇文章: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(一)
深入分析 RocketMQ 的 Push 消费方式实现
RocketMQ 是阿里巴巴旗下一款开源的 MQ 框架,经历过双十一考验,由 Java 编程语言实现,有非常完整的生态系统。RocketMQ 作为一款纯 Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
政采云前端团队
2023/11/09
1.5K0
深入分析 RocketMQ 的 Push 消费方式实现
6 张图告诉你 RocketMQ 是怎么保存偏移量的
对消息队列来说,偏移量是一个非常重要的概念,如果偏移量保存失败,可能会造成消息丢失、消息重复消费等问题。今天来聊一聊 RocketMQ 是怎么保存消息偏移量的。
jinjunzhu
2022/09/23
7080
6 张图告诉你 RocketMQ 是怎么保存偏移量的
深入理解广播消费
这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例、注意事项四个方面一一展开,希望能帮助到大家。
勇哥java实战
2023/09/24
3780
RocketMQ中msg&tag的生命周期
最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。
CBeann
2023/12/25
3060
RocketMQ中msg&tag的生命周期
推荐阅读
相关推荐
RocketMQ 源码分析 —— Message 拉取与消费(下)
更多 >
LV.0
永辉云创架构师
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验