在以前,我写过一篇如何使用SpringBoot
整合rabbitMq
的文章。
SpringBoot整合rabbitMq | 半月无霜 (banmoon.top)
上面这种方法,是自己创建队列,交换机,绑定。生成Bean
,从而实现队列等等的创建。
这种方式太过于繁琐,有没有一种方法可以快速创建呢,我们只管使用就行了
还真的有,只需要在配置文件中配置队列、交换机等信息,就可以在服务启动的时候自动创建并绑定。
一次偶然间,在csdn
上看到了,动态创建rabbitMq
队列的文章。
拉出来魔改了一下,只要在配置文件中配置了相关信息,就能实现队列、交换机的创建与绑定。
同时还解决了,多个开发连接同一个rabbitMq
,导致自己生产的消息,被其他同事消费走的问题。
这是RabbitModuleInfoProperties.java
,读取配置文件中的信息,生成信息对象
package com.banmoon.config.properties;
import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* Configuration model for one RabbitMQ module: a routing key, one or more queues,
* and the exchange those queues are bound to.
*/
@Data
public class RabbitModuleInfoProperties {
/**
* Routing key
*/
private String routingKey;
/**
* Single queue definition
*/
private Queue queue;
/**
* Multiple queue definitions
*/
private List<Queue> queues;
/**
* Exchange definition
*/
private Exchange exchange;
/**
* Settings for a single queue.
*/
@Data
public static class Queue {
/**
* Queue name
*/
private String name;
/**
* Whether the queue is durable; defaults to true so messages are not lost on restart
*/
private boolean durable = true;
/**
* Whether the queue is exclusive; defaults to false so several consumers can consume the same queue
*/
private boolean exclusive = false;
/**
* Whether the queue is deleted automatically once all consumers disconnect; defaults to false
* so that messages are not discarded when consumers disconnect
*/
private boolean autoDelete = false;
/**
* Name of the dead-letter queue to bind
*/
private String deadLetterQueue;
/**
* Name of the dead-letter exchange to bind
*/
private String deadLetterExchange;
/**
* Routing key used for the dead-letter binding
*/
private String deadLetterRoutingKey;
/**
* Additional queue arguments
*/
private Map<String, Object> arguments;
}
/**
* Settings for the exchange.
*/
@Data
public static class Exchange {
/**
* Exchange type; defaults to the direct exchange
*/
private String type = RabbitExchangeTypeEnum.DIRECT.getCode();
/**
* Exchange name
*/
private String name;
/**
* Whether the exchange is durable; defaults to true
*/
private boolean durable = true;
/**
* Whether the exchange is deleted automatically once no bound queue is in use any more
*/
private boolean autoDelete = false;
/**
* Whether this is a "txl" delayed exchange
*/
private boolean txlDelay = false;
/**
* Additional exchange arguments
*/
private Map<String, Object> arguments;
/**
* Header values used when binding to a headers exchange
*/
private Map<String, Object> headersMap;
/**
* Match mode for the headers-exchange binding; defaults to "all headers must match"
*/
private String headersType = HeadersTypeEnum.ALL.getCode();
}
}
这是RabbitModuleProperties.java
,上面有多个绑定配置
package com.banmoon.config.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* Holds the list of module definitions bound from the "spring.rabbitmq" configuration prefix
* (i.e. the "spring.rabbitmq.modules" entries).
*/
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {
// One entry per routing key / queue(s) / exchange combination.
private List<RabbitModuleInfoProperties> modules;
}
这个是配置,请注意交换机,队列前缀,这个就是保证不同开发之间消息隔离的关键
spring:
  rabbitmq:
    host: rabbitMq服务地址
    port: rabbitMq服务端口
    username: 帐号
    password: 密码
    virtual-host: /
    # Confirm that a message reached the exchange
    publisher-confirm-type: correlated
    # NOTE(review): legacy key; newer Spring Boot versions presumably only read
    # publisher-confirm-type above - confirm against the Boot version in use.
    publisher-confirms: true
    # Confirm that a message reached a queue
    publisher-returns: true
    # Prefix applied to exchange, queue and routing-key names (keeps developers isolated)
    prefix: whc
    modules:
      # Direct exchange
      - routingKey: test.direct.routingKey
        queue:
          name: test.direct.queue
        exchange:
          name: test.direct.exchange
      # Fanout exchange; routing key kept identical to RabbitmqConstant.FANOUT_TEST_ROUTER_KEY
      - routingKey: test.fanout.routerKey
        queues:
          - name: test.fanout.queue.a
          - name: test.fanout.queue.b
          - name: test.fanout.queue.c
        exchange:
          name: test.fanout.exchange
          type: fanout
      # Topic exchange
      - routingKey: test.topic.routerKey.#
        queue:
          name: test.topic.queue.log
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.text
        queue:
          name: test.topic.queue.text
        exchange:
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.image
        queue:
          name: test.topic.queue.image
        exchange:
          name: test.topic.exchange
          type: topic
      # Headers exchange
      - routingKey: test.headers.routerKey
        queue:
          name: test.headers.queue
        exchange:
          name: test.headers.exchange
          type: headers
          headers-map:
            authentication: "半月无霜"
      # Queue-level TTL with a dead-letter queue
      - routingKey: test.ttl.routerKey
        queue:
          name: test.ttl.queue
          deadLetterQueue: test.ttl.death.queue
          deadLetterExchange: test.ttl.death.exchange
          deadLetterRoutingKey: test.ttl.death.routerKey
          arguments:
            x-message-ttl: 5000
        exchange:
          name: test.ttl.exchange
      # Delayed ("txl") exchange
      - routingKey: test.txl.routerKey
        queue:
          name: test.txl.queue
        exchange:
          name: test.txl.exchange
          txl-delay: true
