操作场景
本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
下载 Demo
操作步骤
步骤1:添加依赖
在 pom.xml 中添加依赖。
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
步骤2:准备配置
1. 在配置文件中加入 RabbitMQ 配置信息(以 yaml 配置为例)。
spring:rabbitmq:# host地址可在控制台中获取 或 使用Rabbitmq服务地址host: 1.14.235.155port: 5672# 要使用的角色名称 可在角色管理控制台获取username: admin# 角色密钥password: eyJrZXlJZ....# Vhost全称,可在集群控制台Vhost tab页获取virtual-host: test_vhost
参数 | 说明 |
host | 集群接入地址,在集群基本信息页面的客户端接入模块获取。 |
port | 集群接入地址端口,在集群基本信息页面的客户端接入模块获取。 |
username | 用户名称,填写在控制台创建的用户名称。 |
password | 用户密码,填写在控制台创建用户时填写的密码。 |
virtual-host | Vhost 名称,在控制台 Vhost 列表获取。 |
2. 创建配置文件加载程序(以
Fanout交换机
为例)。说明
/*** Fanout交换机配置*/@Configurationpublic class FanoutRabbitConfig {/*** 交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout-logs", true, false);}/*** 消息队列*/@Beanpublic Queue fanoutQueueA() {return new Queue("ps_queue", true);}@Beanpublic Queue fanoutQueueB() {// 可通过这种方式绑定死信队列Map<String, Object> requestParam = new HashMap<>();requestParam.put("x-dead-letter-exchange", "my-deadLetter-exchange");// 设置消息过期时长requestParam.put("x-message-ttl", 60000);return new Queue("ps_queue1", true, false,true, requestParam);}/*** 绑定消息队列与交换机*/@Beanpublic Binding bindingFanoutA() {return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());}@Beanpublic Binding bindingFanoutB() {return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());}}
参数 | 说明 |
fanout-logs | 绑定的 Exchange 名称,在控制台 Exchange 列表获取。 |
ps_queue | 与 Exchange 绑定的第一个 Queue名称,在控制台 Queue 列表获取。 |
my-deadLetter-exchange | 死信 Exchange 名称,在控制台 Exchange 列表获取。 |
ps_queue1 | 与 Exchange 绑定的第二个 Queue 名称,在控制台 Queue 列表获取。 |
步骤3:发送消息
创建并编译消息发送程序 DemoApplication.java,使用 RabbitTemplate 发送即可。
@Autowiredprivate RabbitTemplate rabbitTemplate;public String send() {String msg = "This is a new message.";// 发送消息// 参数说明:参数1:交换机名称,在控制台 Exchange 列表获取。 参数2:routing key 参数3:消息内容rabbitTemplate.convertAndSend("direct_logs", "", msg);return "success";}
步骤4:消费消息
创建并编译消息接收程序 FanoutReceiver.java。(以
Fanout交换机
为例。)@Componentpublic class FanoutReceiver {// 注册一个listener监听指定消息队列@RabbitHandler@RabbitListener(queues = "test_queue") //与 Exchange 绑定的 Queue名称,在控制台 Queue 列表获取。public void listenerPsQueue(String msg) {// 业务处理...System.out.println("(ps_queue) receiver message. [" + msg + "]");}}
步骤5:查看消息
说明