前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的consumeMessageBatchMaxSize

聊聊rocketmq的consumeMessageBatchMaxSize

原创
作者头像
code4it
修改2019-11-26 10:09:53
2.4K0
修改2019-11-26 10:09:53
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下rocketmq的consumeMessageBatchMaxSize

consumeMessageBatchMaxSize

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

代码语言:javascript
复制
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
​
    private final InternalLogger log = ClientLogger.getLog();
​
    //......
​
    /**
     * Batch consumption size
     */
    private int consumeMessageBatchMaxSize = 1;
​
    public int getConsumeMessageBatchMaxSize() {
        return consumeMessageBatchMaxSize;
    }
​
    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }
​
    //......
}    
  • DefaultMQPushConsumer定义了consumeMessageBatchMaxSize属性,默认值为1

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:javascript
复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
​
    //......
​
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
​
        //......
​
        // consumeMessageBatchMaxSize
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException(
                "consumeMessageBatchMaxSize Out of range [1, 1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }
​
        //......
​
    }
​
    //......
}
  • checkConfig方法会校验defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),要求其值必须大于等于且小于等于1024

submitConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

代码语言:javascript
复制
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
​
    //......
​
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
​
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
​
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
​
    //......
}
  • submitConsumeRequest方法在msgs.size()小于等于consumeBatchSize时会创建ConsumeRequest,然后提交到consumeExecutor执行,若出现RejectedExecutionException则执行submitConsumeRequestLater;对于msgs.size()大于consumeBatchSize的,则按consumeBatchSize分批创建ConsumeRequest提交给consumeExecutor执行,若出现RejectedExecutionException则将剩余的msg添加到msgThis,然后执行submitConsumeRequestLater

小结

DefaultMQPushConsumer定义了consumeMessageBatchMaxSize属性,默认值为1;DefaultMQPushConsumerImpl的checkConfig方法会校验defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),要求其值必须大于等于且小于等于1024;ConsumeMessageConcurrentlyService的submitConsumeRequest方法在msgs.size()小于等于consumeBatchSize时会创建ConsumeRequest,然后提交到consumeExecutor执行,若出现RejectedExecutionException则执行submitConsumeRequestLater;对于msgs.size()大于consumeBatchSize的,则按consumeBatchSize分批创建ConsumeRequest提交给consumeExecutor执行,若出现RejectedExecutionException则将剩余的msg添加到msgThis,然后执行submitConsumeRequestLater

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • consumeMessageBatchMaxSize
  • checkConfig
  • submitConsumeRequest
  • 小结
  • doc
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档