首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >RabbitMQ都写了,RocketMQ怎么能落下?

RabbitMQ都写了,RocketMQ怎么能落下?

作者头像
Java识堂
发布于 2020-07-28 07:47:57
发布于 2020-07-28 07:47:57
9890
举报
文章被收录于专栏:Java识堂Java识堂

整体架构

最近看到了我在Github上写的rabbitmq-examples陆续被人star了,就想着写个rocketmq-examples。对rabbitmq感兴趣的小伙伴可以看我之前的文章。下面把RocketMQ的各个特性简单介绍一下,这样在用的时候心里也更有把握

全网最全RabbitMQ总结,别再说你不会RabbitMQ

RocketMQ是阿里自研的消息中间件,RocketMQ的整体架构如下

主要有4个角色

Producer:消息生产者。类似,发信者 Consumer:消息消费者。类似,收信者 BrokerServer:消息的存储,投递,查询。类似,邮局 NameServer:注册中心,支持Broker的动态注册与发现。类似,邮局的管理结构

再介绍几个基本概念

Topic(主题):一类消息的集合,Topic和消息是一对多的关系。每个Broker可以存储多个Topic的消息,每个Topic也可以分片存储于不同的Broker

Tag(标签):在Topic类别下的二级子类别。如财务系统的所有消息的Topic为Finance_Topic,创建订单消息的Tag为Create_Tag,关闭订单消息的Tag为Close_Tag。这样就能根据Tag消费不同的消息,当然你也可以为创建订单和关闭订单的消息各自创建一个Topic

Message Queue(消息队列):相当于Topic的分区,用于并行发送和消费消息。Message Queue在Broker上,一个Topic默认的Message Queue的数量为4

Producer Group(生产者组):同一类Producer的集合。如果发送的是事务消息且原始生产者在发送之后崩溃,Broker会联系统一生产者组内的其他生产者实例以提交或回溯消费

Consumer Group(消费者组):同一类Consumer的集合。消费者组内的实例必须订阅完全相同的Topic

Clustering(集群消费):相同Consumer Group下的每个Consumer实例平均分摊消息

Broadcasting(广播消费):相同Consumer Group的每个Consumer实例都接收全量的消息

用图演示一下Clustering和Broadcasting的区别

如果我有一条订单程成交的消息,财务系统和物流系统都要同时订阅消费这条消息,该怎么办呢?定义2个Consumer Group即可

Consumer1和Consumer2属于一个Consumer Group,Consumer3和Consumer4属于一个Consumer Group,消息会全量发送到这2个Consuemr Group,至于这2个Consumer Group是集群消费还是广播消费,自己定义即可

工作流程在官方文档写的很详细,不再深入了

https://github.com/apache/rocketmq/tree/master/docs/cn

Message

消息的各种处理方式涉及到的内容较多,所以我就不在文章中放代码了,直接放GitHub了,目前还在不断完善中

地址为:https://github.com/erlieStar/rocketmq-examples,

和之前的RabbitMQ一个风格,基本上所有知识点都涉及到了

地址为:https://github.com/erlieStar/rabbitmq-example

每个消息必须属于一个Topic。RocketMQ中每个消息具有唯一的Message Id,且可以携带具有业务标识的Key,我们可以通过Topic,Message Id或Key来查询消息

消息消费的方式

  1. Pull(拉取式消费),Consumer主动从Broker拉取消息
  2. Push(推送式消费),Broker收到数据后会主动推送给Consumer,实时性较高

消息的过滤方式

  1. 指定Tag
  2. SQL92语法过滤

消息的发送方式

  1. 同步,收到响应后才会发送下一条消息
  2. 异步,一直发,用异步的回调函数来获取结果
  3. 单向(只管发,不管结果)

消息的种类

  1. 顺序消息
  2. 延迟消息
  3. 批量消息
  4. 事务消息

顺序消息

顺序消息分为局部有序和全局有序

官方介绍为普通顺序消息和严格顺序消息

局部有序:同一个业务相关的消息是有序的,如针对同一个订单的创建和付款消息是有序的,只需要在发送的时候指定message queue即可,如下所示,将同一个orderId对应的消息发送到同一个队列

代码语言:javascript
AI代码解释
复制
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
 /**
  * @param mqs topic对应的message queue
  * @param msg send方法传入的message
  * @param arg send方法传入的orderId
  */
 @Override
 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  // 根据业务对象选择对应的队列
  Integer orderId = (Integer) arg;
  int index = orderId % mqs.size();
  return mqs.get(index);
 }
}, orderId);

