
A Look at Queue Selection for Ordered Messages in RocketMQ 5

Original article by code4it
Published 2024-08-09 22:07:03
Collected in the column: 码匠的流水账

This article examines how RocketMQ 5 selects a queue for ordered messages.

SendMessageActivity

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java

public class SendMessageActivity extends AbstractMessingActivity {

    public SendMessageActivity(MessagingProcessor messagingProcessor,
        GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
        super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
    }

    public CompletableFuture<SendMessageResponse> sendMessage(ProxyContext ctx, SendMessageRequest request) {
        CompletableFuture<SendMessageResponse> future = new CompletableFuture<>();

        try {
            if (request.getMessagesCount() <= 0) {
                throw new GrpcProxyException(Code.MESSAGE_CORRUPTED, "no message to send");
            }

            List<apache.rocketmq.v2.Message> messageList = request.getMessagesList();
            apache.rocketmq.v2.Message message = messageList.get(0);
            Resource topic = message.getTopic();
            validateTopic(topic);

            future = this.messagingProcessor.sendMessage(
                ctx,
                new SendMessageQueueSelector(request),
                topic.getName(),
                buildSysFlag(message),
                buildMessage(ctx, request.getMessagesList(), topic)
            ).thenApply(result -> convertToSendMessageResponse(ctx, request, result));
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return future;
    }

    //......
}    

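SendMessageActivity extends AbstractMessingActivity. Its sendMessage method rejects requests that contain no messages, validates the topic of the first message, and then sends through messagingProcessor.sendMessage, passing new SendMessageQueueSelector(request) as the QueueSelector argument. Any exception thrown synchronously is caught and completes the returned future exceptionally.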
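The error-handling shape of sendMessage can be sketched in isolation: a synchronous validation failure is reported through the returned future instead of being thrown to the caller. This is only an illustration; `FutureErrorSketch`, `send`, and `asyncSend` are hypothetical names, not RocketMQ APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureErrorSketch {

    // Hypothetical stand-in for messagingProcessor.sendMessage(...): completes asynchronously.
    static CompletableFuture<Integer> asyncSend(List<String> messages) {
        return CompletableFuture.supplyAsync(messages::size);
    }

    // Mirrors the try/catch shape of SendMessageActivity.sendMessage.
    static CompletableFuture<String> send(List<String> messages) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            if (messages.isEmpty()) {
                throw new IllegalArgumentException("no message to send");
            }
            // On the happy path the placeholder future is replaced by the async pipeline.
            future = asyncSend(messages).thenApply(count -> "sent " + count);
        } catch (Throwable t) {
            // Synchronous failures surface through the future, not as a thrown exception.
            future.completeExceptionally(t);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println(send(List.of("a", "b")).join());              // prints "sent 2"
        System.out.println(send(List.of()).isCompletedExceptionally());  // prints "true"
    }
}
```

Callers therefore handle both validation errors and broker-side errors in one place, on the future.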

SendMessageQueueSelector

    protected static class SendMessageQueueSelector implements QueueSelector {

        private final SendMessageRequest request;

        public SendMessageQueueSelector(SendMessageRequest request) {
            this.request = request;
        }

        @Override
        public AddressableMessageQueue select(ProxyContext ctx, MessageQueueView messageQueueView) {
            try {
                apache.rocketmq.v2.Message message = request.getMessages(0);
                String shardingKey = null;
                if (request.getMessagesCount() == 1) {
                    shardingKey = message.getSystemProperties().getMessageGroup();
                }
                AddressableMessageQueue targetMessageQueue;
                if (StringUtils.isNotEmpty(shardingKey)) {
                    // With shardingKey
                    List<AddressableMessageQueue> writeQueues = messageQueueView.getWriteSelector().getQueues();
                    int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size());
                    targetMessageQueue = writeQueues.get(bucket);
                } else {
                    targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false);
                }
                return targetMessageQueue;
            } catch (Exception e) {
                return null;
            }
        }
    }

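SendMessageQueueSelector implements the QueueSelector interface. Its select method reads messageGroup from the system properties of the first message and uses it as the shardingKey, but only when the request carries exactly one message. If the shardingKey is not empty, it computes bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()) and returns writeQueues.get(bucket) as the targetMessageQueue. If the shardingKey is empty, it falls back to messageQueueView.getWriteSelector().selectOneByPipeline(false). Any exception raised during selection is swallowed and the method returns null.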
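The branching can be sketched without any RocketMQ or Guava dependency. Everything below is hypothetical and simplified: queues are plain strings, `Math.floorMod` stands in for Hashing.consistentHash, and a round-robin counter stands in for selectOneByPipeline(false), whose actual policy is not shown in this article.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueSelectionSketch {

    // Hypothetical rotation counter standing in for selectOneByPipeline(false).
    private static final AtomicInteger ROTATION = new AtomicInteger();

    static String select(String messageGroup, int messageCount, List<String> writeQueues) {
        // As in the source, the sharding key is only honored for single-message requests.
        String shardingKey = messageCount == 1 ? messageGroup : null;
        if (shardingKey != null && !shardingKey.isEmpty()) {
            // Stand-in for Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()).
            int bucket = Math.floorMod(shardingKey.hashCode(), writeQueues.size());
            return writeQueues.get(bucket);
        }
        // No sharding key: spread load across the queues (assumed round-robin).
        int index = Math.floorMod(ROTATION.getAndIncrement(), writeQueues.size());
        return writeQueues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = List.of("broker-a:0", "broker-a:1", "broker-b:0", "broker-b:1");
        // The same message group always lands on the same queue.
        System.out.println(select("order-1001", 1, queues).equals(select("order-1001", 1, queues)));
        // Without a message group, consecutive sends rotate across queues.
        System.out.println(select(null, 1, queues));
        System.out.println(select(null, 1, queues));
    }
}
```

The point of the keyed branch is that every message of one group goes to one queue, which is what lets a consumer see that group in order.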

Hashing.consistentHash

com/google/common/hash/Hashing.java

  /**
   * Assigns to {@code input} a "bucket" in the range {@code [0, buckets)}, in a uniform manner that
   * minimizes the need for remapping as {@code buckets} grows. That is, {@code consistentHash(h,
   * n)} equals:
   *
   * <ul>
   *   <li>{@code n - 1}, with approximate probability {@code 1/n}
   *   <li>{@code consistentHash(h, n - 1)}, otherwise (probability {@code 1 - 1/n})
   * </ul>
   *
   * <p>This method is suitable for the common use case of dividing work among buckets that meet the
   * following conditions:
   *
   * <ul>
   *   <li>You want to assign the same fraction of inputs to each bucket.
   *   <li>When you reduce the number of buckets, you can accept that the most recently added
   *       buckets will be removed first. More concretely, if you are dividing traffic among tasks,
   *       you can decrease the number of tasks from 15 and 10, killing off the final 5 tasks, and
   *       {@code consistentHash} will handle it. If, however, you are dividing traffic among
   *       servers {@code alpha}, {@code bravo}, and {@code charlie} and you occasionally need to
   *       take each of the servers offline, {@code consistentHash} will be a poor fit: It provides
   *       no way for you to specify which of the three buckets is disappearing. Thus, if your
   *       buckets change from {@code [alpha, bravo, charlie]} to {@code [bravo, charlie]}, it will
   *       assign all the old {@code alpha} traffic to {@code bravo} and all the old {@code bravo}
   *       traffic to {@code charlie}, rather than letting {@code bravo} keep its traffic.
   * </ul>
   *
   * <p>See the <a href="http://en.wikipedia.org/wiki/Consistent_hashing">Wikipedia article on
   * consistent hashing</a> for more information.
   */
  public static int consistentHash(long input, int buckets) {
    checkArgument(buckets > 0, "buckets must be positive: %s", buckets);
    LinearCongruentialGenerator generator = new LinearCongruentialGenerator(input);
    int candidate = 0;
    int next;

    // Jump from bucket to bucket until we go out of range
    while (true) {
      next = (int) ((candidate + 1) / generator.nextDouble());
      if (next >= 0 && next < buckets) {
        candidate = next;
      } else {
        return candidate;
      }
    }
  }

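Guava's consistentHash seeds a LinearCongruentialGenerator with the input and uses the doubles it produces to jump from bucket to bucket; the last candidate that stays within [0, buckets) is returned. Because shardingKey.hashCode() is an int, the call in SendMessageQueueSelector widens it to long and binds to this overload.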
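To see the "minimal remapping" property from the Javadoc in action, here is a dependency-free sketch modeled on the method above. The `Lcg` class is a hypothetical stand-in for Guava's private LinearCongruentialGenerator; its constants are recalled from Guava's source and should be treated as an assumption rather than a verified copy.

```java
public class JumpHashSketch {

    // Stand-in for Guava's private LinearCongruentialGenerator (constants are an assumption).
    static final class Lcg {
        private long state;

        Lcg(long seed) {
            this.state = seed;
        }

        double nextDouble() {
            state = 2862933555777941757L * state + 1;
            return ((double) ((int) (state >>> 33) + 1)) / 0x1.0p31;
        }
    }

    // Same loop as the consistentHash(long, int) listing above.
    static int consistentHash(long input, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive: " + buckets);
        }
        Lcg generator = new Lcg(input);
        int candidate = 0;
        while (true) {
            // Each jump lands strictly beyond the current candidate.
            int next = (int) ((candidate + 1) / generator.nextDouble());
            if (next >= 0 && next < buckets) {
                candidate = next;
            } else {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        int total = 10_000;
        int moved = 0;
        for (int key = 0; key < total; key++) {
            int before = consistentHash(key, 8);
            int after = consistentHash(key, 9);
            if (before != after) {
                moved++; // a key that moves can only move to the new bucket 8
            }
        }
        System.out.println("keys moved when growing 8 -> 9 buckets: " + moved + " of " + total);
    }
}
```

For ordered messages this suggests a practical caveat: if the number of write queues for a topic changes, some message groups will map to a different queue, so this selection alone does not keep a group on one queue across such a change.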

Summary

In RocketMQ 5, queue selection for ordered messages happens in the proxy module: the messageGroup serves as the shardingKey, and Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()) yields the index into writeQueues.

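Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.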
