首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊rocketmq的ConsumeFromWhere

聊聊rocketmq的ConsumeFromWhere

原创
作者头像
code4it
修改2019-11-27 10:45:20
修改2019-11-27 10:45:20
2K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下rocketmq的ConsumeFromWhere

ConsumeFromWhere

rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java

代码语言:javascript
代码运行次数:0
运行
复制
public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
​
    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}
  • ConsumeFromWhere定义了CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP枚举值

computePullFromWhere

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

代码语言:javascript
代码运行次数:0
运行
复制
public class RebalancePushImpl extends RebalanceImpl {
​
    //......
​
    @Override
    public long computePullFromWhere(MessageQueue mq) {
        long result = -1;
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                }
                // First start,no offset
                else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        result = 0L;
                    } else {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    result = 0L;
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    } else {
                        try {
                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                UtilAll.YYYYMMDDHHMMSS).getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }
​
            default:
                break;
        }
​
        return result;
    }
​
    //......
}
  • RebalancePushImpl的computePullFromWhere会判断defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • 对于CONSUME_FROM_LAST_OFFSET,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值,若lastOffset为-1,则在mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)时更新result为0,否则更新result为mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • 对于CONSUME_FROM_FIRST_OFFSET,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值,若lastOffset为-1,则更新result为0;对于CONSUME_FROM_TIMESTAMP,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值;若lastOffset为-1,则对于mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)取mQClientFactory.getMQAdminImpl().maxOffset(mq),否则取defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp()去搜索QClientFactory.getMQAdminImpl().searchOffset,将返回值更新到result

小结

  • ConsumeFromWhere定义了CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP枚举值;RebalancePushImpl的computePullFromWhere会判断defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • 若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值;对于lastOffset为-1且mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX),CONSUME_FROM_LAST_OFFSET取0,CONSUME_FROM_TIMESTAMP取mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • 对于lastOffset为-1但是非q.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)的情况,CONSUME_FROM_LAST_OFFSET取mQClientFactory.getMQAdminImpl().maxOffset(mq),CONSUME_FROM_TIMESTAMP取mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ConsumeFromWhere
  • computePullFromWhere
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档