消费者所使用的Listener必须是MessageListenerOrderly(对于一个队列的消息采用一个线程去处理),而平常的话我们使用的是MessageListenerConcurrently

全局有序:要想实现全局有序,则Topic只能有一个message queue。

延迟消息

RocketMQ并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

代码语言:javascript
AI代码解释
复制
// org.apache.rocketmq.store.config.MessageStoreConfig 
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

批量消息

批量发送消息能显著提高传递小消息的性能,限制是这批消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息,一批消息的总大小不应超过1MB

事务消息

事务在实际的业务场景中还是经常遇到的,以转账为例子

张三给李四转账100元,可以分为如下2步

  1. 张三的账户减去100元
  2. 李四的账户加上100元

这2个操作要是同时成功,要是同时失败,不然会造成数据不一致的情况,基于单个数据库Connection时,我们只需要在方法上加上@Transactional注解就可以了。

如果基于多个Connection(如服务拆分,数据库分库分表),加@Transactional此时就不管用了,就得用到分布式事务

分布式事务的解决方案很多,RocketMQ只是其中一种方案,RocketMQ可以保证最终一致性

RocketMQ实现分布式事务的流程如下

  1. producer向mq server发送一个半消息
  2. mq server将消息持久化成功后,向发送方确认消息已经发送成功,此时消息并不会被consumer消费
  3. producer开始执行本地事务逻辑
  4. producer根据本地事务执行结果向mq server发送二次确认,mq收到commit状态,将消息标记为可投递,consumer会消费该消息。mq收到rollback则删除半消息,consumer将不会消费该消息,如果收到unknow状态,mq会对消息发起回查
  5. 在断网或者应用重启等特殊情况下,步骤4提交的2次确认有可能没有到达mq server,经过固定时间后mq会对该消息发起回查
  6. producer收到回查后,需要检查本地事务的执行状态
  7. producer根据本地事务的最终状态,再次提交二次确认,mq仍按照步骤4对半消息进行操作

理解了原理,看代码实现就很容易了,放一个官方的example

代码语言:javascript
AI代码解释
复制
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger index = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = index.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (status != null) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

实现分布式事务需要实现TransactionListener接口,2个方法的作用如下

  1. executeLocalTransaction,执行本地事务
  2. checkLocalTransaction,回查本地事务状态

针对这个例子,所有的消息都会回查,因为返回的都是UNKNOW,回查的时候status=1的数据会被消费,status=2的数据会被删除,status=0的数据会一直回查,直到超过默认的回查次数。

发送方代码如下

代码语言:javascript
AI代码解释
复制
public class TransactionProducer {

    public static final String RPODUCER_GROUP_NAME = "transactionProducerGroup";
    public static final String TOPIC_NAME = "transactionTopic";
    public static final String TAG_NAME = "transactionTag";

    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer(RPODUCER_GROUP_NAME);

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread();
                thread.setName("transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message message = new Message(TOPIC_NAME, TAG_NAME,
                    ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        TimeUnit.HOURS.sleep(1);
        producer.shutdown();
    }
}

看到这,可能有人会问了,我们先执行本地事务,执行成功后再发送消息,这样可以吗?

其实这样做还是有可能会造成数据不一致的问题。假如本地事务执行成功,发送消息,由于网络延迟,消息发送成功,但是回复超时了,抛出异常,本地事务回滚。但是消息其实投递成功并被消费了,此时就会造成数据不一致的情况

那消息投递到mq server,consumer消费失败怎么办?

如果是消费超时,重试即可。如果是由于代码等原因真的消费失败了,此时就得人工介入,重新手动发送消息,达到最终一致性。

消息重试

发送端重试

producer向broker发送消息后,没有收到broker的ack时,rocketmq会自动重试。重试的次数可以设置,默认为2次

代码语言:javascript
AI代码解释
复制
DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
// 同步发送设置重试次数为5次
producer.setRetryTimesWhenSendFailed(5);
// 异步发送设置重试次数为5次
producer.setRetryTimesWhenSendAsyncFailed(5);
消费端重试
顺序消息的重试

对于顺序消息,当Consumer消费消息失败后,RocketMQ会不断进行消息重试,此时后续消息会被阻塞。所以当使用顺序消息的时候,监控一定要做好,避免后续消息被阻塞

无序消息的重试

当消费模式为集群模式时,Broker才会自动进行重试,对于广播消息是不会进行重试的

当consumer消费消息后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表明消费消息成功,不会进行重试

当consumer符合如下三种场景之一时,会对消息进行重试

  1. 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 返回null
  3. 主动或被动抛出异常

