前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费

第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费

作者头像
恒宇少年
发布于 2018-06-27 08:05:27
发布于 2018-06-27 08:05:27
1.3K00
代码可运行
举报
运行总次数:0
代码可运行

消息队列目前流行的有KafKa、RabbitMQ、ActiveMQ等,它们的诞生无非不是为了解决消息的分布式消费,完成项目、服务之间的解耦动作。消息队列提供者与消费者之间完全采用异步通信方式,极力的提高了系统的响应能力,从而提高系统的网络请求吞吐量。

每一种的消息队列都有它在设计上的独一无二的优势,在实际的项目技术选型时根据项目的需求来确定。

本章目标

基于SpringBoot项目整合RabbitMQ消息队列,完成DirectExchange(路由键)分布式消息消费。

SpringBoot 企业级核心技术学习专题

专题

专题名称

专题描述

001

Spring Boot 核心技术

讲解SpringBoot一些企业级层面的核心组件

002

Spring Boot 核心技术章节源码

Spring Boot 核心技术简书每一篇文章码云对应源码

003

Spring Cloud 核心技术

对Spring Cloud核心技术全面讲解

004

Spring Cloud 核心技术章节源码

Spring Cloud 核心技术简书每一篇文章对应源码

005

QueryDSL 核心技术

全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA

006

SpringDataJPA 核心技术

全面讲解SpringDataJPA核心技术

Exchange

RabbitMQ中有三种常用的转发方式,分别是:

DirectExchange路由键方式转发消息。

FanoutExchange:广播方式转发消息。

TopicExchange:主题匹配方式转发消息。

我们本章先来讲解DirectExchange路由键方式,根据设置的路由键的值进行完全匹配时转发,下面我们来看一张图,形象的介绍了转发消息匹配流程,如下图所示:

DirectExchange

我们可以看到上图,当消息被提供者发送到RabbitMQ后,会根据配置队列的交换以及绑定实例进行转发消息,上图只会将消息转发路由键为KEY的队列消费者对应的实现方法逻辑中,从而完成消息的消费过程。

安装RabbitMQ

因为RabbitMQ是跨平台的分布式消息队列服务,可以部署在任意的操作系统上,下面我们分别介绍在不同的系统下该怎么去安装RabbitMQ服务。

我们本章采用的环境版本如下:

  • RabbitMQ Server 3.6.14
  • Erlang/OTP_X64 20.1

Windows下安装

我们先去RabbitMQ官方网站下载最新版的安装包,下载地址:https://www.rabbitmq.com/download.html,可以根据不同的操作系统选择下载。

我们在安装RabbitMQ服务端时需要Erlang环境的支持,所以我们需要先安装Erlang

  1. 我们通过Erlang官方网站http://www.erlang.org/downloads下载最新的安装包
  2. 我们访问RabiitmQ官方下载地址https://www.rabbitmq.com/download.html下载最新安装包。

因为是国外的网站所以下载比较慢,大家下载时会浪费时间,我已经将安装包分享到了百度网盘,下载地址:安装包下载地址,密码:pexf

  1. 运行安装Erlang
  2. 运行安装RabbitMQ

5.检查服务是否安装完成,RabbitMQ安装完成后会以服务的形式创建,并且随着开机启动,如下所示:

Rabbit服务

Mac OS X 安装

在Mac OS X中我们使用brew工具可以很简单的安装RabbitMQ服务端,步骤如下:

  1. brew更新到最新版本,执行:brew update
  2. 接下来我们安装Erlang,执行:brew install erlang
  3. 最后安装RabbitMQ,执行:brew install rabbitmq

我们通过上面的步骤安装后,RabbitMQ会被自动安装到/usr/local/Cellar/rabbitmq/目录下,下面我们进入cd sbin目录执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sudo ./rabbitmq-server

可以直接启动RabbitMQ服务。

Ubuntu 安装

Ubuntu操作系统中,我们可以直接使用APT仓库进行安装,我使用的系统版本是16.04,系统版本并不影响安装。

  1. 安装Erlang,执行命令:sudo apt-get install erlang
  2. 下面我们需要将RabbitMQ的安装源配置信息写入到系统的/etc/apt/sources.list.d配置文件内,执行如下命令:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
  1. 下面我们更新APT本地仓库的安装包列表,执行命令:sudo apt-get update
  2. 最后安装RabbitMQ服务,执行命令:sudo apt-get install rabbitmq-server