RabbitmqConfig.java
,这是一个配置类,主要得到了AmqpAdmin
对象、RabbitModuleProperties
对象、以及定义的前缀
package com.banmoon.config;
import com.banmoon.config.init.RabbitModuleInitializer;
import com.banmoon.config.properties.RabbitModuleProperties;
import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Spring configuration that registers the {@link RabbitModuleInitializer} bean.
*/
@Slf4j
@Configuration
public class RabbitmqConfig {
// Name prefix, injected from the RabbitmqConstant.RABBITMQ_PREFIX expression.
@Value(RabbitmqConstant.RABBITMQ_PREFIX)
private String rabbitPrefix;
/**
* Creates the initializer from the admin, the injected prefix and the configured modules.
* Only registered when no other bean of this type exists.
*/
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
return new RabbitModuleInitializer(amqpAdmin, rabbitPrefix, rabbitModuleProperties.getModules());
}
}
RabbitModuleInitializer.java
,初始化类,主要声明队列、交换机,以及绑定都在其中
package com.banmoon.config.init;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.config.properties.RabbitModuleInfoProperties;
import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import com.banmoon.utils.stream.StreamUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Declares the queues, exchanges and bindings described by the configured
 * {@link RabbitModuleInfoProperties} modules once all singletons have been instantiated.
 *
 * <p>Every queue, exchange, routing-key and dead-letter name is prefixed with
 * {@code rabbitmqPrefix} before it is declared.
 */