RocketMQ默认每条消息会被重试16次,超过16次则不再重试,会将消息放到死信队列,当然我们也可以自己设置重试次数

每次重试的时间间隔如下

第几次重试

与上次间隔时间

第几次重试

与上次间隔时间

1

10s

10

7分钟

2

30s

11

8分钟

3

1分钟

12

9分钟

4

2分钟

13

10分钟

5

3分钟

14

20分钟

6

4分钟

15

30分钟

7

5分钟

16

1小时

8

6分钟

17

2小时

重试队列和死信队列

当消息消费失败,会被发送到重试队列

当消息消费失败,并达到最大重试次数,rocketmq并不会将消息丢弃,而是将消息发送到死信队列

死信队列有如下特点

  1. 里面存的是不能被正常消费的消息
  2. 有效期与正常消息相同,都是3天,3天后会被删除
  3. 每个死信队列对应一个Consumer Group ID,即死信队列是消费者组级别的
  4. 如果一个Consumer Group没有产生死信消息,则RocketMQ不会创建对应的死信队列
  5. 死信队列包含了一个Consumer Group下的所有死信消息,不管该消息属于哪个Topic

重试队列的命名为 %RETRY%消费组名称 死信队列的命名为 %DLQ%消费组名称

RocketMQ高性能和高可用的方式

整体架构

rocketmq是通过broker主从机制来实现高可用的。相同broker名称,不同brokerid的机器组成一个broker组,brokerId=0表明这个broker是master,brokerId>0表明这个broker是slave。

消息生产的高可用:创建topic时,把topic的多个message queue创建在多个broker组上。这样当一个broker组的master不可用后,producer仍然可以给其他组的master发送消息。rocketmq目前还不支持主从切换,需要手动切换

消息消费的高可用:consumer并不能配置从master读还是slave读。当master不可用或者繁忙的时候consumer会被自动切换到从slave读。这样当master出现故障后,consumer仍然可以从slave读,保证了消息消费的高可用

消息存储结构

RocketMQ需要保证消息的高可靠性,所以要将数据通过磁盘进行持久化存储。

将数据存到磁盘会不会很慢?其实磁盘有时候比你想象的快,有时候比你想象的慢。目前高性能磁盘的顺序写速度可以达到600M/s,而磁盘的随机写大概只有100k/s,和顺序写的性能相差6000倍,所以RocketMQ采用顺序写。

并且通过mmap(零拷贝的一种实现方式,零拷贝可以省去用户态到内核态的数据拷贝,提高速度)具体原理并不是很懂,有兴趣的小伙伴可以看看相关书籍

总而言之,RocketMQ通过顺序写和零拷贝技术实现了高性能的消息存储

和消息相关的文件有如下几种

  1. CommitLog:存储消息的元数据
  2. ConsumerQueue:存储消息在CommitLog的索引
  3. IndexFile:提供了一种通过key或者时间区间来查询消息的方法

刷盘机制

  1. 同步刷盘:消息被写入内存的PAGECACHE,返回写成功状态,当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 。吞吐量低,但不会造成消息丢失
  2. 异步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。吞吐量高,当磁盘损坏时,会丢失消息

主从复制

如果一个broker有master和slave时,就需要将master上的消息复制到slave上,复制的方式有两种

  1. 同步复制:master和slave均写成功,才返回客户端成功。maste挂了以后可以保证数据不丢失,但是同步复制会增加数据写入延迟,降低吞吐量
  2. 异步复制:master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,但是当master出现故障后,有可能造成数据丢失

负载均衡

Producer负载均衡

producer在发送消息时,默认轮询所有queue,消息就会被发送到不同的queue上。而queue可以分布在不同的broker上

Consumer负载均衡

默认的分配算法是AllocateMessageQueueAveragely,如下图

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊queue,只是以环状轮流分queue的形式,如下图:

如果consumer数量比message queue还多,则多会来的consumer会被闲置。所以不要让consumer的数量多于message queue的数量

图形化管理工具

在rocketmq-externals这个项目中提供了rocketmq的很多扩展工具

github地址如下:https://github.com/apache/rocketmq-externals

