首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >rabbitmq系列(三)消息幂等性处理

rabbitmq系列(三)消息幂等性处理

原创
作者头像
Java旅途
修改于 2020-06-28 02:21:42
修改于 2020-06-28 02:21:42
1.1K0
举报
文章被收录于专栏:Java旅途Java旅途

一、springboot整合rabbitmq

  1. 我们需要新建两个工程,一个作为生产者,另一个作为消费者。在pom.xml中添加amqp依赖:
代码语言:txt
AI代码解释
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml文件中添加rabbitmq的相关信息:
代码语言:txt
AI代码解释
复制
spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 登录账号
    username: guest
    # 登录密码
    password: guest
    # 虚拟主机
    virtual-host: /
  1. 在生产者工程中新建配置项rabbitmqConfig.java,申明名称为”byte-zb“直连交换机和队列,使用”byte-zb“的routing-key将队列和交换机绑定,代码如下:
代码语言:txt
AI代码解释
复制
@Configuration
public class RabbitConfig {

    public static final String QUEUE_NAME = "byte-zb";

    public static final String EXCHANGE_NAME = "byte-zb";

    public static final String ROUTING_KEY = "byte-zb";

    // 队列申明
    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME);
    }

    // 申明交换机
    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange(EXCHANGE_NAME);
    }

    // 数据绑定申明
    @Bean
    public Binding directBinding(){

        return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);
    }
}
  1. 创建生产者发送一条消息,代码如下:
代码语言:txt
AI代码解释
复制
@RestController
public class Producer {
    public static final String QUEUE_NAME = "byte-zb";

    public static final String EXCHANGE_NAME = "byte-zb";

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/send")
    public void sendMessage(){

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email","11111111111");
        jsonObject.put("timestamp",System.currentTimeMillis());
        String json = jsonObject.toJSONString();
        System.out.println(json);
        amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,json);
    }

}
  1. 在消费者工程里创建消费者消费消息,代码如下:
代码语言:txt
AI代码解释
复制
@Component
public class Consumer throws Exception{

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message){

        System.out.println("接收到的消息为" message);
    }
}

我们启动生产者,然后请求send接口,然后打开rabbitmq控制台发现多了一个名为”byte-zb“的交换机和队列,并且队列中出现了一个未消费的消息,然后启动消费者,我们会在控制台上发现打印了一条消息,同时rabbitmq控制台中”byte-zb“的队列中消息没有了。

二、自动补偿机制

如果消费者消息消费不成功的话,会出现什么情况呢?我们修改一下消费者代码,然后看看。

代码语言:txt
AI代码解释
复制
@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) throws Exception {

        System.out.println("接收到的消息为" message);
        int i = 1 / 0;
    }
}

我们会看到消费者工程控制台一直在刷新报错,当消费者配出异常,也就是说当消息消费不成功的话,该消息会存放在rabbitmq的服务端,一直进行重试,直到不抛出异常为止。

如果一直抛异常,我们的服务很容易挂掉,那有没有办法控制重试几次不成功就不再重试了呢?答案是有的。我们在消费者application.yml中增加一段配置。

代码语言:txt
AI代码解释
复制
spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 登录账号
    username: guest
    # 登录密码
    password: guest
    # 虚拟主机
    virtual-host: /
    listener:
      simple:
        retry:
          enabled: true # 开启消费者进行重试
          max-attempts: 5 # 最大重试次数
          initial-interval: 3000 # 重试时间间隔

上面配置的意思是消费异常后,重试五次,每次隔3s。继续启动消费者看看效果,我们发现重试五次以后,就不再重试了。

三、结合实际案例来使用消息补偿机制

像上面那种情况出现的异常其实不管怎么重试都不会成功,实际上用到消息补偿的就是调用第三方接口的这种。

案例:生者往队列中扔一条消息,包含邮箱和发送内容。消费者拿到消息后将调用邮件接口发送邮件。有时候可能邮件接口由于网络等原因不通,这时候就需要去重试了。

在调用接口的工具类中,如果出现异常我们直接返回null,工具类具体代码就不贴了,如果返回null之后怎么处理呢?我们只需要抛出异常,rabbitListener捕获到异常后就会自动重试。

我们改造一下消费者代码:

代码语言:txt
AI代码解释
复制
@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) throws Exception {

        System.out.println("接收到的消息为" message);
        JSONObject jsonObject = JSONObject.parseObject(message);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email" email "&content=" content;
        // 如果发生异常则返回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
    }
}

当然我们可以自定义异常抛出。具体怎么试验呢,第一步启动生产者和消费者,这时候我们发现消费者在重试,第二步我们启动邮件服务,这时候我们会发现邮件发送成功了,消费者不再重试了

四、解决消息幂等性问题

一些刚接触java的同学可能对幂等性不太清楚。幂等性就是重复消费造成结果不一致。为了保证幂等性,因此消费者消费消息只能消费一次消息。我么可以是用全局的消息id来控制幂等性。当消息被消费了之后我们可以选择缓存保存这个消息id,然后当再次消费的时候,我们可以查询缓存,如果存在这个消息id,我们就不错处理直接return即可。先改造生产者代码,在消息中添加消息id:

代码语言:txt
AI代码解释
复制
@RequestMapping("/send")
    public void sendMessage(){

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email","11111111111");
        jsonObject.put("timestamp",System.currentTimeMillis());
        String json = jsonObject.toJSONString();
        System.out.println(json);

        	Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("UTF-8").setMessageId(UUID.randomUUID() "").build();
        amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,message);
    }