启用界面管理插件

RabbitMQ提供了界面管理的web插件,我们只需要启用指定的插件就可以了,下面我们来看看Windows操作系统下该怎么启动界面管理插件。

我们使用CMD进入RabbitMQ安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.14,然后我们进入sbin目录,可以看到目录内存在很多个bat脚本程序,我们找到rabbitmq-plugins.bat,这个脚本程序可以控制RabbitMQ插件启用禁用,我们执行如下脚本命令来启用界面管理插件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rabbitmq-plugins.bat enable rabbitmq_management

命令行输出内容如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
The following plugins have been enabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... started 6 plugins.

可以看到输出的内容RabbitMQ自动启动了6个插件,我们现在访问http://127.0.0.1:15672地址可以直接打开RabbitMQ的界面管理平台,而默认的用户名/密码分别为:guest/guest,通过该用户可以直接登录管理平台。

禁用界面管理插件

我们同样可以禁用RabbitMQ指定插件,执行如下命令:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rabbitmq-plugins.bat disable rabbitmq_management

命令创建输出内容则是相关停止插件的日志,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
The following plugins have been disabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... stopped 6 plugins.

这样我们再访问http://127.0.0.1:15672就会发现我们无法访问到界面。

构建项目

我们使用idea开发工具创建一个SpringBoot项目,添加依赖,pom.xml配置文件如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
        <!--rabbitmq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
        <!--测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

我们本章来模拟用户注册完成后,将注册用户的编号通过Provider模块发送到RabbitMQ,然后RabbitMQ根据配置的DirectExchange的路由键进行异步转发。

初始化用户表

下面我们先来创建所需要的用户基本信息表,建表SQL如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE `user_info` (
  `UI_ID` int(11) DEFAULT NULL COMMENT '用户编号',
  `UI_USER_NAME` varchar(20) DEFAULT NULL COMMENT '用户名称',
  `UI_NAME` varchar(20) DEFAULT NULL COMMENT '真实姓名',
  `UI_AGE` int(11) DEFAULT NULL COMMENT '用户年龄',
  `UI_BALANCE` decimal(10,0) DEFAULT NULL COMMENT '用户余额'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户基本信息表';

构建 rabbitmq-provider 项目

基于我们上述的项目创建一个Maven子模块,命名为:rabbitmq-provider,因为是直接创建的Module项目,IDEA并没有给我创建SpringApplication启用类。

创建入口类

下面我们自行创建一个Provider项目启动入口程序,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 消息队列消息提供者启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:14
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqProviderApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqProviderApplication.class);

    /**
     * 消息队列提供者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqProviderApplication.class,args);

        logger.info("【【【【【消息队列-消息提供者启动成功.】】】】】");
    }
}
application.properties配置文件

下面我们在src/main/resource目录下创建application.properties并将对应RabbitMQ以及Druid的配置加入,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

#数据源配置
spring.datasource.druid.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
spring.datasource.druid.username=root
spring.datasource.druid.password=123456

RabbitMQ内有个virtual-host虚拟主机的概念,一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列。

用户实体

本章数据库操作采用spring-data-jpa,相关文章请访问:第十三章:SpringBoot实战SpringDataJPA,我们基于user_info数据表对应创建实体,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Data
@Table(name = "user_info")
@Entity
public class UserEntity
    implements Serializable
{
    /**
     * 用户编号
     */
    @Id
    @GeneratedValue
    @Column(name = "UI_ID")
    private Long id;
    /**
     * 用户名称
     */
    @Column(name = "UI_USER_NAME")
    private String userName;
    /**
     * 姓名
     */
    @Column(name = "UI_NAME")
    private String name;
    /**
     * 年龄
     */
    @Column(name = "UI_AGE")
    private int age;
    /**
     * 余额
     */
    @Column(name = "UI_BALANCE")
    private BigDecimal balance;
}
用户数据接口

