
A Look at RocketMQ's MQFaultStrategy

Author: code4it
Published: 2019-12-12 22:50:23
Column: 码匠的流水账

This article takes a look at RocketMQ's MQFaultStrategy.

MQFaultStrategy

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

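    // Latency thresholds in milliseconds, ascending; index i pairs with notAvailableDuration[i].
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // How long (in milliseconds) a broker is treated as unavailable once the matching threshold is reached.
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};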

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // Under isolation, a fixed 30000 replaces the measured latency when looking up the duration.
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
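  • MQFaultStrategy defines the latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration fields. When sendLatencyFaultEnable is false, selectOneMessageQueue simply calls tpInfo.selectOneMessageQueue(lastBrokerName). When it is true, the method first obtains index from tpInfo.getSendWhichQueue().getAndIncrement(), then iterates over tpInfo.getMessageQueueList(), computing pos as Math.abs(index++) % tpInfo.getMessageQueueList().size() and resetting it to 0 if it is negative (Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be too). It then calls latencyFaultTolerance.isAvailable on the queue's broker; if the broker is available and either null == lastBrokerName or mq.getBrokerName().equals(lastBrokerName) holds, that MessageQueue is returned.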
  • If sendLatencyFaultEnable is true and the loop over tpInfo.getMessageQueueList() returns nothing, selectOneMessageQueue picks notBestBroker through latencyFaultTolerance.pickOneAtLeast(). If its writeQueueNums is greater than 0, the method takes a MessageQueue from tpInfo.selectOneMessageQueue(), sets its brokerName to notBestBroker and its queueId to tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums, and returns it. If writeQueueNums is less than or equal to 0, it calls latencyFaultTolerance.remove(notBestBroker). If no MessageQueue has been chosen by this point, the method falls back to tpInfo.selectOneMessageQueue().
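  • When sendLatencyFaultEnable is true, updateFaultItem computes duration with computeNotAvailableDuration (passing 30000 instead of currentLatency when isolation is true) and then calls latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration). computeNotAvailableDuration scans latencyMax from the last element backwards and returns notAvailableDuration[i] for the first i where currentLatency >= latencyMax[i]; if no threshold matches, it returns 0.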
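
To make the threshold lookup concrete, here is a minimal standalone sketch. The two arrays and the backward scan are copied from the MQFaultStrategy listing above; the class name NotAvailableDurationSketch, the helper durationFor, and the sample latencies are made up for illustration and are not part of RocketMQ.

```java
// Standalone sketch; only the arrays and the scan come from the listing above.
class NotAvailableDurationSketch {
    // Default thresholds and durations, both in milliseconds.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Same backward scan as computeNotAvailableDuration: the largest threshold
    // that the latency reaches decides the duration.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Hypothetical helper mirroring updateFaultItem: isolation substitutes 30000 for the latency.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {40L, 100L, 600L, 3000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + durationFor(latency, false) + " ms");
        }
        System.out.println("isolation -> " + durationFor(5L, true) + " ms");
    }
}
```

With the default arrays, latencies of 40 and 100 map to 0 (no penalty), 600 maps to 30000, 3000 maps to 180000, and an isolated broker always maps to 600000 because 30000 exceeds the last threshold of 15000.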

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private final Timer timer = new Timer("RequestHouseKeepingService", true);
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private ExecutorService asyncSenderExecutor;

    //......

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    //......
}
  • The selectOneMessageQueue and updateFaultItem methods of DefaultMQProducerImpl both delegate to mqFaultStrategy.
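
The listing above shows only the delegation, not how the two methods are called together. The following is a simplified, hypothetical model of a send loop that selects a broker, measures the latency of each attempt, and reports it back. It assumes that a successful send is reported with isolation = false and a failed one with isolation = true; the FaultStrategy and Transport interfaces, sendWithRetry, and the broker names are illustrative stand-ins, not RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a send loop; none of these types are RocketMQ classes.
class SendFeedbackSketch {

    // Stand-in for the two methods that DefaultMQProducerImpl delegates to mqFaultStrategy.
    interface FaultStrategy {
        String selectBroker(String lastBrokerName);
        void updateFaultItem(String brokerName, long currentLatency, boolean isolation);
    }

    // Stand-in for the network call; throws when the broker cannot be reached.
    interface Transport {
        void send(String brokerName, String body) throws Exception;
    }

    // Tries up to maxAttempts brokers and reports the outcome of every attempt.
    static String sendWithRetry(FaultStrategy strategy, Transport transport,
                                String body, int maxAttempts) {
        String lastBrokerName = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String broker = strategy.selectBroker(lastBrokerName);
            lastBrokerName = broker;
            long begin = System.currentTimeMillis();
            try {
                transport.send(broker, body);
                // Success: report the measured latency without isolation.
                strategy.updateFaultItem(broker, System.currentTimeMillis() - begin, false);
                return broker;
            } catch (Exception e) {
                // Failure (assumed convention): report with isolation = true.
                strategy.updateFaultItem(broker, System.currentTimeMillis() - begin, true);
            }
        }
        return null; // every attempt failed
    }

    // Runs one send where broker-a is down and broker-b is healthy; returns the recorded reports.
    static List<String> demo() {
        final List<String> reports = new ArrayList<>();
        FaultStrategy strategy = new FaultStrategy() {
            private int calls = 0;

            @Override
            public String selectBroker(String lastBrokerName) {
                return (calls++ == 0) ? "broker-a" : "broker-b";
            }

            @Override
            public void updateFaultItem(String brokerName, long currentLatency, boolean isolation) {
                reports.add(brokerName + " isolation=" + isolation);
            }
        };
        Transport transport = (brokerName, body) -> {
            if ("broker-a".equals(brokerName)) {
                throw new Exception("broker-a is down");
            }
        };
        sendWithRetry(strategy, transport, "hello", 3);
        return reports;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the failed attempt on broker-a is recorded with isolation=true and the successful retry on broker-b with isolation=false, which is the feedback that updateFaultItem turns into an unavailability window when sendLatencyFaultEnable is on.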

Summary

  • MQFaultStrategy holds the latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration fields. With sendLatencyFaultEnable set to false, selectOneMessageQueue delegates to tpInfo.selectOneMessageQueue(lastBrokerName). With it set to true, the method walks tpInfo.getMessageQueueList() round-robin, starting from the index given by tpInfo.getSendWhichQueue().getAndIncrement() (a negative pos is reset to 0), and returns the first MessageQueue whose broker passes latencyFaultTolerance.isAvailable and for which null == lastBrokerName or mq.getBrokerName().equals(lastBrokerName) holds.
  • If that loop returns nothing, the method asks latencyFaultTolerance.pickOneAtLeast() for notBestBroker. When its writeQueueNums is greater than 0, a MessageQueue from tpInfo.selectOneMessageQueue() is rewritten with brokerName set to notBestBroker and queueId set to tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums, then returned; otherwise latencyFaultTolerance.remove(notBestBroker) is called. The last resort is tpInfo.selectOneMessageQueue().
  • With sendLatencyFaultEnable set to true, updateFaultItem computes duration through computeNotAvailableDuration and passes it to latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration). computeNotAvailableDuration scans latencyMax from the end and returns notAvailableDuration[i] at the first i where currentLatency >= latencyMax[i], or 0 if none matches.
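
As a recap of the selection loop, here is a minimal sketch of the round-robin scan that skips unavailable brokers. It is a simplification under stated assumptions: the set unavailableBrokers stands in for latencyFaultTolerance.isAvailable, an AtomicInteger stands in for the counter returned by tpInfo.getSendWhichQueue(), QueueRef is a hypothetical replacement for MessageQueue, and the lastBrokerName check and the pickOneAtLeast fallback are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the scan inside selectOneMessageQueue; all types are stand-ins.
class QueueSelectionSketch {

    // Hypothetical minimal replacement for MessageQueue.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + ":" + queueId;
        }
    }

    // Returns the first queue, in round-robin order, whose broker is not marked unavailable,
    // or null when every broker is unavailable (the real method then tries its fallbacks).
    static QueueRef selectAvailable(List<QueueRef> queues, AtomicInteger counter,
                                    Set<String> unavailableBrokers) {
        int index = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                // Math.abs(Integer.MIN_VALUE) is negative, so the remainder can be negative too.
                pos = 0;
            }
            QueueRef candidate = queues.get(pos);
            if (!unavailableBrokers.contains(candidate.brokerName)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<QueueRef> queues = Arrays.asList(
            new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
            new QueueRef("broker-b", 0), new QueueRef("broker-b", 1));
        Set<String> unavailable = new HashSet<>(Collections.singletonList("broker-a"));
        AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(selectAvailable(queues, counter, unavailable));
        }
    }
}
```

With broker-a marked unavailable, the four calls in main return broker-b:0 three times and then broker-b:1: every start position that lands on a broker-a queue advances to the first broker-b queue, so the queue right after an unavailable run receives more traffic in this simplified model.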

doc

  • MQFaultStrategy
Originally published on 2019-12-10 on the WeChat official account 码匠的流水账.
