Introduction
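In the previous article, "Understanding RabbitMQ Delayed Messages in One Article", the author described two ways to implement a delayed message queue with RabbitMQ.
This article verifies the first approach, TTL plus a dead-letter exchange (DLX), by applying it to the cancellation of timed-out orders.
From order placement to timeout cancellation, the flow is as follows: the user places an order, the producer sends a delayed message carrying the order ID, and when the delay expires the consumer receives the message and cancels the order if it is still unpaid.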
Use IDEA to create a multi-module aggregate project named messagepractices with three submodules: common, message-producer and message-consumer. The pom files are as follows:
Root project messagepractices pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath />
</parent>
<groupId>com.hsf.rabbitmq</groupId>
<artifactId>message-practices</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>message-producer</module>
<module>message-consumer</module>
<module>common</module>
</modules>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--fast json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<!-- endpoint monitoring (actuator) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
</project>
common submodule pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>message-practices</artifactId>
<groupId>com.hsf.rabbitmq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>common</artifactId>
<dependencies>
<!-- RabbitMQ dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- Spring MVC starter dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.5.RELEASE</version>
</plugin>
</plugins>
</build>
</project>
The common submodule pulls in the starter dependencies shared by the whole project: spring-boot-starter-amqp, spring-boot-starter-logging, spring-boot-starter-web and spring-boot-starter-test.
message-producer submodule pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>message-producer</artifactId>
<parent>
<groupId>com.hsf.rabbitmq</groupId>
<artifactId>message-practices</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.hsf.rabbitmq</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.5.RELEASE</version>
</plugin>
</plugins>
</build>
</project>
The message-producer submodule declares the common submodule as a Maven dependency.
message-consumer submodule pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hsf.rabbitmq</groupId>
<artifactId>message-practices</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>message-consumer</artifactId>
<dependencies>
<dependency>
<groupId>com.hsf.rabbitmq</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.5.RELEASE</version>
</plugin>
</plugins>
</build>
</project>
The message-consumer submodule likewise declares the common submodule as a Maven dependency.
ProductOrder entity class
package com.hsf.rabbitmq.common.pojo;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
public class ProductOrder implements Serializable {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");
private String orderId;
private String productId;
private String productName;
private String categoryId;
private Double price = 0.0;
private Integer count = 0;
private String timestamp;
public String getOrderId() {
if(orderId==null || "".equals(orderId)){
orderId = UUID.randomUUID().toString().replace("-","");
}
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
// other setters and getters omitted
}
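ProductOrder keeps a static SimpleDateFormat, and SimpleDateFormat is not thread-safe, so sharing one instance across request threads can produce corrupted timestamps. The following is a minimal sketch of a thread-safe alternative, assuming the timestamp field is formatted with the same pattern string; the helper class OrderTimestamps is hypothetical and is not part of the original project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not in the original project): formats order timestamps
// with the same pattern as ProductOrder, using the thread-safe DateTimeFormatter.
final class OrderTimestamps {
    // Same pattern string as the SimpleDateFormat field in ProductOrder.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd:HH:mm:ss");

    private OrderTimestamps() {
    }

    // Formats a local date-time; safe to call from several threads at once.
    static String format(LocalDateTime time) {
        return FORMAT.format(time);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2023, 7, 8, 22, 41, 33)));
    }
}
```

DateTimeFormatter instances are immutable, which is why a single static instance can be shared safely here.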
CommonResult response class
package com.hsf.rabbitmq.common.pojo;
import java.io.Serializable;
public class CommonResult implements Serializable {
/**
* Response status code
*/
private Integer status = 200;
/**
* Response message
*/
private String message = "success";
/**
* Payload data
*/
private Object data;
public CommonResult() {
}
public CommonResult(Integer status, String message) {
this.status = status;
this.message = message;
}
public CommonResult(Integer status, String message, Object data) {
this.status = status;
this.message = message;
this.data = data;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
public static CommonResult success(Object data){
CommonResult commonResult = new CommonResult();
commonResult.setData(data);
return commonResult;
}
public static CommonResult success(Object data, String message){
CommonResult commonResult = new CommonResult();
commonResult.setMessage(message);
commonResult.setData(data);
return commonResult;
}
public static CommonResult error(Integer status, String message){
CommonResult commonResult = new CommonResult(status, message);
return commonResult;
}
}
Message queue enum
package com.hsf.rabbitmq.common.enums;
public enum QueueEnum {
/**
* Message notification queue (the queue that is actually consumed)
*/
QUEUE_ORDER_CANCEL("mall.order.cancel", "mall.order.direct", "mall.order.cancel"),
/**
* Message notification TTL queue (the delay queue)
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.cancel.ttl", "mall.order.direct.ttl", "mall.order.cancel.ttl"),
;
/**
* Queue name
*/
private String name;
/**
* Exchange name
*/
private String exchange;
/**
* Routing key
*/
private String routeKey;
QueueEnum(String name, String exchange, String routeKey) {
this.name = name;
this.exchange = exchange;
this.routeKey = routeKey;
}
public String getName() {
return name;
}
public String getExchange() {
return exchange;
}
public String getRouteKey() {
return routeKey;
}
}
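After starting the RabbitMQ service locally, log in to http://localhost:15672 with the admin account, then add the mall user with its password and the /mall virtual host:
1) Add the mall user
Select the Admin tab. Under "Add a user", enter mall in the Username field and the password mall2023 in the Password and confirm fields, then click the Add user button.
2) Add the /mall virtual host
In the menu on the right, select Virtual Hosts. Under "Add a new virtual host", enter /mall in the Name field and click Add virtual host. The mall user also needs permissions on the /mall virtual host (they can be set from the user's detail page under Admin) before the applications can connect.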
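Because the virtual host name starts with a slash, it has to be percent-encoded whenever the connection is written as an AMQP URI rather than as separate host, user and virtual host settings. The sketch below shows this with the credentials created above; the class AmqpUriSketch is hypothetical, the port 5672 is the default AMQP port, and the user name and password are left unencoded on the assumption that they contain no reserved characters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper (not in the original project): builds an AMQP URI
// for a virtual host whose name contains a slash.
final class AmqpUriSketch {
    private AmqpUriSketch() {
    }

    static String amqpUri(String user, String password, String host, int port, String vhost) {
        try {
            // "/mall" becomes "%2Fmall". URLEncoder does form encoding, which is
            // adequate for names without spaces (a space would become '+').
            String encodedVhost = URLEncoder.encode(vhost, "UTF-8");
            return "amqp://" + user + ":" + password + "@" + host + ":" + port + "/" + encodedVhost;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(amqpUri("mall", "mall2023", "localhost", 5672, "/mall"));
    }
}
```

The configuration classes in this article set the virtual host through setVirtualHost("/mall"), where no encoding is needed.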
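application.yml configuration file (message-producer)
server:
  port: 8081
  servlet:
    context-path: /messge-producer # context path
management:
  endpoint: # per-endpoint settings live under management.endpoint (singular)
    health:
      show-details: always
  endpoints:
    web:
      base-path: /
      exposure: # exposed endpoints
        include:
          - health
          - info
          - mappings
          - env
          - beans
spring:
  application:
    name: msgProducer # application name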
Message queue configuration class
package com.hsf.rabbitmq.message.producer.configuration;
import com.hsf.rabbitmq.common.enums.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
// Note: this ConnectionFactory is the one from org.springframework.amqp.rabbit.connection, not the one from com.rabbitmq.client
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("mall");
connectionFactory.setPassword("mall2023");
connectionFactory.setVirtualHost("/mall");
connectionFactory.setPublisherReturns(false);
connectionFactory.setChannelCacheSize(50);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setConnectionLimit(50);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
/**
* Exchange bound to the queue from which order messages are actually consumed
* @return
*/
@Bean
public DirectExchange orderDirect(){
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* Exchange bound to the order delay queue
* @return
*/
@Bean
public DirectExchange orderTtlDirect(){
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* Queue from which order messages are actually consumed
* @return
*/
@Bean
public Queue orderQueue(){
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* Order delay queue (expired messages are dead-lettered from here)
* @return
*/
@Bean
public Queue orderTtlQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange());
arguments.put("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
Queue queue = new Queue(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName(), true, false, false, arguments);
return queue;
}
/**
* Binds the order queue to its exchange
* @param orderDirect
* @param orderQueue
* @return
*/
@Bean
public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
return BindingBuilder.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* Binds the order delay queue to its exchange
* @param orderTtlDirect
* @param orderTtlQueue
* @return
*/
@Bean
public Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
}
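The RabbitMQ configuration class declares the following beans: the connection factory, the RabbitTemplate, the order-cancel exchange, the exchange bound to the order delay queue, the order-cancel queue, and the order delay queue, together with the two bindings.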
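To make the path of a message through this topology easier to follow, here is a small in-memory sketch. It does not talk to RabbitMQ; it only mirrors the two bindings and the dead-letter arguments declared above. The class TopologySketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration only: mirrors the bindings and dead-letter
// arguments of RabbitMqConfig without contacting a broker.
final class TopologySketch {
    // "exchange|routingKey" -> queue, mirroring the two Binding beans.
    private final Map<String, String> bindings = new HashMap<>();
    // queue -> {dead-letter exchange, dead-letter routing key}
    private final Map<String, String[]> deadLetterArgs = new HashMap<>();

    TopologySketch() {
        bind("mall.order.direct", "mall.order.cancel", "mall.order.cancel");
        bind("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
        deadLetterArgs.put("mall.order.cancel.ttl",
                new String[] {"mall.order.direct", "mall.order.cancel"});
    }

    private void bind(String exchange, String routingKey, String queue) {
        bindings.put(exchange + "|" + routingKey, queue);
    }

    // Queue a direct exchange delivers to, or null when no binding matches exactly.
    String route(String exchange, String routingKey) {
        return bindings.get(exchange + "|" + routingKey);
    }

    // Queue a message moves to after expiring in the given queue,
    // or null when that queue has no dead-letter exchange configured.
    String afterExpiry(String queue) {
        String[] deadLetter = deadLetterArgs.get(queue);
        return deadLetter == null ? null : route(deadLetter[0], deadLetter[1]);
    }

    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch();
        String first = topology.route("mall.order.direct.ttl", "mall.order.cancel.ttl");
        System.out.println(first + " -> " + topology.afterExpiry(first));
    }
}
```

Nothing consumes from mall.order.cancel.ttl, so a message leaves that queue only by expiring, which is what turns the TTL into a delay.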
Delayed message sender component
package com.hsf.rabbitmq.message.producer.configuration;
import com.hsf.rabbitmq.common.enums.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class CancelOrderSender {
private static final Logger logger = LoggerFactory.getLogger(CancelOrderSender.class);
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(String orderId, final long delayTime){
// send the message to the delay queue
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId,
message -> {
// set the per-message TTL (the delay) in milliseconds
message.getMessageProperties().setExpiration(String.valueOf(delayTime));
return message;
});
logger.info("send delay message orderId={}", orderId);
}
}
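One property of per-message TTL is worth keeping in mind: RabbitMQ dead-letters an expired message only once it reaches the head of the queue, so a message with a short TTL can wait behind one with a longer TTL. Every order in this article uses the same delay, so the effect does not show up here. The following in-memory sketch illustrates the behaviour under that assumption; it is not RabbitMQ code, and the class HeadOfQueueExpirySketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory illustration only: a FIFO queue that expires messages
// strictly from the head, as happens with per-message TTL.
final class HeadOfQueueExpirySketch {
    private static final class Entry {
        final String orderId;
        final long expiresAtMillis;

        Entry(String orderId, long expiresAtMillis) {
            this.orderId = orderId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> ttlQueue = new ArrayDeque<>();
    private final List<String> deadLettered = new ArrayList<>();

    // Enqueues a message that expires ttlMillis after nowMillis.
    void publish(String orderId, long nowMillis, long ttlMillis) {
        ttlQueue.addLast(new Entry(orderId, nowMillis + ttlMillis));
    }

    // Moves expired messages to the dead-letter list, stopping at the
    // first head entry that has not expired yet.
    void advanceTo(long nowMillis) {
        while (!ttlQueue.isEmpty() && ttlQueue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLettered.add(ttlQueue.pollFirst().orderId);
        }
    }

    List<String> deadLettered() {
        return deadLettered;
    }
}
```

In this model, publishing order A with a 30 s TTL followed by order B with a 10 s TTL leaves B in the queue at the 10 s mark; both are released, A first, at 30 s. If different delays were needed, one queue per delay value (each with a uniform TTL) would avoid this.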
Order generation service
ProductOrderService
package com.hsf.rabbitmq.message.producer.service;
import com.hsf.rabbitmq.common.pojo.CommonResult;
import com.hsf.rabbitmq.common.pojo.ProductOrder;
public interface ProductOrderService {
/**
* Generates an order from the submitted parameters
* @param order
* @return
*/
CommonResult generateOrder(ProductOrder order);
}
ProductOrderServiceImpl
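package com.hsf.rabbitmq.message.producer.service; // assumed: same package as ProductOrderService
import com.hsf.rabbitmq.common.pojo.CommonResult;
import com.hsf.rabbitmq.common.pojo.ProductOrder;
import com.hsf.rabbitmq.message.producer.configuration.CancelOrderSender;
import org.slf4j.Logger;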
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class ProductOrderServiceImpl implements ProductOrderService {
private static final Logger logger = LoggerFactory.getLogger(ProductOrderServiceImpl.class);
@Resource
private CancelOrderSender cancelOrderSender;
@Override
public CommonResult generateOrder(ProductOrder order) {
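// TODO: perform the order-placement steps and persist the order
String orderId = order.getOrderId();
logger.info("process order, orderId={}", orderId);
// After the order is placed, send a delayed message used to cancel the order if the user has not paid
cancelOrderSender.sendMessage(orderId, 30*1000L);
return CommonResult.success(null, "Order placed successfully");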
}
}
To see the result of the delayed message quickly, the delay is deliberately set to 30 seconds here.
Order generation controller
package com.hsf.rabbitmq.message.producer.controller;
import com.hsf.rabbitmq.common.pojo.CommonResult;
import com.hsf.rabbitmq.common.pojo.ProductOrder;
import com.hsf.rabbitmq.message.producer.service.ProductOrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/order")
public class OrderController {
@Resource
private ProductOrderService orderService;
@PostMapping(value = "/generateOrder")
public CommonResult generateOrder(@RequestBody ProductOrder productOrder){
return orderService.generateOrder(productOrder);
}
}
Message producer application class
package com.hsf.rabbitmq.message.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MessageProducerApplication {
public static void main(String[] args){
SpringApplication.run(MessageProducerApplication.class,args);
}
}
application.yml configuration file (message-consumer)
server:
  port: 8082
  servlet:
    context-path: /messge-consumer # context path
spring:
  application:
    name: msgConsumer # application name
management:
  endpoints:
    web: # exposed web endpoints
      path-mapping:
        health: healthcheck
RabbitMQ configuration class
package com.hsf.rabbitmq.message.consumer.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
// Note: this ConnectionFactory is the one from org.springframework.amqp.rabbit.connection, not the one from com.rabbitmq.client
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("mall");
connectionFactory.setPassword("mall2023");
connectionFactory.setVirtualHost("/mall");
connectionFactory.setPublisherReturns(false);
connectionFactory.setChannelCacheSize(50);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setConnectionLimit(50);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
The consumer's RabbitMQ configuration class only needs to declare two beans: the client connection factory ConnectionFactory and the RabbitTemplate.
Order cancellation service
CancelOrderService
package com.hsf.rabbitmq.message.consumer.service;
import com.hsf.rabbitmq.common.pojo.CommonResult;
public interface CancelOrderService {
/**
* Cancels a single order
* @param orderId
* @return
*/
CommonResult cancelOrder(String orderId);
}
CancelOrderServiceImpl
package com.hsf.rabbitmq.message.consumer.service;
import com.hsf.rabbitmq.common.pojo.CommonResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class CancelOrderServiceImpl implements CancelOrderService {
private static final Logger logger = LoggerFactory.getLogger(CancelOrderServiceImpl.class);
@Override
public CommonResult cancelOrder(String orderId) {
// TODO: cancel the order, release the stock, and deduct the points awarded at order time
logger.info("cancel order, orderId={}", orderId);
return CommonResult.success(null, "Order cancelled successfully");
}
}
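The delayed message arrives for every order, paid or not, so the real cancellation logic has to check the order state before acting and has to tolerate a message being delivered twice. The sketch below shows one way to do that with an in-memory status map standing in for the database; the class CancelIfUnpaidSketch, its Status enum and its method names are all hypothetical and are not part of the original project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory illustration only: the map stands in for the order table.
final class CancelIfUnpaidSketch {
    enum Status { UNPAID, PAID, CANCELLED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void place(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    // Marks the order as paid only if it is still unpaid.
    boolean pay(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    // Cancels the order only if it exists and is still unpaid.
    // Returns false for paid, already cancelled, or unknown orders,
    // which makes repeated deliveries of the same message harmless.
    boolean cancelIfUnpaid(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

The conditional replace is atomic, so a payment and a timeout cancellation racing on the same order cannot both succeed. With a relational database the same idea is usually expressed as an UPDATE whose WHERE clause includes the expected status.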
Order cancellation message listener
package com.hsf.rabbitmq.message.consumer.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class CancelOrderReceiver {
private static Logger logger = LoggerFactory.getLogger(CancelOrderReceiver.class);
@RabbitListener(queues = "mall.order.cancel")
@RabbitHandler
public void handle(String message) {
try {
logger.info("cancel order, orderId={}", message);
// TODO: cancel the order by orderId, e.g. via CancelOrderService#cancelOrder
} catch (Exception e){
logger.error("cancel order error", e);
}
}
}
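Note: in this example the @RabbitListener annotation is placed on the handler method handle. When the author placed it on the class CancelOrderReceiver instead, the consumer service kept reporting a "method no match exception" error. Spring AMQP does support a class-level @RabbitListener combined with @RabbitHandler methods, but the handler method is then selected by the type of the converted payload, and a payload that matches no handler method produces this kind of error. Annotating the method directly avoids that dispatch step.
Message consumer application class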
package com.hsf.rabbitmq.message.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MessageConsumerApplication {
public static void main(String[] args){
SpringApplication.run(MessageConsumerApplication.class,args);
}
}
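Start the message producer service first, then the message consumer service. The consumer declares no queues of its own, so mall.order.cancel must already exist when its listener starts. Once the producer has started, the RabbitMQ management UI shows the two exchanges mall.order.direct and mall.order.direct.ttl and the two queues mall.order.cancel and mall.order.cancel.ttl.
Exchanges page:
Queues page:
Then call the order generation endpoint from Postman:
POST http://localhost:8081/messge-producer/order/generateOrder
The request body is as follows: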
{
  "productId": "01productId01",
  "productName": "Huawei Mate50 phone",
  "categoryId": "1001",
  "price": 5000,
  "count": 1
}
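The same request can also be sent without Postman. The following is a minimal sketch using only the JDK's HttpURLConnection, assuming the producer is running locally on port 8081; the class GenerateOrderClientSketch is hypothetical, and the JSON body is built by plain string concatenation without escaping, which is only acceptable for fixed test values like these.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical test client (not in the original project).
final class GenerateOrderClientSketch {
    static final String ENDPOINT = "http://localhost:8081/messge-producer/order/generateOrder";

    private GenerateOrderClientSketch() {
    }

    // Builds the JSON request body; values are not escaped.
    static String body(String productId, String productName, String categoryId,
                       double price, int count) {
        return "{\"productId\":\"" + productId + "\","
                + "\"productName\":\"" + productName + "\","
                + "\"categoryId\":\"" + categoryId + "\","
                + "\"price\":" + price + ","
                + "\"count\":" + count + "}";
    }

    // Sends the JSON with POST and returns the HTTP status code.
    static int post(String endpoint, String json) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String json = body("01productId01", "Huawei Mate50", "1001", 5000, 1);
        System.out.println(post(ENDPOINT, json));
    }
}
```

The request carries no orderId, so ProductOrder.getOrderId() generates one on the server side, as in the article's run.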
After the call succeeds, the producer console shows the log entries for sending the order message:
2023-07-08 22:41:33.730 INFO 10584 --- [nio-8081-exec-1] c.h.r.m.p.s.ProductOrderServiceImpl : process order, orderId=20ae8a516b9545c58a285fbb9684197c
2023-07-08 22:41:33.740 INFO 10584 --- [nio-8081-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2023-07-08 22:41:33.780 INFO 10584 --- [nio-8081-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#1706e1:0/SimpleConnection@1876e75 [delegate=amqp://mall@127.0.0.1:5672//mall, localPort= 65176]
2023-07-08 22:41:33.897 INFO 10584 --- [nio-8081-exec-1] c.h.r.m.p.c.CancelOrderSender : send delay message orderId=20ae8a516b9545c58a285fbb9684197c
After about 30 seconds, the consumer console shows the listener on the order-cancel queue handling the cancellation:
2023-07-08 22:40:08.827 INFO 17884 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8082 (http) with context path '/messge-consumer'
2023-07-08 22:40:08.830 INFO 17884 --- [ main] c.h.r.m.c.MessageConsumerApplication : Started MessageConsumerApplication in 3.444 seconds (JVM running for 6.797)
2023-07-08 22:42:04.196 INFO 17884 --- [ntContainer#0-1] c.h.r.m.c.rabbitmq.CancelOrderReceiver : cancel order, orderId=20ae8a516b9545c58a285fbb9684197c
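This verifies that the delayed message queue implemented with TTL + DLX delivers the cancellation message once the expiration time has passed, which is what cancelling unpaid orders on timeout requires.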
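The observed delay can be checked from the two log timestamps: the producer logged the send at 22:41:33.897 and the consumer logged the receipt at 22:42:04.196. The sketch below computes the difference; the class ObservedDelaySketch is a hypothetical name, and the timestamp pattern is an assumption matching the log format shown in this article.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measures the time between two log timestamps.
final class ObservedDelaySketch {
    // Matches timestamps such as "2023-07-08 22:41:33.897".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private ObservedDelaySketch() {
    }

    static long millisBetween(String sent, String received) {
        LocalDateTime start = LocalDateTime.parse(sent, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(received, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // prints 30299
        System.out.println(millisBetween("2023-07-08 22:41:33.897", "2023-07-08 22:42:04.196"));
    }
}
```

The result is 30299 ms, about 0.3 s more than the configured 30 s TTL, which is consistent with the message waiting out its full TTL and then being routed to the consumer.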