创建UserRepository用户数据操作接口,并继承JpaRepository获得spring-data-jpa相关的接口定义方法。如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 用户数据接口定义
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:35
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface UserRepository
    extends JpaRepository<UserEntity,Long>
{
}
用户业务逻辑实现

本章只是简单完成了数据的添加,代码如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 用户业务逻辑实现类
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:37
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
@Transactional(rollbackFor = Exception.class)
public class UserService
{
    @Autowired
    private UserRepository userRepository;
    /**
     * 消息队列业务逻辑实现
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 保存用户
     * 并写入消息队列
     * @param userEntity
     * @return
     */
    public Long save(UserEntity userEntity) throws Exception
    {
        /**
         * 保存用户
         */
        userRepository.save(userEntity);
        /**
         * 将消息写入消息队列
         */
        queueMessageService.send(userEntity.getId(), ExchangeEnum.USER_REGISTER, QueueEnum.USER_REGISTER);

        return userEntity.getId();
    }

在上面业务逻辑实现类内出现了一个名为QueueMessageService消息队列实现类,该类是我们定义的用于发送消息到消息队列的统一入口,在下面我们会详细讲解。

用户控制器

创建一个名为UserController的控制器类,对应编写一个添加用户的请求方法,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 用户控制器
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:41
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@RestController
@RequestMapping(value = "/user")
public class UserController
{
    /**
     * 用户业务逻辑
     */
    @Autowired
    private UserService userService;

    /**
     * 保存用户基本信息
     * @param userEntity
     * @return
     */
    @RequestMapping(value = "/save")
    public UserEntity save(UserEntity userEntity) throws Exception
    {
        userService.save(userEntity);
        return userEntity;
    }
}

到这我们添加用户的流程已经编写完成了,那么我们就来看下消息队列QueueMessageService接口的定义以及实现类的定义。

消息队列方法定义接口

创建一个名为QueueMessageService的接口并且继承了RabbitTemplate.ConfirmCallback接口,而RabbitTemplate.ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,就会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知,QueueMessageService接口如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 消息队列业务
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
    extends RabbitTemplate.ConfirmCallback
{
    /**
     * 发送消息到rabbitmq消息队列
     * @param message 消息内容
     * @param exchangeEnum 交换配置枚举
     * @param queueEnum 队列配置枚举
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;
}

接下来我们需要实现该接口内的所有方法,并做出一些业务逻辑的处理。

消息队列业务实现

创建名为QueueMessageServiceSupport实体类实现QueueMessageService接口,并实现接口内的所有方法,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 消息队列业务逻辑实现
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息队列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception {
        //设置回调为当前类对象
        rabbitTemplate.setConfirmCallback(this);
        //构建回调id为uuid
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //发送消息到消息队列
        rabbitTemplate.convertAndSend(exchangeEnum.getValue(),queueEnum.getRoutingKey(),message,correlationId);
    }

    /**
     * 消息回调确认方法
     * @param correlationData 请求数据对象
     * @param ack 是否发送成功
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }
}

convertAndSend方法用于将Object类型的消息转换后发送到RabbitMQ服务端,发送是的消息类型要与消息消费者方法参数保持一致。

confirm方法内,我们仅仅打印了消息发送时的id,根据ack参数输出消息发送状态。

在上面代码中我们注入了RabbitTemplate消息队列模板实例,而通过该实例我们可以将消息发送到RabbitMQ服务端。那么这个实例具体在什么地方定义的呢?我们带着这个疑问来创建下面的模块,我们需要将RabbitMQ相关的配置抽取出来作为一个单独的Module存在。

构建 rabbitmq-common 项目

该模块项目很简单,只是添加RabbitMQ相关的配置信息,由于Module是一个子模块所以继承了parent所有的依赖,当然我们用到的RabbitMQ相关依赖也不例外。

配置rabbitmq

在创建配置类之前,我们先来定义两个枚举,分别存放了队列的交换信息、队列路由信息,

  • ExchangeEnum (存放了队列交换配置信息)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * rabbitmq交换配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用户注册交换配置枚举
     */
    USER_REGISTER("user.register.topic.exchange")
    ;
    private String value;

    ExchangeEnum(String value) {
        this.value = value;
    }
}
  • QueueEnum (存放了队列信息以及队列的路由配置信息)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 队列配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用户注册枚举
     */
    USER_REGISTER("user.register.queue","user.register")
    ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}