@Slf4j
@AllArgsConstructor
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    /** Admin used to declare queues, exchanges and bindings. */
    private final AmqpAdmin amqpAdmin;

    /** Prefix prepended to every declared name and routing key. */
    private final String rabbitmqPrefix;

    /** Module definitions to declare; may be {@code null} or empty. */
    private final List<RabbitModuleInfoProperties> modules;

    @Override
    public void afterSingletonsInstantiated() {
        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
        declareRabbitModule();
    }

    /**
     * Creates and binds the queues and exchanges of every configured module.
     */
    private void declareRabbitModule() {
        if (CollUtil.isEmpty(modules)) {
            return;
        }
        for (RabbitModuleInfoProperties rabbitModuleInfo : modules) {
            // Validate the configuration first
            configParamValidate(rabbitModuleInfo);
            // Queues (this also declares any configured dead-letter queue/exchange/binding)
            List<Queue> queues = convertQueue(rabbitModuleInfo.getQueues(), rabbitModuleInfo.getQueue());
            // Exchange
            RabbitModuleInfoProperties.Exchange exchangeInfo = rabbitModuleInfo.getExchange();
            Exchange exchange = convertExchange(exchangeInfo);
            // Routing key used for the bindings
            String routingKey = rabbitmqPrefix + rabbitModuleInfo.getRoutingKey();
            // Declare queues, then the exchange, then bind each queue to the exchange
            queues.forEach(amqpAdmin::declareQueue);
            amqpAdmin.declareExchange(exchange);
            for (Queue queue : queues) {
                amqpAdmin.declareBinding(buildBinding(queue, exchange, exchangeInfo, routingKey));
            }
        }
    }

    /**
     * Builds the binding between a queue and the module's exchange: a header-match binding when
     * the declared exchange type is "headers", a routing-key binding otherwise.
     */
    private Binding buildBinding(Queue queue, Exchange exchange,
                                 RabbitModuleInfoProperties.Exchange exchangeInfo, String routingKey) {
        if (!RabbitExchangeTypeEnum.HEADERS.getCode().equals(exchange.getType())) {
            return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
        }
        HeadersExchange headersExchange = (HeadersExchange) exchange;
        Map<String, Object> headers = exchangeInfo.getHeadersMap();
        if (HeadersTypeEnum.ALL.getCode().equals(exchangeInfo.getHeadersType())) {
            return BindingBuilder.bind(queue).to(headersExchange).whereAll(headers).match();
        }
        return BindingBuilder.bind(queue).to(headersExchange).whereAny(headers).match();
    }

    /**
     * Validates one module definition: routing key, exchange, exchange name and at least one
     * queue must be configured.
     *
     * @param rabbitModuleInfo module definition to check
     * @throws IllegalArgumentException if a required setting is missing
     */
    public void configParamValidate(RabbitModuleInfoProperties rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();
        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");
        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
        Assert.isTrue(Objects.nonNull(rabbitModuleInfo.getQueue()) || CollUtil.isNotEmpty(rabbitModuleInfo.getQueues()), "routingKey:{}未配置queue", routingKey);
    }

    /**
     * Converts the configured queue definitions of one module into RabbitMQ queues.
     *
     * @param queues    the "queues" list of the module, may be {@code null} or empty
     * @param queueInfo the single "queue" of the module, may be {@code null} when {@code queues} is set
     * @return the converted queues; when both are configured the single queue comes last
     */
    public List<Queue> convertQueue(List<RabbitModuleInfoProperties.Queue> queues, RabbitModuleInfoProperties.Queue queueInfo) {
        if (CollUtil.isEmpty(queues)) {
            return CollUtil.newArrayList(convertQueue(queueInfo));
        }
        // Work on a copy: the list belongs to the configuration properties object and must not
        // grow (or fail, if it is unmodifiable) as a side effect of the conversion.
        List<RabbitModuleInfoProperties.Queue> allQueues = CollUtil.newArrayList(queues);
        if (Objects.nonNull(queueInfo)) {
            allQueues.add(queueInfo);
        }
        return StreamUtil.listToList(allQueues, this::convertQueue);
    }

    /**
     * Converts one queue definition into a RabbitMQ queue. When dead-letter queue, exchange and
     * routing key are all configured, the dead-letter queue, exchange and binding are declared
     * here as well and the returned queue is pointed at them.
     *
     * @param queue queue definition
     * @return the queue to declare, with the prefix applied to its name
     */
    public Queue convertQueue(RabbitModuleInfoProperties.Queue queue) {
        String name = rabbitmqPrefix + queue.getName();
        // Copy the arguments so the configuration properties object is never mutated
        Map<String, Object> arguments = queue.getArguments() == null ? null : new HashMap<>(queue.getArguments());
        // The TTL and the max priority are converted to long
        convertArgumentToLong(arguments, "x-message-ttl");
        convertArgumentToLong(arguments, "x-max-priority");
        // Dead-letter wiring is only applied when all three settings are present
        String deadLetterQueue = queue.getDeadLetterQueue();
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterQueue) && StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            if (arguments == null) {
                arguments = new HashMap<>();
            }
            deadLetterQueue = rabbitmqPrefix + deadLetterQueue;
            deadLetterExchange = rabbitmqPrefix + deadLetterExchange;
            deadLetterRoutingKey = rabbitmqPrefix + deadLetterRoutingKey;
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            declareDeadLetter(queue, deadLetterQueue, deadLetterExchange, deadLetterRoutingKey);
        }
        return new Queue(name, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }

    /**
     * Replaces the value stored under {@code key} with its long representation, if present.
     */
    private void convertArgumentToLong(Map<String, Object> arguments, String key) {
        if (arguments != null && arguments.containsKey(key)) {
            arguments.put(key, Convert.toLong(arguments.get(key)));
        }
    }

    /**
     * Declares the dead-letter queue, its direct exchange and the binding between them.
     *
     * <p>The dead-letter queue is declared WITHOUT the source queue's arguments. Reusing them
     * would give the dead-letter queue the same {@code x-message-ttl} and
     * {@code x-dead-letter-*} settings, so dead-lettered messages would expire again and be
     * routed back to the very same queue.
     *
     * <p>NOTE(review): a broker that already holds this queue with the old arguments will
     * presumably reject the redeclaration; delete the old queue first - confirm per environment.
     */
    private void declareDeadLetter(RabbitModuleInfoProperties.Queue queue, String deadLetterQueue,
                                   String deadLetterExchange, String deadLetterRoutingKey) {
        Queue deadQueue = new Queue(deadLetterQueue, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete());
        amqpAdmin.declareQueue(deadQueue);
        Exchange deadExchange = new DirectExchange(deadLetterExchange, true, true, null);
        amqpAdmin.declareExchange(deadExchange);
        Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange).with(deadLetterRoutingKey).noargs();
        amqpAdmin.declareBinding(binding);
    }

    /**
     * Converts the exchange definition into a RabbitMQ exchange, applying the name prefix.
     * A delayed exchange is created when {@code txlDelay} is set.
     *
     * @param exchange exchange definition
     * @return the exchange to declare
     */
    public Exchange convertExchange(RabbitModuleInfoProperties.Exchange exchange) {
        String type = exchange.getType();
        boolean txlDelay = exchange.isTxlDelay();
        String exchangeName = rabbitmqPrefix + exchange.getName();
        boolean isDurable = exchange.isDurable();
        boolean isAutoDelete = exchange.isAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        if (txlDelay) {
            return RabbitExchangeTypeEnum.getTxlDelayExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
        }
        return RabbitExchangeTypeEnum.getExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
    }
}
这是一个常量类,里面记录着相关的队列名称,主要是给生产者、消费者使用的。太杂乱了不好打理,故专门弄了一个常量类来进行管理
package com.banmoon.constant;
/**
* Names of the RabbitMQ queues, exchanges and routing keys.
*
* @author banmoon
* @date 2024/02/27 12:22:13
*/
public interface RabbitmqConstant {
/**
* Expression for the configured prefix: empty when "spring.rabbitmq.prefix" is empty or unset,
* otherwise the prefix followed by a dot
*/
String RABBITMQ_PREFIX = "#{'${spring.rabbitmq.prefix:}'.empty ? '' : '${spring.rabbitmq.prefix:}' + '.'}";
/**
* Direct-exchange test queue
*/
String DIRECT_TEST_QUEUE = RABBITMQ_PREFIX + "test.direct.queue";
String DIRECT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.direct.exchange";
String DIRECT_TEST_ROUTING_KEY = RABBITMQ_PREFIX + "test.direct.routingKey";
/**
* Fanout-exchange test queues
*/
String FANOUT_TEST_QUEUE_A = RABBITMQ_PREFIX + "test.fanout.queue.a";
String FANOUT_TEST_QUEUE_B = RABBITMQ_PREFIX + "test.fanout.queue.b";
String FANOUT_TEST_QUEUE_C = RABBITMQ_PREFIX + "test.fanout.queue.c";
String FANOUT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.fanout.exchange";
String FANOUT_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.fanout.routerKey";
/**
* Topic-exchange test queues
*/
String TOPIC_TEST_QUEUE_LOG = RABBITMQ_PREFIX + "test.topic.queue.log";
String TOPIC_TEST_QUEUE_TEXT = RABBITMQ_PREFIX + "test.topic.queue.text";
String TOPIC_TEST_QUEUE_IMAGE = RABBITMQ_PREFIX + "test.topic.queue.image";
String TOPIC_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.topic.exchange";
String TOPIC_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.topic.routerKey.#";
String TOPIC_TEST_ROUTER_KEY_TEXT = RABBITMQ_PREFIX + "test.topic.routerKey.text";
String TOPIC_TEST_ROUTER_KEY_IMAGE = RABBITMQ_PREFIX + "test.topic.routerKey.image";
/**
* Headers-exchange test queue
*/
String HEADERS_TEST_QUEUE = RABBITMQ_PREFIX + "test.headers.queue";
String HEADERS_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.headers.exchange";
String HEADERS_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.headers.routerKey";
/**
* TTL test queue and its dead-letter queue
*/
String TTL_TEST_QUEUE = RABBITMQ_PREFIX + "test.ttl.queue";
String TTL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.exchange";
String TTL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.routerKey";
String TTL_TEST_DEATH_QUEUE = RABBITMQ_PREFIX + "test.ttl.death.queue";
String TTL_TEST_DEATH_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.death.exchange";
String TTL_TEST_DEATH_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.death.routerKey";
/**
* TXL (delayed exchange) test queue
*/
String TXL_TEST_QUEUE = RABBITMQ_PREFIX + "test.txl.queue";
String TXL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.txl.exchange";
String TXL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.txl.routerKey";
}
在上面的创建中,我们用到了两个枚举类,没什么可说的,直接贴出来
package com.banmoon.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Match mode used when binding a queue to a headers exchange.
*
* @author banmoon
* @date 2024/03/04 16:35:27
*/
@Getter
@AllArgsConstructor
public enum HeadersTypeEnum {
// Any one of the configured headers has to match
ANY("any", "任一"),
// All configured headers have to match
ALL("all", "所有"),
;
// Code used in the configuration
private final String code;
// Human-readable description
private final String msg;
}
package com.banmoon.enums;
import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.amqp.core.*;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
/**
 * Supported RabbitMQ exchange types, plus factories that build the matching exchange object.
 */