其中有一个子项目rocketmq-console提供了rocketmq的图像化工具,提供了很多实用的功能,如前面说的通过Topic,Message Id或Key来查询消息,重新发送消息等,还是很方便的

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java识堂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RocketMQ事务消息实现的底层原理是什么
在我们熟知的Kafka、RabbitMQ等消费队列中,都没有实现事务消息这个功能,RocketMQ是唯一实现了这一功能的消息队列中间件,今天我们来从底层看一下RocketMQ如何实现的事务消息
潋湄
2025/01/19
3650
RocketMQ事务消息实现的底层原理是什么
RocketMQ
同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
ruochen
2021/11/23
3K0
精选RocketMQ面试题[33题]
queue 就是来源于数据结构的 FIFO 队列。而 Topic 是个抽象的概念,每个 Topic 底层对应N个 queue,而数据也真实存在 queue 上的。
一行Java
2022/04/07
4.9K0
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
并且,索性咱就直接把这个坑填得满满的,直接盘点RocketMQ支持的11种消息类型以及背后的实现原理
三友的java日记
2023/11/09
9180
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
万字精华总结RocketMQ的常见用法(案例+图)
上篇博文,我们介绍了什么是RocketMQ,以及如何安装单机版的RocketMQ。在安装的过程了,我们主要安装了两个服务,NameServer和Broker。在发送和接收消息时,又接触了两个概念,生产者和消费者。
程序猿小亮
2021/12/07
2.3K0
万字精华总结RocketMQ的常见用法(案例+图)
RocketMQ高级功能代码实现|文末赠书
本文章我们将通过一些实际的案例,引出解决方案,并实际通过代码来实现。通过本节的学习,你可以确切的掌握实际编码能力。 本文选自北京大学出版社的《分布式中间件核心原理与RocketMQ最佳实践》一书 本书:分布式中间件核心原理与RocketMQ实战技术一本通:实战案例+操作步骤+执行效果图,手把手教你吃透分布式中间件技术,轻松实现从小白到大牛的职业跃迁。 1,事务消息代码实现 之前我们已经在讨论订单业务消息丢失问题中引出了事务消息,本内容我们就实际用代码来实现一下事务消息吧。     首先我们用原生代码来实现
程序猿DD
2023/04/04
4610
RocketMQ高级功能代码实现|文末赠书
10分钟掌握RocketMQ的核心知识
Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。
微观技术
2021/04/19
8020
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。
jinjunzhu
2022/09/23
1.2K0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
一篇文章把RabbitMQ、RocketMQ、Kafka三元归一
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/07/12
9650
一篇文章把RabbitMQ、RocketMQ、Kafka三元归一
RocketMQ事务消息使用与原理
最近在找工作,面试过程中被多次问到事务消息的实现原理,另外在分布式事务解决方案中,事务消息也是一个不错的解决方案,本篇文章将围绕RocketMQ的事务消息实现展开描述。
叔牙
2022/09/27
1.6K0
RocketMQ事务消息使用与原理
【RocketMQ系列(三)】基于RocketMQ的分布式事务
现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。
行百里er
2020/12/02
2K0
【RocketMQ系列(三)】基于RocketMQ的分布式事务
RocketMQ
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
JokerDJ
2023/11/27
2.1K0
RocketMQ
RocketMq之事务消息实现原理
RocketMQ提供了事务消息的功能,采用2PC(两段式协议)+补偿机制(事务回查)的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。
周同学
2020/06/19
3.7K0
RocketMq之事务消息实现原理
RocketMQ事务消息原理简析
零、业务场景在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等,交易系统要将订单落入DB和发送订单消息保证一致,不能本地事务回滚,订单没有生成但是发送了创建订单消息,下游系统产生脏数据,也不能订单已经创建,但是下游系统没有感知继而无法履约,影响用户体验。如果让我们自己实现的话,当然也是有办法的。比如在业务数据库中建立一张消息表用于存储消息,将业务数
白少年
2023/01/13
7610
RocketMQ原生API使用
代码地址:https://gitcode.net/java_wxid/rocketmq-api
Java廖志伟
2022/03/07
1.2K0
RocketMQ原生API使用
SpringBoot集成RocketMq
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
程序员子龙
2023/11/26
6790
SpringBoot集成RocketMq
彻底看懂RocketMQ事务实现原理
面试中经常会问到比如RocketMQ的事务是如何实现的呢?学习框架,我们不仅要熟练使用,更要掌握设计及原理,才算熟悉一个框架。
Bug开发工程师
2020/09/30
1.3K0
彻底看懂RocketMQ事务实现原理
RocketMQ入门(三)
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
传说之下的花儿
2023/11/29
5200
RocketMQ入门(三)
RocketMQ 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
GreizLiao
2021/08/13
1.3K0
RocketMQ 事务消息
RocketMQ 设计原理与最佳实践
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
Java识堂
2021/06/01
1.4K0
RocketMQ 设计原理与最佳实践
相关推荐
RocketMQ事务消息实现的底层原理是什么
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
首页
学习
活动
专区
圈层
工具
MCP广场
首页
学习
活动
专区
圈层
工具
MCP广场