创建名为UserRegisterQueueConfiguration的实体类用于配置本章用到的用户注册队列信息,如果你得项目中使用多个队列,建议每一个业务逻辑创建一个配置类,分开维护,这样不容易出错。配置信息如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 用户注册消息队列配置
 * ========================
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {
    /**
     * 配置路由交换对象实例
     * @return
     */
    @Bean
    public DirectExchange userRegisterDirectExchange()
    {
        return new DirectExchange(ExchangeEnum.USER_REGISTER.getValue());
    }

    /**
     * 配置用户注册队列对象实例
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue userRegisterQueue()
    {
        return new Queue(QueueEnum.USER_REGISTER.getName(),true);
    }

    /**
     * 将用户注册队列绑定到路由交换配置上并设置指定路由键进行转发
     * @return
     */
    @Bean
    public Binding userRegisterBinding()
    {
        return BindingBuilder.bind(userRegisterQueue()).to(userRegisterDirectExchange()).with(QueueEnum.USER_REGISTER.getRoutingKey());
    }
}

该配置类大致分为如下三部分:

  • 配置交换实例 配置DirectExchange实例对象,为交换设置一个名称,引用ExchangeEnum枚举配置的交换名称,消息提供者与消息消费者的交换名称必须一致才具备的第一步的通讯基础。
  • 配置队列实例 配置Queue实例对象,为消息队列设置一个名称,引用QueueEnum枚举配置的队列名称,当然队列的名称同样也是提供者与消费者之间的通讯基础。
  • 绑定队列实例到交换实例 配置Binding实例对象,消息绑定的目的就是将Queue实例绑定到Exchange上,并且通过设置的路由Key进行消息转发,配置了路由Key后,只有符合该路由配置的消息才会被转发到绑定交换上的消息队列。

我们的rabbitmq-common模块已经编写完成。

添加 rabbitmq-provider 依赖 rabbitmq-common

下面我们回到rabbitmq-provider模块,修改pom.xml配置文件,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
        <!--添加common模块依赖-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <!--mysql依赖-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--druid数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.5</version>
        </dependency>
        <!--data jpa依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>

可以看到我们将rabbitmq-common模块添加到了rabbitmq-provider模块的pom配置文件内,完成了模块之间的相互依赖,这样我们rabbitmq-provider就自动添加了对应的消息队列配置。

构建rabbitmq-consumer

我们再来创建一个rabbitmq-consumer队列消息消费者模块,用于接受消费用户注册消息。

创建入口类

同样我们先来创建一个SpringApplication入口启动类,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 消息队列消息消费者入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerApplication.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerApplication.class,args);

        logger.info("【【【【【消息队列-消息消费者启动成功.】】】】】");
    }
}
application.properties配置文件

配置文件的消息队列配置信息要与rabbitmq-provider配置文件一致,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.application.name=rabbitmq-consumer
#启动端口
server.port=1111
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

我们修改了程序启动的端口号,为了我们下面进行测试的时候不出现端口占用的情况。

如果RabbitMQ配置信息与rabbitmq-provider不一致,就不会收到消费消息。

用户注册消息消费者

创建名为UserConsumer类,用于完成消息监听,并且实现消息消费,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 用户注册消息消费者
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    @RabbitHandler
    public void execute(Long userId)
    {
        System.out.println("用户:" + userId+",完成了注册");

        //...//自行业务逻辑处理
    }
}

在消息消费者类内,有两个陌生的注解:

  • @RabbitListener RabbitMQ队列消息监听注解,该注解配置监听queues内的队列名称列表,可以配置多个。队列名称对应本章rabbitmq-common模块内QueueEnum枚举name属性。
  • @RabbitHandler RabbitMQ消息处理方法,该方法的参数要与rabbitmq-provider发送消息时的类型保持一致,否则无法自动调用消费方法,也就无法完成消息的消费。

运行测试