消费者代码改造:

代码语言:txt
AI代码解释
复制
@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(Message message) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收导的消息为:" msg "==消息id为:" messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email" email "&content=" content;
        // 如果发生异常则返回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
        jedis.set("messageId",messageId);
    }
}

我们在消费者端使用redis存储消息id,只做演示,具体项目请根据实际情况选择相应的工具进行存储。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
《RabbitMQ》如何保证消息不被重复消费
为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。
Java旅途
2020/08/05
2.7K0
【RabbitMQ】RabbitMQ应用
官方安装指南:https://www.rabbitmq.com/install-rpm.html
瑞新
2021/11/08
1.9K0
快速学习-RabbitMQ五种消息模型
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
cwl_java
2020/02/11
8850
rabbitmq系列(四)死信队列
当消息在一个队列中变成一个死信之后,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。具体原理如下图:
Java旅途
2020/06/25
4960
SpringBoot整合RabbitMQ
添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 添加配置 spring.rabbitmq.host=192.168.2.71 spring.rabbitmq.port=5672 spring.rabbitmq.username=light spring.rabbitmq.password=l
崔笑颜
2020/06/08
4950
RabbitMQ六种队列模式之路由模式
本文接着带大家伙了解RabbitMQ队列模式中的路由模式,其实只要看过我前面写的发布订阅模式的文章后,相信路由模式上手就非常 easy 了,唯一差距就是两个参数,exchange类型和 routingKey 。
黎明大大
2021/03/09
6470
乐优商城第十五天 rabbitmq
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
周杰伦本人
2022/10/25
4450
乐优商城第十五天 rabbitmq
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。
会洗碗的CV工程师
2024/04/30
4190
SpringBoot整合RabbitMQ
RabbitMQ架构原理及消息分发机制
在现代分布式系统中,消息队列是不可或缺的组件之一。它不仅能够解耦系统模块,还能实现异步通信和削峰填谷。在众多消息队列中,RabbitMQ 因其高并发、高可靠性和丰富的功能而备受青睐。本文将从 RabbitMQ 的基础概念、架构原理、消息分发机制、持久化与内存管理、插件管理、Java API 编程以及 Spring 集成等方面,全面解析 RabbitMQ 的核心技术和应用场景。
SmileNicky
2025/04/17
2640
RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ
RabbitMQ 作为一个消息中间件,整体上采用了生产者与消费者模型,主要负责接收,存储和转发消息。
求和小熊猫
2020/12/16
1.2K0
RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ
RabbitMQ详解(三)------RabbitMQ的五种队列
  上一篇博客我们介绍了RabbitMQ消息通信中的一些基本概念,这篇博客我们介绍 RabbitMQ 的五种工作模式,这也是实际使用RabbitMQ需要重点关注的。
IT可乐
2018/07/24
8410
RabbitMQ详解(三)------RabbitMQ的五种队列
rabbitMQ实现可靠消息投递 原
    RabbitMQ消息的可靠性主要包括两方面,一方面是通过实现消费的重试机制(通过@Retryable来实现重试,可以设置重试次数和重试频率,但是要保证幂等性),另一方面就是实现消息生产者的可靠投递(注意消费单幂等),下面主要讲下生产者实现的可靠消息投递。
chinotan
2019/04/03
7810
rabbitMQ实现可靠消息投递
                                                                            原
【Rabbitmq篇】RabbitMQ⾼级特性----消息确认
前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下
用户11369558
2024/11/20
9820
【Rabbitmq篇】RabbitMQ⾼级特性----消息确认
SpringBoot:RabbitMQ消息重复消费场景及解决方案
首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。
Freedom123
2024/03/29
1.2K0
SpringBoot:RabbitMQ消息重复消费场景及解决方案
RabbitMQ 快速入门实战
本文基于docker来安装RabbitMQ,通过pull当前最新版本rabbitmq:3.8.5-management即可,之后通过如下的命令即可运行:
技术路漫漫
2020/07/18
8760
SpringBoot系列之RabbitMQ可靠性投递实践教程
基于SpringBoot 2.2.1.RELEASE集成RabbitMQ的可靠性投递实践,以下是一个详细的测试例子,包括如何配置、发送消息、接收消息,并验证消息的可靠性投递。
SmileNicky
2025/05/02
1940
SpringBoot系列之RabbitMQ可靠性投递实践教程
RabbitMQ使用教程(超详细)
下载地址:http://www.rabbitmq.com/download.html
全栈程序员站长
2022/07/21
2.9K1
RabbitMQ使用教程(超详细)
RabbitMQ In JAVA 介绍及使用
  RabbitMQ是开源的消息中间件,它是轻量级的,支持多种消息传递协议,可以部署在分布式和联合配置中,以满足高级别、高可用性需求。并且可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发工具。(这里只介绍JAVA下的RabbitMQ的使用,感兴趣的可以查看官方文档:http://www.rabbitmq.com/getstarted.html);
2019/02/21
7640
RabbitMQ In JAVA 介绍及使用
RabbitMQ极速入门
消息中间件是目前比较流行的一个中间件,其中RabbitMQ更是占有一定的市场份额,主要用来做异步处理、应用解耦、流量削峰、日志处理等等方面。
sowhat1412
2020/11/05
1.1K0
RabbitMQ极速入门
SpringBoot整合rabbitMq
消息队列(Message Queue)简称mq,本文将介绍SpringBoot整合rabbitmq的功能使用
半月无霜
2023/03/03
6320
相关推荐
《RabbitMQ》如何保证消息不被重复消费
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档