@Getter
@AllArgsConstructor
public enum RabbitExchangeTypeEnum {
    DIRECT("direct", "直连交换机"),
    FANOUT("fanout", "扇形交换机"),
    TOPIC("topic", "主题交换机"),
    HEADERS("headers", "头部交换机"),
    ;

    /** Type code as used in the configuration. */
    private final String code;

    /** Human-readable description. */
    private final String msg;

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code type code
     * @return the matching type, or {@code null} when the code is unknown
     */
    public static RabbitExchangeTypeEnum getByCode(String code) {
        return getByCode(code, null);
    }

    /**
     * Looks up a type by its code, ignoring case.
     *
     * @param code        type code
     * @param defaultEnum value returned when the code is unknown
     * @return the matching type, or {@code defaultEnum}
     */
    public static RabbitExchangeTypeEnum getByCode(String code, RabbitExchangeTypeEnum defaultEnum) {
        return Arrays.stream(values()).filter(e -> e.getCode().equalsIgnoreCase(code)).findFirst().orElse(defaultEnum);
    }

    /**
     * Builds a regular exchange of the given type.
     *
     * @param type         exchange type code (case-insensitive)
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be {@code null}
     * @return the exchange
     * @throws IllegalArgumentException if {@code type} is not a supported exchange type
     */
    public static Exchange getExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        switch (requireByCode(type)) {
            case DIRECT:
                return new DirectExchange(exchangeName, durable, autoDelete, arguments);
            case TOPIC:
                return new TopicExchange(exchangeName, durable, autoDelete, arguments);
            case FANOUT:
                return new FanoutExchange(exchangeName, durable, autoDelete, arguments);
            case HEADERS:
                return new HeadersExchange(exchangeName, durable, autoDelete, arguments);
            default:
                // Only reachable if a constant is added without extending this switch
                throw new IllegalStateException("Unhandled RabbitMQ exchange type: " + type);
        }
    }

    /**
     * Builds an "x-delayed-message" exchange whose "x-delayed-type" argument is the given type.
     * The passed-in {@code arguments} map is copied, never modified.
     *
     * @param type         exchange type code (case-insensitive)
     * @param exchangeName exchange name
     * @param durable      whether the exchange is durable
     * @param autoDelete   whether the exchange is auto-deleted
     * @param arguments    additional exchange arguments, may be {@code null}
     * @return the delayed exchange
     * @throws IllegalArgumentException if {@code type} is not a supported exchange type
     */
    public static Exchange getTxlDelayExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        RabbitExchangeTypeEnum typeEnum = requireByCode(type);
        // Copy instead of writing into the caller's map, which is the configuration object's own map
        Map<String, Object> argMap = MapUtil.newHashMap(2);
        if (arguments != null) {
            argMap.putAll(arguments);
        }
        argMap.put("x-delayed-type", typeEnum.getCode());
        return new CustomExchange(exchangeName, "x-delayed-message", durable, autoDelete, argMap);
    }

    /**
     * Resolves a type code, failing with a descriptive message instead of a bare
     * NullPointerException when the code is unknown.
     */
    private static RabbitExchangeTypeEnum requireByCode(String code) {
        RabbitExchangeTypeEnum typeEnum = getByCode(code);
        if (typeEnum == null) {
            throw new IllegalArgumentException("Unsupported RabbitMQ exchange type: " + code);
        }
        return typeEnum;
    }
}
这是一个生产者抽象类,我自己写的生产者都需要继承它
package com.banmoon.queues;
import cn.hutool.extra.spring.SpringUtil;
import com.banmoon.utils.JsonUtil;
import lombok.Data;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import java.util.HashMap;
import java.util.Map;
/**
 * Base class for message producers: serializes the payload to JSON and sends it to the
 * configured exchange with the configured routing key.
 *
 * @author banmoon
 * @date 2024/02/28 11:44:28
 */