我们接下来在rabbitmq-provider模块src/test/java下创建一个测试用例,访问用户注册控制器请求路径,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class UserTester
{
    /**
     * 模拟mvc测试对象
     */
    private MockMvc mockMvc;

    /**
     * web项目上下文
     */
    @Autowired
    private WebApplicationContext webApplicationContext;

    /**
     * 所有测试方法执行之前执行该方法
     */
    @Before
    public void before() {
        //获取mockmvc对象实例
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
    }

    /**
     * 测试添加用户
     * @throws Exception
     */
    @Test
    public void testUserAdd() throws Exception
    {
        mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                .param("userName","yuqiyu")
                .param("name","恒宇少年")
                .param("age","23")
        )
                .andDo(MockMvcResultHandlers.log())
                .andReturn();
    }
}

调用测试用例时会自动将参数保存到数据库,并且将用户编号发送到RabbitMQ服务端,而RabbitMQ根据交换配置以及队列配置转发消息到消费者实例。

启动 rabbitmq-consumer

我们先来把rabbitmq-consumer项目启动,控制台输出启动日志如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
.....
51.194  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2017-12-03 16:58:51.196  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2017-12-03 16:58:51.216  INFO 2340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2017-12-03 16:58:51.237  INFO 2340 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#443ff8ef:0/SimpleConnection@4369ac5c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
2017-12-03 16:58:51.287  INFO 2340 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.354 seconds (JVM running for 3.026)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】

该部分启动日志就是我们配置的RabbitMQ初始化信息,我们可以看到项目启动时会自动与配置的RabbitMQ进行关联:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
运行测试用例

接下来我们执行rabbitmq-provider项目的测试用例,来查看控制台的输出内容如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
......
 回调id:e08f6d82-57bc-4c3f-9899-31c4b990c5be
消息发送成功
......

已经可以正常的将消息发送到RabbitMQ服务端,并且接收到了回调通知,那么我们的rabbitmq-consumer项目是不是已经执行了消息的消费呢?我们打开rabbitmq-consumer控制台查看输出内容如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
用户:2,完成了注册

看以看到已经可以成功的执行UserConsumer消息监听类内的监听方法逻辑,到这里消息队列路由一对一的方式已经讲解完了。

总结

本章主要讲解了RabbitMQ在不同操作系统下的安装方式,以及通过三个子模块形象的展示了消息的分布式处理,整体流程:rabbitmq-provider -> RabbitMQ服务端 -> rabbitmq-consumer,消息的转发是非常快的,RabbitMQ在收到消息后就会检索当前服务端是否存在该消息的消费者,如果存在将会马上将消息转发。

本章源码已经上传到码云:

SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter

SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter

SpringBoot相关系列文章请访问:目录:SpringBoot学习目录

QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录

SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录

