Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ系列 | 如何让消息“丢失”?

RocketMQ系列 | 如何让消息“丢失”?

作者头像
烟雨平生
发布于 2023-10-01 00:37:31
发布于 2023-10-01 00:37:31
53700
代码可运行
举报
文章被收录于专栏:数字化之路数字化之路
运行总次数:0
代码可运行
  • RocketMQ简介
  • RocketMQ领域模型
  • 如何让消息“丢失”
  • 小结:如果你担心某种情况发生,那么它就更有可能发生。

RocketMQ 简介

RocketMQ 5.0: 云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。

RocketMQ领域模型

如上图所示,Apache RocketMQ 中消息的生命周期主要分为消息生产消息存储消息消费这三部分。

生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。

消息生产

生产者(Producer):Apache RocketMQ 领域中用于产生消息的运行实体,一般集成于业务调用链路的上游。

消息存储

  • 主题(Topic): Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
  • 队列(MessageQueue): Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
  • 消息(Message): Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

  • 消费者分组(ConsumerGroup): Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
  • 消费者(Consumer): Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
  • 订阅关系(Subscription): Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。 Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

如何让“消息丢失”?

在“如何让消息丢失”之前,让我们梳理一下消息的生命周期,先对齐下整体的概念。

一条消息的历程

1、发送场景丢失消息

1.1 单向发送

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 发送消息,Oneway形式,服务器不应答,
     * 无法保证消息是否成功到达服务器
     *
     * @param message 要发送的消息
     */
    void sendOneway(final Message message);
com.aliyun.openservices.ons.api.Producer#sendOneway

RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

同步发送:同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。 异步发送:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。

此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

1.2 发送失败时未重试或补偿

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
@Slf4j
public class ProductSender {

    @Autowired
    private Producer producer;

    public void sendMsg(String content) {
        try {
            byte[] body = content.getBytes(StandardCharsets.UTF_8);
            Message message = new Message("topicName", "tagName", "msgKey", body);
            SendResult sendResult = producer.send(message);
        } catch (Exception ignored) {
            // TODO: 2023/9/29 发送失败时无处理,网络抖动或服务不稳定时会造成消息丢失
        }
    }


}

2、消息存储场景丢失消息

2.1 、Broker宕机或者磁盘损坏,Broker Server内存中的消息没有落盘

2.2 、过期清理机制引发消息丢失 Apache RocketMQ 中队列的定义,消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。因此,在实际使用中需要考虑以下问题,消息在服务端中的存储以什么维度为判定条件?消息存储以什么粒度进行管理?消息存储超过限制后如何处理?这些问题都是由消息存储和过期清理机制来定义的。

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。删除旧的没有使用过的消息是由后台定时任务完成的。

消息存储文件结构说明

3、消费场景丢失消息

3.1 消费失败,但消费消息的返回结果为成功

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
@Slf4j
public class MissingMsgWhenConsumeFail implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            return Action.CommitMessage;
        } catch (Exception e) {
            //丢失消息:消费失败了,但消费消息的返回结果为成功。
            return Action.CommitMessage;
        }
    }

}

RocketMQ消费场景引发的系统故障

3.2 订阅关系不一致导致消息丢失

您可在云消息队列 RocketMQ 版控制台Group 详情页面查看指定Group的订阅关系是否一致。出现订阅关系不一致时,控制台中也会有告警:

同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

如下图所示,一个队列中分发不同类型的消息。 如果一个消费者Group ID订阅了tagA和tagB,那么这个消费组下消费者绑定的队列中会被borker投递所订阅所有Tag的信息。

消息丢失的根因是,一个队列在同一时间只会被分配给一个消费者,这样队列上不符合消息过滤规则的消息消费会被忽略,并且消息消费的进度会向前移动,从而造成消息丢失。 经典实践一个GroupId[消费组]只在一个JVM中使用

正确订阅关系一:相同Group ID的N个消费者订阅一个Topic且订阅一个Tag

正确订阅关系二:相同Group ID的N个消费者订阅一个Topic且订阅多个Tag

正确订阅关系三:相同Group ID的N个消费者订阅多个Topic且订阅多个Tag

小结

在RocketMQ领域中,一条消息从生产、存储、消费整个链路中都可以让消息“丢失”。 业务逻辑复杂,历史久远的接口出现数据错误怎么办? 干货|如何快速问题出在哪了? 从全链路视角看,让消息丢失的漏洞百出。

那么,你“学会”让消息丢失的"技巧"了吗?

参考

https://rocketmq.apache.org/zh/docs/

发送普通消息(单向发送):https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/sample-code-2

发送普通消息(三种方式):https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-normal-messages-in-one-of-three-modes

消息存储机制:https://rocketmq.apache.org/zh/docs/featureBehavior/11messagestorepolicy

