1、ActiveMQ是Apache提供的开源组件,是基于JMS标准的实现组件。利用SpringBoot整合ActiveMQ组件,实现队列消息的发送与接收。修改pom.xml配置文件,追加spring-boot-starter-activemq依赖库。
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5 https://maven.apache.org/xsd/maven-4.0.0.xsd">
6 <modelVersion>4.0.0</modelVersion>
7 <parent>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-parent</artifactId>
10 <version>2.3.5.RELEASE</version>
11 <relativePath /> <!-- lookup parent from repository -->
12 </parent>
13 <groupId>com.example</groupId>
14 <artifactId>demo</artifactId>
15 <version>0.0.1-SNAPSHOT</version>
16 <name>demo</name>
17 <description>Demo project for Spring Boot</description>
18
19 <properties>
20 <java.version>1.8</java.version>
21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22 </properties>
23
24 <dependencies>
25 <dependency>
26 <groupId>org.springframework.boot</groupId>
27 <artifactId>spring-boot-starter-web</artifactId>
28 </dependency>
29
30 <dependency>
31 <groupId>org.springframework.boot</groupId>
32 <artifactId>spring-boot-starter-test</artifactId>
33 <scope>test</scope>
34 <exclusions>
35 <exclusion>
36 <groupId>org.junit.vintage</groupId>
37 <artifactId>junit-vintage-engine</artifactId>
38 </exclusion>
39 </exclusions>
40 </dependency>
41
42 <!-- mysql驱动包 -->
43 <dependency>
44 <groupId>mysql</groupId>
45 <artifactId>mysql-connector-java</artifactId>
46 </dependency>
47
48 <!-- druid连接池 -->
49 <dependency>
50 <groupId>com.alibaba</groupId>
51 <artifactId>druid</artifactId>
52 <version>1.1.10</version>
53 </dependency>
54
55 <dependency>
56 <groupId>org.springframework.boot</groupId>
57 <artifactId>spring-boot-starter-data-jpa</artifactId>
58 </dependency>
59 <dependency>
60 <groupId>org.springframework.boot</groupId>
61 <artifactId>spring-boot-starter-cache</artifactId>
62 </dependency>
63 <dependency>
64 <groupId>org.hibernate</groupId>
65 <artifactId>hibernate-ehcache</artifactId>
66 </dependency>
67
68 <!-- activeMQ -->
69 <dependency>
70 <groupId>org.springframework.boot</groupId>
71 <artifactId>spring-boot-starter-activemq</artifactId>
72 </dependency>
73 </dependencies>
74
75 <build>
76 <plugins>
77 <plugin>
78 <groupId>org.springframework.boot</groupId>
79 <artifactId>spring-boot-maven-plugin</artifactId>
80 </plugin>
81 </plugins>
82 <resources>
83 <resource>
84 <directory>src/main/resources</directory>
85 <includes>
86 <include>**/*.properties</include>
87 <include>**/*.yml</include>
88 <include>**/*.xml</include>
89 <include>**/*.p12</include>
90 <include>**/*.html</include>
91 <include>**/*.jpg</include>
92 <include>**/*.png</include>
93 </includes>
94 </resource>
95 </resources>
96 </build>
97
98 </project>
修改application.properties配置文件(下面的内容为properties格式;如果使用application.yml,需要改写为YAML的层级结构),进行ActiveMQ的配置,如下所示:
1 # 配置消息类型,true表示为topic消息,false表示Queue消息
2 spring.jms.pub-sub-domain=false
3 # 连接的用户名
4 spring.activemq.user=admin
5 # 密码
6 spring.activemq.password=admin
7 # 消息组件的连接主机信息
8 spring.activemq.broker-url=tcp://192.168.110.142:61616
定义消息消费监听类,如下所示:
1 package com.demo.consumer;
2
3 import org.springframework.jms.annotation.JmsListener;
4 import org.springframework.stereotype.Service;
5
6 @Service
7 public class MessageConsumer {
8
9 /**
10 *
11 * @param text
12 */
13 @JmsListener(destination = "msg.queue") // 定义消息监听队列
14 public void receiveMessage(String text) {
15 // 进行消息接受处理
16 System.err.println("【*** 接受消息 ***】" + text);
17 }
18 }
定义消息生产者业务类,如下所示:
1 package com.demo.producer;
2
3 import javax.jms.Queue;
4
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.jms.core.JmsMessagingTemplate;
7 import org.springframework.stereotype.Service;
8
9 /**
10 *
11 * @author 消息发送
12 *
13 */
14 @Service
15 public class MessageProducer {
16
17 // 消息发送模板
18 @Autowired
19 private JmsMessagingTemplate jmsMessagingTemplate;
20
21 // 注入队列
22 @Autowired
23 private Queue queue;
24
25 /**
26 * 发送消息
27 */
28 public void send(String msg) {
29 this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
30 }
31
32 }
定义JMS消息发送配置类,该类主要用于配置队列信息,如下所示:
1 package com.demo.config;
2
3 import javax.jms.Queue;
4
5 import org.apache.activemq.command.ActiveMQQueue;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8 import org.springframework.jms.annotation.EnableJms;
9
10 @Configuration
11 @EnableJms
12 public class ActiveMqConfig {
13
14 @Bean
15 public Queue queue() {
16 ActiveMQQueue activeMQQueue = new ActiveMQQueue("msg.queue");
17 return activeMQQueue;
18 }
19 }
至此,使用ActiveMQ实现了消息的发送与接收处理。每当msg.queue队列接收到消息时,都会自动调用MessageConsumer类的receiveMessage方法进行消息消费。下面定义一个控制器类,用于触发消息发送以进行测试,如下所示:
1 package com.demo.controller;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.ResponseBody;
7
8 import com.demo.producer.MessageProducer;
9
10 @Controller
11 public class ActiveMqController {
12
13 @Autowired
14 private MessageProducer messageProducer;
15
16 @RequestMapping(value = "/messageProducer")
17 @ResponseBody
18 public void findAll() {
19 for (int i = 0; i < 10000; i++) {
20 messageProducer.send("active producer message : " + i);
21 }
22 }
23
24 }
在浏览器或命令行工具(如curl)中访问 http://127.0.0.1:8080/messageProducer 即可触发消息发送;随后可以在ActiveMQ的监控页面 http://192.168.110.142:8161/admin/queues.jsp 观察队列中消息的入队与消费情况。