首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SpringBoot整合RabbitMQ实现延迟消息

SpringBoot整合RabbitMQ实现延迟消息

作者头像
用户3587585
发布于 2023-08-10 01:05:17
发布于 2023-08-10 01:05:17
86800
代码可运行
举报
文章被收录于专栏:阿福谈Web编程阿福谈Web编程
运行总次数:0
代码可运行

引言

在上一篇文章一篇文章搞懂RabbitMQ 延迟消息中作者详细介绍了RabbitMq实现延迟消息队列的两种方式:

  • 使用 TTL 和 DLX实现 延迟消息;
  • 使用 RabbitMq 延迟消息插件实现延迟消息;

那么本文我们就来验证使用第一种方式实现延迟消息队列在超时订单取消中的应用。

用户从下单到订单超时取消过程需要完成以下业务

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

创建聚合项目

使用IDEA创建一个多模块聚合项目,聚合项目命名为messagepractices, 三个子模块分别为common、message-producer和message-consumer,三个pom文件内容如下:

messagepractices根项目 pom.xml

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath />
    </parent>
    <groupId>com.hsf.rabbitmq</groupId>
    <artifactId>message-practices</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>message-producer</module>
        <module>message-consumer</module>
        <module>common</module>
    </modules>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--fast json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <!--端点监控 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>
</project>

common子模块pom.xml

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>message-practices</artifactId>
        <groupId>com.hsf.rabbitmq</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>common</artifactId>

    <dependencies>
        <!--rabbitmq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <!--引入spring mvc的起步依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.1.5.RELEASE</version>
            </plugin>
        </plugins>
    </build>
</project>

在common子模块中引入spring-boot-starter-amqp、spring-boot-starter-logging、spring-boot-starter-web和spring-boot-starter-test等项目共用起步依赖。

message-producer子模块pom.xml

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <artifactId>message-producer</artifactId>
    <parent>
        <groupId>com.hsf.rabbitmq</groupId>
        <artifactId>message-practices</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>com.hsf.rabbitmq</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.1.5.RELEASE</version>
            </plugin>
        </plugins>
    </build>

</project>

message-producer子模块中的maven依赖中引入common子模块

message-consumer子模块pom.xml

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hsf.rabbitmq</groupId>
        <artifactId>message-practices</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>message-consumer</artifactId>
    <dependencies>
            <dependency>
                <groupId>com.hsf.rabbitmq</groupId>
                <artifactId>common</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.1.5.RELEASE</version>
            </plugin>
        </plugins>
    </build>
</project>

message-consumer子模的maven依赖中通用引入common子模块。

common子模块新建订单实体类与枚举

ProductOrder实体类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.common.pojo;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

public class ProductOrder implements Serializable {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");

    private String orderId;

    private String productId;

    private String productName;

    private String categoryId;

    private Double price = 0.0;

    private Integer count = 0;

    private String timestamp;


    public String getOrderId() {
        if(orderId==null || "".equals(orderId)){
            orderId = UUID.randomUUID().toString().replace("-","");
        }
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
   // 省略其他setter,getter方法
}

响应结果CommonResult类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.common.pojo;

import java.io.Serializable;

public class CommonResult implements Serializable {
    /**
     * 相应码
     */
    private Integer status = 200;

    /**
     * 响应消息
     */
    private String message = "success";

    /**
     * 数据
     */
    private Object data;

    public CommonResult() {
    }

    public CommonResult(Integer status, String message) {
        this.status = status;
        this.message = message;
    }

    public CommonResult(Integer status, String message, Object data) {
        this.status = status;
        this.message = message;
        this.data = data;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public static  CommonResult success(Object data){
        CommonResult commonResult = new CommonResult();
        commonResult.setData(data);
        return commonResult;
    }

    public static CommonResult success(Object data, String message){
        CommonResult commonResult = new CommonResult();
        commonResult.setMessage(message);
        commonResult.setData(data);
        return commonResult;
    }

    public static CommonResult error(Integer status, String message){
        CommonResult commonResult = new CommonResult(status, message);
        return commonResult;
    }

}

消息队列枚举

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.common.enums;

public enum QueueEnum {

