首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ基础教程之Spring&JavaConfig使用篇

RabbitMQ基础教程之Spring&JavaConfig使用篇

作者头像
一灰灰blog
发布于 2018-06-04 07:17:46
发布于 2018-06-04 07:17:46
78200
代码可运行
举报
文章被收录于专栏:小灰灰小灰灰
运行总次数:0
代码可运行

RabbitMQ基础教程之Spring使用篇

相关博文,推荐查看:

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之基本使用篇
  4. RabbitMQ基础教程之使用进阶篇

在实际的应用场景中,将RabbitMQ和Spring结合起来使用的时候可能更加频繁,网上关于Spring结合的博文中,大多都是xml的方式,这篇博文,则主要介绍下利用JavaConfig的结合,又会是怎样的

<!--more-->

I. Spring中RabbitMQ的基本使用姿势

1. 准备

开始之前,首先添加上必要的依赖,主要利用 spring-rabbit 来实现,这个依赖中,内部又依赖的Spring相关的模块,下面统一改成5.0.4版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

流程分析

实现主要分为两块,一个是投递服务,一个是消费服务,结合前面RabbitMQ的基本使用姿势中的流程,即便是使用Spring,我们也避免不了下面几步

  • 建立连接
  • 声明Exchange ,声明Queue
  • 建立Queue和Exchange之间的绑定关系
  • 发送消息
  • 消费消息(ack/nak)

2. 基本case

首先借助Spring,来实现一个最基本的最简单的实现方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Created by yihui in 19:53 18/5/30.
 */
public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        RabbitAdmin admin = new RabbitAdmin(factory);

        // 创建队列
        Queue queue = new Queue("hello", true, false, false, null);
        admin.declareQueue(queue);

        //创建topic类型的交换机
        TopicExchange exchange = new TopicExchange("topic.exchange");
        admin.declareExchange(exchange);

        //交换机和队列绑定,路由规则为匹配"foo."开头的路由键
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));


        //设置监听
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        Object listener = new Object() {
            public void handleMessage(String foo) {
                System.out.println(" [x] Received '" + foo + "'");
            }
        };
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        container.setMessageListener(adapter);
        container.setQueues(queue);
        container.start();

        //发送消息
        RabbitTemplate template = new RabbitTemplate(factory);
        template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
        Thread.sleep(1000);

        // 关闭
        container.stop();
    }
}

3. 逻辑分析

上面这一段代码中,包含了消息投递和消费两块,从实现而言,基本上逻辑和前面的基础使用没有什么太大的区别,步骤如下:

  1. 建立连接: new CachingConnectionFactory("127.0.0.1", 5672)
  2. 声明Queue: new Queue("hello", true, false, false, null)
  3. 声明Exchange: new TopicExchange("topic.exchange")
  4. 绑定Queue和Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
  5. 投递消息: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
  6. 消费消息: 设置MessageListenerAdapter

这里面有几个类需要额外注意:

  • RabbitTemplate: Spring实现的发送消息的模板,可以直接发送消息
  • SimpleMessageListenerContainer: 注册接收消息的容器

II. Spring结合JavaConfig使用RabbitMQ使用姿势

1. 公共配置

主要是将公共的ConnectionFactory 和 RabbitAdmin 抽取出来

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@ComponentScan("com.git.hui.rabbit.spring")
public class SpringConfig {

    private Environment environment;