@Data
public abstract class AbstractProducer {

    private AmqpTemplate amqpTemplate;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    /**
     * Creates a producer whose queue, exchange and routing-key names are looked up through
     * {@code SpringUtil.getProperty}, i.e. the three arguments are used as property keys.
     *
     * @param amqpTemplate template used for sending
     * @param queueName    property key of the queue name
     * @param exchangeName property key of the exchange name
     * @param routingKey   property key of the routing key
     */
    public AbstractProducer(AmqpTemplate amqpTemplate, String queueName, String exchangeName, String routingKey) {
        this.amqpTemplate = amqpTemplate;
        // TODO: 2024/3/2 this lookup still needs to be reworked
        this.queueName = SpringUtil.getProperty(queueName);
        this.exchangeName = SpringUtil.getProperty(exchangeName);
        this.routingKey = SpringUtil.getProperty(routingKey);
    }

    /**
     * Creates a producer whose names are supplied later through the setters.
     *
     * @param amqpTemplate template used for sending
     */
    public AbstractProducer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    /**
     * Sends the payload as JSON.
     *
     * @param obj payload
     */
    public void send(Object obj) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

    /**
     * Sends the payload as JSON with a per-message TTL.
     *
     * <p>The TTL is set through the message's expiration property. An "x-message-ttl" message
     * header, as sent previously, is not interpreted by the broker ("x-message-ttl" is a queue
     * argument), so the message never expired on its own.
     *
     * @param obj              payload
     * @param delayMillisecond time to live in milliseconds, must not be {@code null} or negative
     * @throws IllegalArgumentException if {@code delayMillisecond} is {@code null} or negative
     */
    public void sendTtlMesssage(Object obj, Integer delayMillisecond) {
        if (delayMillisecond == null || delayMillisecond < 0) {
            throw new IllegalArgumentException("delayMillisecond must be a non-negative number, but was: " + delayMillisecond);
        }
        String expiration = delayMillisecond.toString();
        send(obj, message -> {
            message.getMessageProperties().setExpiration(expiration);
            return message;
        });
    }