    /**
     * 消息通知队列
     */
    QUEUE_ORDER_CANCEL("mall.order.cancel", "mall.order.direct", "mall.order.cancel"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_ORDER_CANCEL("mall.order.cancel.ttl", "mall.order.direct.ttl", "mall.order.cancel.ttl"),

    ;

    /**
     * 队列名称
     */
    private String name;

    /**
     * 交换机名称
     */
    private String exchange;

    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String name, String exchange, String routeKey) {
        this.name = name;
        this.exchange = exchange;
        this.routeKey = routeKey;
    }

    public String getName() {
        return name;
    }

    public String getExchange() {
        return exchange;
    }

    public String getRouteKey() {
        return routeKey;
    }
}

rabbitMq管理界面操作

本地启动rabbitMq服务后,使用admin账户登录http://localhost:15672 , 添加mall用户和登录密码及/mall 虚拟主机

1)添加mall用户

鼠标选中Admin Tab页,在Add a user 菜单下的Username输入框中输入mall, password和confirm输入框中输入登录密码mall2023, 然后点击下面的Add user按钮完成添加mall用户;

2)添加/mall虚拟主机

右边的下拉框选中Virtual Hosts, Add a new virtual hosts 下面的 Name输入框中 /mall, 点击下面的 Add virtual host 完成添加 /mall虚拟空间

消息生产者编码

application.yml配置文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
server:
  port: 8081
  servlet:
    context-path: /messge-producer #上下文路径

management:
  endpoints:
    health:
      show-details: always
    web:
      base-path: /
      exposure:  #暴露端点
        include:
          - health
          - info
          - mappings
          - env
          - beans
spring:
  application:
    name: msgProducer #应用名

消息队列配置类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.producer.configuration;


import com.hsf.rabbitmq.common.enums.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    //注意这个ConnectionFactory类是org.springframework.amqp.rabbit包下的类,而不是com.rabbit.client包下的类
    @Bean
   public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("mall");
        connectionFactory.setPassword("mall2023");
        connectionFactory.setVirtualHost("/mall");
        connectionFactory.setPublisherReturns(false);
        connectionFactory.setChannelCacheSize(50);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setConnectionLimit(50);
        return  connectionFactory;
   }

   @Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
   }
    
    /**
     * 订单消息实际消费队列所绑定的交换机
     * @return
     */
    @Bean
    public DirectExchange orderDirect(){
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();

    }

    /**
     * 订单延迟队列所绑定的交换机
     * @return
     */
    @Bean
    public DirectExchange orderTtlDirect(){
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单实际消费队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /**
     * 订单延迟队列(死信队列)
     * @return
     */
    @Bean
    public Queue orderTtlQueue(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange());
        arguments.put("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
        Queue queue = new Queue(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName(), true, false, false, arguments);
        return queue;
    }

    /**
     * 将订单队列绑定到交换机
     * @param orderDirect
     * @param orderQueue
     * @return
     */
    @Bean
    public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
        return BindingBuilder.bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /**
     * 将订单延迟队列绑定到交换机
     * @param orderTtlDirect
     * @param orderTtlQueue
     * @return
     */
    @Bean
    public Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder.bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }
    
}

在Rabbitmq消息配置队列中我们配置了Rabbitmq的连接工厂类、RabbitTemplate、取消订单交换器、订单延迟队列绑定交换机、取消订单消息队列和订单延迟队列等实例的bean。

延迟发送消息组件类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.producer.configuration;

import com.hsf.rabbitmq.common.enums.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class CancelOrderSender {

    private static final Logger logger = LoggerFactory.getLogger(CancelOrderSender.class);

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(String orderId, final long delayTime){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId,
                message -> {
                    //给消息设置延迟毫秒值
                 message.getMessageProperties().setExpiration(String.valueOf(delayTime));
                  return message;
                });
        logger.info("send delay message orderId={}", orderId);
    }

}

生成订单服务类

ProductOrderService

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.producer.service;

import com.hsf.rabbitmq.common.pojo.CommonResult;
import com.hsf.rabbitmq.common.pojo.ProductOrder;

public interface ProductOrderService {

    /**
     * 根据提交参数生成订单
     * @param order
     * @return
     */
    CommonResult generateOrder(ProductOrder order);
    
}