    @Autowired
    public void setEnvironment(Environment environment) {
        this.environment = environment;
        System.out.println("then env: " + environment);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

2. 消息投递

发送消息的组件就比较简单了,直接利用 AmqpTemplate 即可

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class AmqpProducer {

    private AmqpTemplate amqpTemplate;

    @Autowired
    public void amqpTemplate(ConnectionFactory connectionFactory) {
        amqpTemplate = new RabbitTemplate(connectionFactory);
    }

    /**
     * 将消息发送到指定的交换器上
     *
     * @param exchange
     * @param msg
     */
    public void publishMsg(String exchange, String routingKey, Object msg) {
        amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

3. DirectExchange消息消费

根据不同的Exchange类型,分别实现如下

DirectExchange方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class DirectConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct.exchange");
        directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return directExchange;
    }

    @Bean
    public Queue directQueue() {
        Queue queue = new Queue("aaa");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding directQueueBinding() {
        Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener directConsumer() {
        return new BasicConsumer("direct");
    }

    @Bean(name = "directMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(directQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(directConsumer());
        return container;
    }
}

从上面的实现,基本上都是重新定义了一个Queue, Exchange, Binding, MessageListenerContainer(用来监听消息),并将消息的消费抽出了一个公共类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class BasicConsumer implements ChannelAwareMessageListener {
    private String name;

    public BasicConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            byte[] bytes = message.getBody();
            String data = new String(bytes, "utf-8");
            System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag());
        } catch (Exception e) {
            log.error("local cache rabbit mq localQueue error! e: {}", e);
        }
    }
}

4. 测试

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class SprintUnit {
    @Autowired
    private AmqpProducer amqpProducer;

    @Test
    public void testDirectConsumer() throws InterruptedException {
        String[] routingKey = new String[]{"hello.world", "world", "test1"};
        for (int i = 0; i < 10; i++) {
            amqpProducer
                    .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
}

这个测试类中,虽然主要是往MQ中投递消息,但在Spring容器启动之后,接收MQ消息并消费的实际任务,是通过前面的MessageListenerContainer托付给Spring容器了,上面测试执行之后,输出为

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
direct data: >>> hello test1>>> 2 tagId: 1
direct data: >>> hello test1>>> 5 tagId: 2
direct data: >>> hello test1>>> 8 tagId: 3

5. Topic & Fanout策略

上面的一个写出来之后,再看这两个就比较相似了

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class TopicConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topicExchange = new TopicExchange("topic.exchange");
        topicExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return topicExchange;
    }

    @Bean
    public Queue topicQueue() {
        Queue queue = new Queue("bbb");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding topicQueueBinding() {
        Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener topicConsumer() {
        return new BasicConsumer("topic");
    }

    @Bean(name = "topicMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(topicQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(topicConsumer());
        return container;
    }
}

对应的测试case

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Test
public void testTopicConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"};
    for (int i = 0; i < 20; i++) {
        amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

广播方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class FanoutConsumerConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
        fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return fanoutExchange;
    }

    @Bean
    public Queue fanoutQueue() {
        Queue queue = new Queue("ccc");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding fanoutQueueBinding() {
        Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener fanoutConsumer() {
        return new BasicConsumer("fanout");
    }

    @Bean(name = "FanoutMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(fanoutQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(fanoutConsumer());
        return container;
    }
}

对应的测试case

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Test
public void testFanoutConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
    for (int i = 0; i < 20; i++) {
        amqpProducer
                .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

II. 其他

项目地址

一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
JVM-直接内存(Direct Memory)
直接内存是Java堆之外的,直接向系统申请的内存空间,所以直接内存不是虚拟机的一部分,也不是《Java虚拟机规范》中定义的内存区域,也有可能导致OOM。
逍遥壮士
2021/07/05
1.7K0
JVM-直接内存(Direct Memory)
关于JVM直接内存触发Full GC
今天在研究JVM的直接内存溢出时发现直接内存和堆内存一样,在直接内存快满时会触发full gc,full gc会将未被引用的对象及其指向的直接内存释放掉,如下为测试代码: package test.oom; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; /**  * VM args: -XX:+PrintGCDetails -XX:MaxDirectMemorySize=500M  * @au
囚兔
2018/02/08
2.7K0
JVM内存与垃圾回收篇第11章直接内存
第 11 章 直接内存 1、直接内存概述 直接内存 不是虚拟机运行时数据区的一部分,也不是《Java虚拟机规范》中定义的内存区域。 直接内存是在Java堆外的、直接向系统申请的内存区间。 来源于NIO,通过存在堆中的DirectByteBuffer操作Native内存 通常,访问直接内存的速度会优于Java堆。即读写性能高。 因此出于性能考虑,读写频繁的场合可能会考虑使用直接内存。 Java的NIO库允许Java程序使用直接内存,用于数据缓冲区 代码示例 代码 /** * IO
yuanshuai
2022/08/17
5230
JVM内存与垃圾回收篇第11章直接内存
Thrift抛直接内存OOM一点解决思路
最近使用Thrift TThreadedSelectorServer服务方式,运行一段时间就会抛OutOfMemoryError: Direct buffer memory异常; java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_67] at java.nio.DirectByteBuffer.<init>(Dire
囚兔
2018/02/08
1.6K0
JVM创建对象之内存解析
加载类元信息 -》 为对象分配内存 -》处理并发问题 -》属性的默认初始化 -》设置对象头 -》init方法
程序员阿杜
2021/07/05
5210
JVM创建对象之内存解析
直接内存满了导致的OOM
OOM导致的溢出比较容易复现,并且很容易排查,在日常开发过程中要注意,不用的变量或引用要及时回收。
逍遥壮士
2021/07/29
1.5K0
聊聊netty的maxDirectMemory
本文主要研究一下netty的maxDirectMemory(io.netty.maxDirectMemory)
code4it
2019/04/02
1.3K0
聊聊netty的maxDirectMemory
常识四堆外内存
堆外内存除了在像netty开源框架中,在平常项目中使用的比较少,在现前的项目中,QPS要求高的系统中,堆外内存作为其中一级缓存是相当有成效的。所以来学习一下,文中主要涉及到这三分部内容
码农戏码
2021/03/23
2.8K0
【JVM调优实战100例】05——方法区调优实战(下)
直接内存由操作系统来管理。常见于NIO,用于数据缓冲,读写性能很高,分配回收花销较高。
半旧518
2022/10/26
4930
【JVM调优实战100例】05——方法区调优实战(下)
聊聊jvm的-XX:MaxDirectMemorySize
序 本文主要研究一下jvm的-XX:MaxDirectMemorySize jpoutisfj_03.png -XX:MaxDirectMemorySize -XX:MaxDirectMemorySize=size用于设置New I/O(java.nio) direct-buffer allocations的最大大小,size的单位可以使用k/K、m/M、g/G;如果没有设置该参数则默认值为0,意味着JVM自己自动给NIO direct-buffer allocations选择最大大小 System.i
code4it
2019/03/28
13.5K0
聊聊jvm的-XX:MaxDirectMemorySize
Java程序员必备:常见OOM异常分析
放假这几天,温习了深入理解Java虚拟机的第二章, 整理了JVM发生OOM异常的几种情况,并分析原因以及解决方案,希望对大家有帮助。
捡田螺的小男孩
2020/04/14
1.4K0
JAVA面试50讲之10:直接(堆外)内存原理及使用
HeapByteBuffer是堆内ByteBuffer,使用byte[]存储数据,是对数组的封装,比较简单。DirectByteBuffer是堆外ByteBuffer,直接使用堆外内存空间存储数据,是NIO高性能的核心设计之一。本文来分析一下DirectByteBuffer的实现。
用户1205080
2019/01/23
3K1
JVM内存模型之直接内存
直接内存 又称堆外内存,也就是说这不是jvm运行时数据区的一部分,也不是java虚拟机规范中定义的内存区域,但这部分也会被频繁的使用,而且也可能导致OOM。
shengjk1
2018/10/24
7240
作为 Java 开发者,你需要了解的堆外内存知识
本文来自作者 应书澜 在 GitChat 上分享 「深入解读 Java 堆外内存(直接内存)」
CSDN技术头条
2018/07/30
1.2K0
作为 Java 开发者,你需要了解的堆外内存知识
JVM-直接内存
JVM 直接内存(Direct Memory)是 JVM 运行时使用的一种特殊内存区域,它是 JVM 堆外的一块内存空间。在 Java 中,我们使用java.nio 包和java.lang.System类中的arraycopy()方法等来操作直接内存。
程序员朱永胜
2023/11/11
6441
JVM-直接内存
一文探讨堆外内存的监控与回收
记得那是一个风和日丽的周末,太阳红彤彤,花儿五颜六色,96 年的普哥微信找到我,描述了一个诡异的线上问题:线上程序使用了 NIO FileChannel 的 堆内内存作为缓冲区,读写文件,逻辑可以说相当简单,但根据监控却发现堆外内存飙升,导致了 OutOfMemeory 的异常。
kirito-moe
2019/04/30
1.3K0
一文探讨堆外内存的监控与回收
常见的 OOM 异常分析(硬核干货)
在《Java虚拟机规范》的规定里,除了程序计数器外,虚拟机内存的其他几个运行时区域都有发生 OutOfMemoryError 异常的可能。
Java3y
2020/07/09
2K0
常见的 OOM 异常分析(硬核干货)
程序员进阶系列:OOM 都搞不定,还敢妄称自己Java高级攻城狮?
正式开讲之前,先罗列一下所知的 OutOfMemoryError (简称 OOM)异常,看看这些异常工作中你是否也遇到过?
一猿小讲
2020/11/03
5770
工作中最常见的6种OOM问题
最近我写的几篇线上问题相关的文章:《糟糕,CPU100%了》《如何防止被恶意刷接口》《我调用第三方接口遇到的13大坑》,发表之后,在全网广受好评。
苏三说技术
2024/03/18
3000
工作中最常见的6种OOM问题
从0到1起步-跟我进入堆外内存的奇妙世界
堆外内存一直是Java业务开发人员难以企及的隐藏领域,究竟他是干什么的,以及如何更好的使用呢?那就请跟着我进入这个世界吧。
小程故事多
2018/08/22
4690
从0到1起步-跟我进入堆外内存的奇妙世界
推荐阅读
相关推荐
JVM-直接内存(Direct Memory)
更多 >
LV.0
这个人很懒,什么都没有留下~
目录
  • RabbitMQ基础教程之Spring使用篇
    • I. Spring中RabbitMQ的基本使用姿势
      • 1. 准备
      • 2. 基本case
      • 3. 逻辑分析
    • II. Spring结合JavaConfig使用RabbitMQ使用姿势
      • 1. 公共配置
      • 2. 消息投递
      • 3. DirectExchange消息消费
      • 4. 测试
      • 5. Topic & Fanout策略
    • II. 其他
      • 项目地址
      • 一灰灰Blog: https://liuyueyi.github.io/hexblog
      • 声明
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档