前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ延迟消息学习

RabbitMQ延迟消息学习

作者头像
河岸飞流
发布2019-08-09 14:45:32
6170
发布2019-08-09 14:45:32
举报
文章被收录于专栏:开发杂记

准备做一个禁言自动解除的功能,立马想到了订单的超时自动解除,刚好最近在看RabbitMQ的实现,于是想用它实现,查询了相关文档发现确实可以实现,动手编写了这篇短文。

准备工作

1、Erlang安装请参考windows下安装Erlang

2、mq安装晴参考RabbitMQ安装

3、延迟消息插件安装rabbitmq_delayed_message_exchange

代码语言:javascript
复制
    #插件下载地址(选择与mq版本匹配的插件版本)
    http://www.rabbitmq.com/community-plugins.html
    #安装命令如下(在安装目录sbin下执行如下命令)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
创建项目

我选择的是在springboot中集成RabbitMQ,配置相对简单很多。

项目创建好后,在application.properties中加入RabbitMQ参数:

代码语言:javascript
复制
#RabbitMQ config
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#Custom config
rabbitmq.exchange=test_exchange
rabbitmq.queue=test_queue_1
定义ConnectionFactory和RabbitTemplate
代码语言:javascript
复制
    package com.xsh.mq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
Exchange和Queue配置
代码语言:javascript
复制
    package com.xsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置队列
 */
@Configuration
public class QueueConfig {

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    @Value("${rabbitmq.queue}")
    private String queueName;

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
       //使用的是CustomExchange,不是DirectExchange,另外CustomExchange的类型必须是x-delayed-message
        return new CustomExchange(exchangeName, "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue(queueName, true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with(queueName).noargs();
    }
}
消息发送
代码语言:javascript
复制
    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceImpl {

    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
    /**
     * rabbitMQ模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    /**
     * 发送消息
     * @param queueName 队列名称
     * @param msg 消息内容
     * @param delay 延迟时长 默认3秒
     */
    public void sendMsg(String queueName,String msg,Integer delay) {
        if(null == delay){
            delay = 3000;
        }
        logger.info("》》》》发送消息");
        Integer finalDelay = delay;
        rabbitTemplate.convertAndSend(exchangeName, queueName, msg, message -> {
            //必须添加header x-delay
            message.getMessageProperties().setHeader("x-delay", finalDelay);
            return message;
        });
    }
}

这里发送消息我定义了一个延迟参数,传入的延迟是多少,消息就延迟多少,方便消息延迟不一样

消费消息
代码语言:javascript
复制
    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    @RabbitListener(queues = "${rabbitmq.queue}")
    public void receive(String msg) {
        logger.info("收到消息:{}", msg);
    }
}
测试发送接收

先运行springboot项目,然后编写单元测试用例

代码语言:javascript
复制
      package com.xsh.mq;

  import com.xsh.mq.service.MessageServiceImpl;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;

  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class MqApplicationTests {

      @Test
      public void contextLoads() {
      }

      @Autowired
      private MessageServiceImpl messageService;

      @Value("${rabbitmq.queue}")
      private String queueName;

      @Test
      public void send() {
          messageService.sendMsg(queueName, "delayMsg2", 1000 * 60 * 2);
          messageService.sendMsg(queueName, "delayMsg1", 1000 * 60);
          messageService.sendMsg(queueName, "delayMsg3", 1000 * 60*3);
      }

  }

这里我发送了三条延迟消息,控制台结果如图:

消费者接收到的消息为:

从执行结果来看,demo基本实现,RabbitMQ其他细节还有待继续看。

参考文章:Scheduling Messages with RabbitMQ

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-12-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 准备工作
  • 创建项目
  • 定义ConnectionFactory和RabbitTemplate
  • Exchange和Queue配置
  • 消息发送
  • 消费消息
  • 测试发送接收
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档