消息在云消息队列 RocketMQ 版中能保存多久?https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/faq-about-features#section-r2b-stc-pz6

常见订阅关系不一致问题 https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/subscription-consistency

MQ相关阅读 RabbitMQ消息为什么变成了数字呢? 微服务+RabbitMQ之从零到yi

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-29 20:13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
面试系列之-rocketmq消息机制
广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer进行消费;
用户4283147
2022/12/29
1.6K0
面试系列之-rocketmq消息机制
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
在队列的基础上,加入生产者与消费者模型,使用队列作为载体就能够组成简单的消息队列,在队列中“运输”的数据被称为消息
菜菜的后端私房菜
2024/09/12
1.1K0
Rocketmq--消息驱动
MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数
IT小马哥
2021/06/03
7260
Rocketmq--消息驱动
【消息队列 MQ 专栏】消息队列之 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。其主要特点有:
芋道源码
2018/07/31
6.4K0
【消息队列 MQ 专栏】消息队列之 RocketMQ
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
并且,索性咱就直接把这个坑填得满满的,直接盘点RocketMQ支持的11种消息类型以及背后的实现原理
三友的java日记
2023/11/09
7400
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
Spring Cloud 集成 rocketMq
后来通过查询资料得知,可能阿里的rocketMq服务版本比较高,ons客户端版本已经到了4.8而spring-cloud-starter-stream-
ruochen
2021/12/16
2.5K0
Springboot快速集成RocketMq
RocketMQ前身是阿里研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
用户7353950
2022/05/11
2K0
Springboot快速集成RocketMq
RocketMQ实战教程之常见概念和模型
官方文档: https://rocketmq.apache.org/zh/docs/introduction/02concepts
全干程序员demo
2024/05/30
2030
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。
腾讯云中间件团队
2025/04/04
1420
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
RocketMQ 和 RabbitMQ 的比较以及 RocketMQ 的使用
消息队列在项目中会经常用到,目前我们使用的是 RabbitMQ,但在 Java 技术栈下,RocketMQ 使用的比较多。下面比较下 RabbitMQ 和 RocketMQ。 RabbitMQ 和 RocketMQ 对比 1、设计理念和架构
郑子铭
2025/02/25
8280
RocketMQ 和 RabbitMQ 的比较以及 RocketMQ 的使用
RocketMq的使用demo
生产者和消费者都属于MQ的客户端,都继承于ClientConfig类,ClientConfig为客户端的公共配置类。这里将客户端相关配置信息写在最前面,大家可以看了就知道大概由哪些属性了,客户端配置
名字是乱打的
2021/12/24
1.1K0
RocketMq的使用demo
10分钟掌握RocketMQ的核心知识
Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件。
微观技术
2021/04/19
6610
RocketMQ入门手册
继我上一篇博客后 分布式消息队列RocketMQ学习教程① 上一篇博客最主要介绍了几种常用的MQ,所以本博客再简单介绍一下RocketMQ的原理和简单的例子,基于Java实现,希望可以帮助学习者
SmileNicky
2019/01/17
1.6K0
RocketMQ 常用消息类型
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
程序员果果
2021/02/02
9560
RabbitMQ都写了,RocketMQ怎么能落下?
最近看到了我在Github上写的rabbitmq-examples陆续被人star了,就想着写个rocketmq-examples。对rabbitmq感兴趣的小伙伴可以看我之前的文章。下面把RocketMQ的各个特性简单介绍一下,这样在用的时候心里也更有把握
Java识堂
2020/07/28
9130
RabbitMQ都写了,RocketMQ怎么能落下?
Apache RocketMQ QuickStart
RocketMQ作为一款分布式的消息中间件(阿里的说法是不遵循任何规范的,所以不能完全用JMS的那一套东西来看它),经历了Metaq1.x、Metaq2.x的发展和淘宝双十一的洗礼,在功能和性能上远超ActiveMQ。
一个会写诗的程序员
2018/08/17
7580
Apache RocketMQ QuickStart
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。
知了一笑
2019/07/19
2.1K0
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
RocketMQ事务消息使用与原理
最近在找工作,面试过程中被多次问到事务消息的实现原理,另外在分布式事务解决方案中,事务消息也是一个不错的解决方案,本篇文章将围绕RocketMQ的事务消息实现展开描述。
叔牙
2022/09/27
1.5K0
RocketMQ事务消息使用与原理
RocketMQ学习-消息发布和订阅
前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
阿杜
2018/08/06
6.1K0
RocketMQ学习-消息发布和订阅
RocketMQ原生API使用
代码地址:https://gitcode.net/java_wxid/rocketmq-api
Java廖志伟
2022/03/07
1.1K0
RocketMQ原生API使用
相关推荐
面试系列之-rocketmq消息机制
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验