在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
消费者工程:
创建生产者工程 springboot-rabbitmq-producer
修改pom.xml文件内容为如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- springboot 父工程 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- 工程坐标 -->
<groupId>com.lijw</groupId>
<artifactId>springboot-rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 工程信息 -->
<name>springboot-rabbitmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<!-- 工程依赖 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rabbitmq起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.lijw.springbootrabbitmqproducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqProducerApplication.class, args);
}
}
创建application.yml,内容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
创建RabbitMQ队列与交换机绑定的配置类com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig
package com.lijw.springbootrabbitmqproducer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
// 队列名称
public static final String ITEM_QUEUE = "item_queue";
// 声明交换机
@Bean("itemTopicExchange")
public Exchange topicExchange(){
/**
* ExchangeBuilder
* - topicExchange(ITEM_TOPIC_EXCHANGE) 设置 topic 模式的交换机名称
* - durable(true) 设置持久化
* - build() 构建返回 Exchange 对象
*/
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
// 声明队列
@Bean("itemQueue")
public Queue itemQueue(){
/**
* QueueBuilder
* - .durable(ITEM_QUEUE) 设置队列名称以及持久化
* - .build() 构建返回 Queue 对象
*/
return QueueBuilder.durable(ITEM_QUEUE).build();
}
// 绑定队列与交换机
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
/**
* BindingBuilder
* - .bind(queue) 绑定队列
* - .to(exchange) 绑定交换机
* - with("item.#") 设置 Routing Key
* - .noargs() 设置无参数
*/
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:
package com.lijw.springbootrabbitmqproducer;
import com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// 生产者发送消息
@Test
public void testSend() {
/**
* 1. 参数一: 交换机名称
* 2. 参数二: routingKey
* 3. 参数三: 发送的消息
*/
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
}
}
运行上述测试程序(交换机和队列才能先被声明和绑定),执行如下:
执行完毕后, 可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:
在上面我们已经确认了消息写入了队列,下面我们来编写消费者工程进行消费。
创建消费者工程 springboot-rabbitmq-consumer
修改pom.xml文件内容为如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- springboot 父工程 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- 工程坐标 -->
<groupId>com.lijw</groupId>
<artifactId>springboot-rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 工程信息 -->
<name>springboot-rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<!-- 工程依赖 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rabbitmq的起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.lijw.springbootrabbitmqconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootRabbitmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqConsumerApplication.class, args);
}
}
创建application.yml,内容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
编写消息监听器 com.lijw.springbootrabbitmqconsumer.listener.MyListener
package com.lijw.springbootrabbitmqconsumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "item_queue")
public void myListener1(String message){
System.out.println("消费者接收到的消息: " + message);
}
}
启动消费者工程,查看接收到的消息:
我们可以在生产者工程多发送几次消息看看,如下:
到这里,我们已经确认消费者能够正常接收消息了。