    /**
     * Sends the payload as JSON with the delay property set, for use with a delayed exchange.
     *
     * @param obj              payload
     * @param delayMillisecond delay in milliseconds
     */
    public void sendTxlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setDelay(delayMillisecond);
            return message;
        });
    }

    /**
     * Sends the payload as JSON together with the given message headers.
     *
     * @param obj     payload
     * @param headers message headers; {@code null} sends the message without extra headers
     */
    public void send(Object obj, Map<String, Object> headers) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, message -> {
            if (headers != null) {
                message.getMessageProperties().setHeaders(headers);
            }
            return message;
        });
    }

    /**
     * Sends the payload as JSON, letting the post-processor adjust the message before it is sent.
     *
     * @param obj                  payload
     * @param messagePostProcessor hook applied to the message before sending
     */
    public void send(Object obj, MessagePostProcessor messagePostProcessor) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
    }
}
比如说直连交换机队列的生产者
package com.banmoon.queues.producer;
import com.banmoon.constant.RabbitmqConstant;
import com.banmoon.queues.AbstractProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Producer for the direct-exchange test queue.
*
* @author banmoon
* @date 2024/02/28 16:39:13
*/
@Slf4j
@Component
public class TestDirectProducer extends AbstractProducer {
public TestDirectProducer(AmqpTemplate amqpTemplate) {
super(amqpTemplate);
}
// The setters are overridden only to carry the @Value annotations that supply the names.
@Override
@Value(RabbitmqConstant.DIRECT_TEST_QUEUE)
public void setQueueName(String queueName) {
super.setQueueName(queueName);
}
@Override
@Value(RabbitmqConstant.DIRECT_TEST_EXCHANGE)
public void setExchangeName(String exchangeName) {
super.setExchangeName(exchangeName);
}
@Override
@Value(RabbitmqConstant.DIRECT_TEST_ROUTING_KEY)
public void setRoutingKey(String routingKey) {
super.setRoutingKey(routingKey);
}
}
package com.banmoon.queues.consumer;
import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Consumer for the direct-exchange test queue.
*
* @author banmoon
* @date 2024/02/27 12:08:12
*/
@Slf4j
@Component
public class TestDirectConsumer {
/**
* Listens on the direct test queue and logs every received message.
*/
@RabbitListener(queues = RabbitmqConstant.DIRECT_TEST_QUEUE)
public void test(String message) {
log.info("直连测试队列消费者:{}", message);
}
}
关于上面几种交换机类型,以及TTL
死信队列、TXL
延迟队列都有做了配置示例。
主要是没有生产者、消费者的代码示例,相信大家都知道怎么写了。
那个,关于生产者的那个抽象类
AbstractProducer.java
有一个地方一直没有调通,就是如何通过spel
表达式获取配置文件中的配置信息。只能退而求其次,使用@Value
注解来进行获取了。 相信注解能获取的,一定有注解解析器,这边也一定可以的。 又要看源码喽! 还有那个开发环境队列隔离问题 有些公司开发使用的是同一个配置文件,这样会导致前缀都是同一个,那样设置前缀就没有意义了。 其实可以这样,如果是使用nacos
的远端配置的,可以创建自己的命名空间,修改前缀。 如果是在本地resources
文件夹里面,可以使用maven
编译后替换变量的那个功能。 如何读取到maven中profile设置的参数 | 半月无霜 (banmoon.top) 上面两种方法,都是可以实现的
我是半月,你我一同共勉!!!