ProductOrderServiceImpl

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class ProductOrderServiceImpl implements ProductOrderService {

    private static final Logger logger = LoggerFactory.getLogger(ProductOrderServiceImpl.class);

    @Resource
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(ProductOrder order) {
        //todo 执行一系类下单操作,下单数据入库
        String orderId = order.getOrderId();
        logger.info("process order, orderId={}", orderId);
        // 下单完成后开启一个延迟消息,用于当用户没有付款时取消订单
        cancelOrderSender.sendMessage(orderId, 30*1000L);
        return CommonResult.success(null, "下单成功");
    }

}

为了快速查看延迟消息的处理结果,这里特意设置的延迟时间为30s。

生成订单控制器类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.producer.controller;

import com.hsf.rabbitmq.common.pojo.CommonResult;
import com.hsf.rabbitmq.common.pojo.ProductOrder;
import com.hsf.rabbitmq.message.producer.service.ProductOrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private ProductOrderService orderService;

    @PostMapping(value = "/generateOrder")
    public CommonResult generateOrder(@RequestBody ProductOrder productOrder){
        return orderService.generateOrder(productOrder);
    }
}

消息生产者启动类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageProducerApplication {

    public  static void  main(String[] args){

        SpringApplication.run(MessageProducerApplication.class,args);
    }
}

消息消费者编码

application.yml配置文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
server:
  port: 8082
  servlet:
    context-path: /messge-consumer #上下文路径
spring:
  application:
    name: msgConsumer  #应用名称
management:
  endpoints:
    web:  #暴露的web端点
      path-mapping:
        health: healthcheck

RabbtMq配置类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.consumer.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    //注意这个ConnectionFactory类是org.springframework.amqp.rabbit包下的类,而不是com.rabbit.client包下的类
     @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("mall");
        connectionFactory.setPassword("mall2023");
        connectionFactory.setVirtualHost("/mall");
        connectionFactory.setPublisherReturns(false);
        connectionFactory.setChannelCacheSize(50);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setConnectionLimit(50);
        return  connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}

消息消费者的Rabbitmq配置类只需要配置RabbitMq客户端的连接工厂ConnectionFactoryRabbitTemplate两个实例bean即可。

取消订单服务类

CancelOrderService

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.consumer.service;

import com.hsf.rabbitmq.common.pojo.CommonResult;

public interface CancelOrderService {

    /**
     * 取消单个订单
     * @param orderId
     * @return
     */
    CommonResult cancelOrder(String orderId);

}

CancelOrderServiceImpl

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.consumer.service;

import com.hsf.rabbitmq.common.pojo.CommonResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class CancelOrderServiceImpl implements CancelOrderService {

    private static final Logger logger = LoggerFactory.getLogger(CancelOrderServiceImpl.class);
    @Override
    public CommonResult cancelOrder(String orderId) {
        // todo 取消订单,释放库存,扣除下单时奖励的积分
        logger.info("cancel order, orderId={}", orderId);
        return CommonResult.success(null, "取消订单成功");
    }
}

取消订单消息监听处理器

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.consumer.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class CancelOrderReceiver {

    private static Logger logger = LoggerFactory.getLogger(CancelOrderReceiver.class);

    @RabbitListener(queues = "mall.order.cancel")
    @RabbitHandler
    public void handle(String message) {
         try {
             logger.info("cancel order, orderId={}", message);
             // 根据orderId
         } catch (Exception e){
             logger.error("cancel order error", e);
         }
    }
}

注意:上面的@RabbitListener注解必须加在消息处理方法handle上,而不能加在类CancelOrderReceiver上,否者启动消息消费者服务时会一直报"method no match exception"异常。

消息消费者启动类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.hsf.rabbitmq.message.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageConsumerApplication {

    public static void main(String[] args){

        SpringApplication.run(MessageConsumerApplication.class,args);
    }
}

效果测试

先启动消息生产者服务,后启动消息消费者服务。生产者应用启动成功后可以看到rabbitmq管理界面有了mall.order.direct和mall.order.direct.ttl两个交换器与mall.order.cancel和mall.order.cancel.ttl两个消息队列

交换器界面:

消息队列界面:

然后在postman中调用生成订单接口

post http://localhost:8081/messge-producer/order/generateOrder

接口入参如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{
 "productId": "01productId01",
 "productName": "华为Mate50手机",
 "categoryId": "1001",
 "price": 5000,
 "count": 1
}