SpringBoot相关文章请访问:目录:SpringBoot学习目录,感谢阅读!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.12.03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费
我们在之前的两个章节第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费、第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费提高了RabbitMQ消息队列的DirectExchange交换类型的消息消费,我们之前的章节提到了RabbitMQ比较常用的交换类型有三种,我们今天来看看TopicExchange主题交换类型。 本章目标 基于SpringBoot平台完成RabbitMQ的TopicE
恒宇少年
2018/06/27
1.3K0
第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费
在上一章第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费我们讲解到了RabbitMQ消息队列的DirectExchange路由键消息单个消费者消费,源码请访问SpringBoot对应章节源码下载查看,消息队列目的是完成消息的分布式消费,那么我们是否可以为一个Provider创建并绑定多个Consumer呢? 本章目标 基于SpringBoot平台整合RabbitMQ消息队列,完成一个Provider绑定多个Consumer进行消息消费。 Spring
恒宇少年
2018/06/27
7440
第四十六章:SpringBoot & RabbitMQ完成消息延迟消费
在2018-3-1日SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档。 本章目标 基于SpringBoot整合RabbitMQ完成消息延迟消费。 构建项目 注意前言 由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出4
恒宇少年
2018/06/27
8570
手把手带你Springboot整合RabbitMq ,一篇讲完
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用,消息回调、手动确认等。(但是关于rabbitMq的安装,就不介绍了)
java进阶架构师
2020/06/16
2K0
手把手带你Springboot整合RabbitMq ,一篇讲完
RabbitMQ六种队列模式之路由模式
本文接着带大家伙了解RabbitMQ队列模式中的路由模式,其实只要看过我前面写的发布订阅模式的文章后,相信路由模式上手就非常 easy 了,唯一差距就是两个参数,exchange类型和 routingKey 。
黎明大大
2021/03/09
6410
第四十八章:SpringBoot2.0新特性 - RabbitMQ信任package设置本章目标SpringBoot 企业级核心技术学习专题构建项目总结
在这次SpringBoot升级后,之前的系统内使用实体传输受到了限制,如果使用SpringBoot默认的序列化方式不会出现信任package的问题,之所以出现这个问题是因为项目使用fastjson方式进行类的序列化已经反序列化,在之前SpringBoot 1.5.10版本的时候 RabbitMQ依赖内的DefaultClassMapper类在构造函数内配置*,表示信任项目内的所有package,在SpringBoot 2.0.0版本时,DefaultClassMapper类源码构造函数进行了修改,不再信任全
恒宇少年
2018/06/27
1.2K0
第四十四章: 基于SpringBoot & AOP完成统一资源自动查询映射
本章内容比较偏向系统设计方面,简单的封装就可以应用到系统中使用,从而提高我们的编码效率以及代码的可读性。统一资源在系统内是不可避免的模块,资源分类也有很多种,比较常见如:图片资源、文本资源、视频资源等,那么资源统一处理的好处是什么呢?大家有可能会有疑问,我把资源存放到业务表内岂不更好吗?这样查询起来也方便,并不需要关联资源信息表!当然设计不分好坏,只有更适合、更简单!接下来带着疑问进入本章的内容。 本章目标 基于SpringBoot平台结合AOP完成统一资源的自动查询映射。 构建项目 本章使用到的依赖相对来
恒宇少年
2018/06/27
1.5K0
第二十七章:SpringBoot使用ApplicationEvent&Listener完成业务解耦
ApplicationEvent以及Listener是Spring为我们提供的一个事件监听、订阅的实现,内部实现原理是观察者设计模式,设计初衷也是为了系统业务逻辑之间的解耦,提高可扩展性以及可维护性。事件发布者并不需要考虑谁去监听,监听具体的实现内容是什么,发布者的工作只是为了发布事件而已。 我们平时日常生活中也是经常会有这种情况存在,如:我们在平时拔河比赛中,裁判员给我们吹响了开始的信号,也就是给我们发布了一个开始的事件,而拔河双方人员都在监听着这个事件,一旦事件发布后双方人员就开始往自己方使劲。而裁判
恒宇少年
2018/06/27
1.1K0
mall整合RabbitMQ实现延迟消息
1.安装Erlang,下载地址:http://erlang.org/download/otpwin6421.3.exe
macrozheng
2019/07/22
5040
mall整合RabbitMQ实现延迟消息
SpringBoot整合RabbitMQ消息组件
1、RabbitMQ是一个在AMQP基础上构建的新一代企业级消息系统,该组件由Pivotal公司提供,使用ErLang语言开发。
别先生
2020/11/26
3070
SpringBoot整合RabbitMQ消息组件
第四十五章:基于SpringBoot 设计业务逻辑异常统一处理
在我们平时的项目研发过程中,异常一般都是程序员最为头疼的问题,异常的抛出、捕获、处理等既涉及事务回滚,还会涉及返回前端消息提醒信息。那么我们怎么设计可以解决上面的两个的痛点呢?我们可不可以统一处理业务逻辑然后给出前端对应的异常提醒内容呢? 本章目标 基于SpringBoot平台构建业务逻辑异常统一处理,异常消息内容格式化。 SpringBoot 企业级核心技术学习专题 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spri
恒宇少年
2018/06/27
1.9K0
Springboot使用RabbitMQ看这几篇就够了(模式案例篇)!
上篇我们说到了消息队列RabbitMQ的模式概念,那么这里将会针对模式使用SpringBoot联合RabbitMQ做一个案例,实现消息的生产和消费。
青衫染红尘
2021/01/19
4.7K0
Springboot使用RabbitMQ看这几篇就够了(模式案例篇)!
第三十九章:基于SpringBoot & Quartz完成定时任务分布式单节点持久化
定时任务在企业项目比较常用到,几乎所有的项目都会牵扯该功能模块,定时任务一般会处理指定时间点执行某一些业务逻辑、间隔时间执行某一些业务逻辑等。我们在之前有讲过SpringBoot是已经集成了定时任务的,详见:第二十六章:SpringBoot使用@Scheduled创建定时任务,那么我们本章将会采用外置的quartz定时任务框架来完成定时任务的分布式单节点持久化,我们为什么要持久化定时任务呢? 在一些项目中定时任务可能是必不可少的,由于某种特殊的原因定时任务可能丢失,如重启定时任务服务项目后,原内存中的定时任
恒宇少年
2018/06/27
2.5K0
MQ系列(3)——rabbitMQ结合springboot使用(1)
从这一节开始我们进入rabbitMQ的实战环节,项目环境是spring-boot 加maven。首先让我们创建一个spring-boot项目,然后引入web依赖和 rabbitMQ的依赖
六个核弹
2022/12/23
2.1K0
MQ系列(3)——rabbitMQ结合springboot使用(1)
RabbitMQ高级特性之延迟队列
很多时候我们想定时去做某件事情的时候我们会首先想到定时任务,quartz是个不错的选择,但是也有缺点,假如配置在项目中,集群部署会有重复执行的问题,如果持久化在mysql中,解决了集群的问题,但是过于依赖mysql,耦合严重,当然还有日志量庞大、执行时间精度、过于耗费系统资源等等问题。所以这时候使用消息队列中间件的的延时队列就是一个很好得解决方案,我们设置要触发消费的时间和必要的参数入队mq,到时监听queue的消费者自然拿到消息然后去走业务流程,这里介绍的是基于rabbitmq中间件实现的TTL版的延时队列。
黎明大大
2021/03/25
1.2K0
RabbitMQ高级特性之延迟队列
SpringBoot整合RabbitMQ实现延迟消息
在上一篇文章一篇文章搞懂RabbitMQ 延迟消息中作者详细介绍了RabbitMq实现延迟消息队列的两种方式:
用户3587585
2023/08/10
8340
SpringBoot整合RabbitMQ实现延迟消息
第三十章:SpringBoot使用MapStruct自动映射DTO
MapStruct是一种类型安全的bean映射类生成java注释处理器。 我们要做的就是定义一个映射器接口,声明任何必需的映射方法。在编译的过程中,MapStruct会生成此接口的实现。该实现使用纯java方法调用的源和目标对象之间的映射,MapStruct节省了时间,通过生成代码完成繁琐和容易出错的代码逻辑。下面我们来揭开它的神秘面纱 本章目标 基于SpringBoot平台完成MapStruct映射框架的集成。 SpringBoot 企业级核心技术学习专题 专题 专题名称 专题描述 001 Spring
恒宇少年
2018/06/27
5.5K0
RabbitMQ高级特性之消费端限流
假设我们现在有这么一个场景,我们的消费端由于某些原因导致全部宕机等不可用,导致RabbitMQ服务器队列中存储了大量消息未被消费掉,那么等恢复消费端服务器后,就会有巨大量的消息全部推送过来,但是我们单个客户端无法同事处理这么多消息,就是导致消费端一些不可预测错误,甚至又会重复发生宕机,所以在实际业务场景中,限流保护还是非常重要的。
黎明大大
2021/03/08
7940
SpringBoot 整合 RabbitMQ 实现消息可靠传输
生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。
用户10384376
2023/02/26
3920
SpringBoot 整合 RabbitMQ 实现消息可靠传输
分布式专题|进入BAT必备之Springboot 集成 RabbitMQ的五种工作模式
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
AI码师
2020/11/19
3400
分布式专题|进入BAT必备之Springboot 集成 RabbitMQ的五种工作模式
推荐阅读
相关推荐
第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费
更多 >
LV.0
山东有鸿信息技术有限公司架构师
目录
  • 本章目标
  • SpringBoot 企业级核心技术学习专题
  • Exchange
    • 安装RabbitMQ
      • Windows下安装
      • Mac OS X 安装
      • Ubuntu 安装
    • 启用界面管理插件
      • 禁用界面管理插件
  • 构建项目
    • 构建 rabbitmq-provider 项目
    • 构建 rabbitmq-common 项目
    • 构建rabbitmq-consumer
  • 运行测试
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档