接口调用成功后可以在消息生产者应用控制台中看到发送订单的日志信息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
2023-07-08 22:41:33.730  INFO 10584 --- [nio-8081-exec-1] c.h.r.m.p.s.ProductOrderServiceImpl      : process order, orderId=20ae8a516b9545c58a285fbb9684197c
2023-07-08 22:41:33.740  INFO 10584 --- [nio-8081-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5672
2023-07-08 22:41:33.780  INFO 10584 --- [nio-8081-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#1706e1:0/SimpleConnection@1876e75 [delegate=amqp://mall@127.0.0.1:5672//mall, localPort= 65176]
2023-07-08 22:41:33.897  INFO 10584 --- [nio-8081-exec-1] c.h.r.m.p.c.CancelOrderSender            : send delay message orderId=20ae8a516b9545c58a285fbb9684197c

经过30秒后可以在消息消费者应用控制台中看到监听取消订单队列处理取消订单的日志信息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
023-07-08 22:40:08.827  INFO 17884 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8082 (http) with context path '/messge-consumer'
2023-07-08 22:40:08.830  INFO 17884 --- [           main] c.h.r.m.c.MessageConsumerApplication     : Started MessageConsumerApplication in 3.444 seconds (JVM running for 6.797)
2023-07-08 22:42:04.196  INFO 17884 --- [ntContainer#0-1] c.h.r.m.c.rabbitmq.CancelOrderReceiver   : cancel order, orderId=20ae8a516b9545c58a285fbb9684197c

由此,验证了我们通过TTL+DLX方式实现的延迟消息队列达到了在过期时间后取消未支付的订单功能。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 阿福谈Web编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
用Kotlin的方式来处理网络异常
之前的文章 RxJava处理业务异常的几种方式 曾经介绍过 Retrofit 的异常可以有多种处理方式。
fengzhizi715
2018/08/24
5860
用Kotlin的方式来处理网络异常
RxJava的Single、Completable以及Maybe
通常情况下,如果我们想要使用 RxJava 首先会想到的是使用Observable,如果要考虑到Backpressure的情况,在 RxJava2.x 时代我们会使用Flowable。除了Observable和Flowable之外,在 RxJava2.x 中还有三种类型的Observables:Single、Completable、Maybe。
fengzhizi715
2018/08/24
2.7K0
RxJava的Single、Completable以及Maybe
Transformer 在RxJava中的使用
Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer,在2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。由于 RxJava2 将Observable拆分成 Observable 和 Flowable,所以多了一个FlowableTransformer。同时,Maybe是 RxJava2 新增的一个类型,所以多了MaybeTransformer。
fengzhizi715
2018/08/24
8.7K0
Transformer 在RxJava中的使用
RxJava之异常捕获操作符介绍
RxJava 之 异常捕获操作符 官方介绍 :Error Handling Operators
103style
2022/12/19
7090
All RxJava - 为Retrofit添加重试
在我们的日常开发中离不开I/O操作,尤其是网络请求,但并不是所有的请求都是可信赖的,因此我们必须为APP添加请求重试功能。
小鄧子
2018/08/20
1.8K0
All RxJava - 为Retrofit添加重试
鸿蒙 MVP+ Rxjava+Retrofit+okhttp 实现教程【鸿蒙专题7】
大家好。我是坚果,这是我的公众号“坚果前端”,觉得不错的话,关注一下吧,如果你迷惘,不妨看看码农的轨迹
徐建国
2021/12/24
9180
鸿蒙  MVP+ Rxjava+Retrofit+okhttp 实现教程【鸿蒙专题7】
大佬们,一波RxJava 3.0来袭,请做好准备~
每个Android开发者,都是爱RxJava的,简洁线程切换和多网络请求合并,再配合Retrofit,简直是APP开发的福音。不知不觉,RxJava一路走来,已经更新到第三大版本了。不像RxJava 2对RxJava 1那么残忍,RxJava 3对RxJava 2的兼容性还是挺好的,目前并没有做出很大的更改。RxJava2到2020年12月31号不再提供支持,错误时同时在2.x和3.x修复,但新功能只会在3.x上添加。
Rouse
2019/07/17
2K0
大佬们,一波RxJava 3.0来袭,请做好准备~
Android RxJava的使用
首语 最近因为项目上线,挤不出时间,已经好久没有更新博客了😛,目前项目也做差不多了,写几篇总结类型的博客,梳理一下。 本文主要对RxJava及常用操作符的使用进行总结,同时对RxJava在Android中几种常见的使用场景进行举例。 简介 RxJava是Reactive Extensions的Java VM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。 Rx是Reactive Extensions的缩写的简写,它是一个使用可观察数据流进行异步编程的编程接口,Rx结合了观察者模式、迭代器模
八归少年
2022/06/29
3.2K0
Android RxJava的使用
Android RxJava+Retrofit完美封装(缓存,请求,生命周期管理)
Retrofit 和RxJava已经出来很久了,很多前辈写了很多不错的文章,在此不得不感谢这些前辈无私奉献的开源精神,能让我们站在巨人的肩膀上望得更远。对于 RxJava 不是很了解的同学推荐你们看扔物线大神的这篇文章给 Android 开发者的 RxJava 详解一遍看不懂就看第二遍。Retrofit的使用可以 加QQ群:668041364
java爱好者
2019/06/28
3.7K0
【建议收藏】Android实现Rxjava2+Retrofit完美封装
去年的时候学习了Rxjava和Retrofit的基本用法,但一直没有在实际项目中运用。今年开做新项目,果断在新项目中引入了RxJava和Retrofit。本篇文章将介绍笔者在项目中对Retrofit的封装。 先来看一下封装过后的Retrofit如何使用。
分你一些日落
2021/11/30
2.3K1
【译】对RxJava中-repeatWhen()和-retryWhen()操作符的思考
第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了。不得不说,它们绝对是“最令人困惑弹珠图”的有力角逐者。
用户1740424
2018/07/23
2.2K0
【译】对RxJava中-repeatWhen()和-retryWhen()操作符的思考
RxJava 2.0还没熟悉,RxJava 3.0说来就来了!(多种操作符代码详解篇)
在上篇文章中讲的是关于Rxjava的基础篇,今天来讲讲多种操作符的具体内容,操作符太多了,大家准备好啊,耐心看~
Android技术干货分享
2019/07/19
2.3K0
RxJava 2.0还没熟悉,RxJava 3.0说来就来了!(多种操作符代码详解篇)
Android RxJava操作符详解 系列:功能性操作符
上述所有的Demo源代码都存放在:Carson_Ho的Github地址:RxJava2_功能性操作符
Carson.Ho
2019/02/22
1.2K0
RxJava 创建操作符
内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。
三流之路
2018/09/11
1K0
Rx Java 异步编程框架
在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。
架构探险之道
2023/03/04
3.4K0
Rx Java 异步编程框架
RxJava1 升级到 RxJava2 所踩过的坑
RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 RxJava1 是不能同时存在于一个项目中的,逼不得已我得把自己所有框架中使用 RxJava 的地方以及 App 中使用 RxJava 的地方都升级到最新版本。所以我整理并记录了一些已经填好的坑。
fengzhizi715
2018/08/24
1.5K0
RxJava1 升级到 RxJava2 所踩过的坑
Reactor详解之:异常处理
不管是在响应式编程还是普通的程序设计中,异常处理都是一个非常重要的方面。今天将会给大家介绍Reactor中异常的处理流程。
程序那些事
2020/11/17
2.3K0
Android:RxJava 结合 Retrofit 全面实现 网络请求出错重连
前言 Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。 如果还不了解RxJava,请看文章:Android:这是一篇 清晰 & 易懂的R
Carson.Ho
2019/02/22
1.9K0
Android RxJava应用:网络请求出错重连(结合Retrofit)
Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。
Carson.Ho
2022/03/25
1.6K0
Android RxJava应用:网络请求出错重连(结合Retrofit)
Android OkHttp+Retrofit+RxJava搭建网络访问框架(含源码)
  在实际开发APP中,网络访问是必不可少的,最开始访问网络是使用HttpURLConnection、而后面有了一些框架比如Volley、OkHttp、Retrofit等。那么你可能看到最多的是OkHttp,因为它很出名,Google也推荐你使用此框架进行网络访问。你可能会说Retrofit,Retrofit其实就是对OkHttp的二次封装。还有RxJava,这个又是用来干嘛的呢?为什么要将三者组合起来,组合有什么优势吗?带着这些问题看下去。
晨曦_LLW
2022/05/10
4.1K3
Android OkHttp+Retrofit+RxJava搭建网络访问框架(含源码)
推荐阅读
相关推荐
用Kotlin的方式来处理网络异常
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档