前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Springboot 整合RabbitMQ ---基于Class的开发

Springboot 整合RabbitMQ ---基于Class的开发

作者头像
用户5927264
发布2019-08-01 10:24:37
4670
发布2019-08-01 10:24:37
举报
文章被收录于专栏:OSChina

1 加载配置文件

代码语言:java
复制
package com.zjxnjz.mall.core.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  


/**
 * Spring configuration that builds the RabbitMQ infrastructure beans
 * (connection factory, admin and template) from the
 * {@code spring.rabbitmq.*} properties.
 *
 * @author daiyy
 */
@Configuration
public class RabbitMqConfig {

    /** Value of {@code spring.rabbitmq.addresses}. */
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    /** Value of {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /** Value of {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    private String password;

    /** Value of {@code spring.rabbitmq.publisher-confirms}. */
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;

    /** Value of {@code spring.rabbitmq.virtual-host}. */
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * Builds the connection factory from the injected properties.
     *
     * @return a {@link CachingConnectionFactory} configured with addresses,
     *         credentials, publisher-confirm flag and virtual host
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(addresses);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setPublisherConfirms(publisherConfirms);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    /**
     * Creates the admin used to declare exchanges, queues and bindings.
     *
     * @param connectionFactory the factory the admin operates on
     * @return a new {@link RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        return admin;
    }

    /**
     * Creates a template that converts payloads with
     * {@link Jackson2JsonMessageConverter}.
     *
     * <p>The bean is prototype-scoped, so every lookup yields a new instance.
     * NOTE(review): presumably so that each sender can register its own
     * confirm callback on its own template - confirm.
     *
     * @return a new JSON-converting {@link RabbitTemplate}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory());
        jsonTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return jsonTemplate;
    }
}

2 创建实体对象(交换机、队列以及两者的绑定)

代码语言:java
复制
package com.zjxnjz.mall.core.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.zjxnjz.mall.core.util.RabbitMqEnum;

/**
 * Configures exchanges, queues and the bindings between them.
 *
 * <p>To add a new message queue:
 * <ol>
 *   <li>add a queue bean (see the {@code queueXXXX} methods);</li>
 *   <li>add a binding between that queue and an exchange.</li>
 * </ol>
 *
 * <p>Every bean method both returns the AMQP object and declares it
 * explicitly through the supplied {@link RabbitAdmin}.
 *
 * @author daiyy
 * @create 2018-4-12 10:33 AM
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

    /**
     * Direct exchange.
     *
     * @param rabbitAdmin admin used to declare the exchange
     * @return the declared direct exchange
     */
    @Bean
    DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
        final DirectExchange direct = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
        rabbitAdmin.declareExchange(direct);
        logger.debug("完成直连型交换机bean实例化");
        return direct;
    }

    // Queues are defined from here on.

    /**
     * Test queue.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueTest(RabbitAdmin rabbitAdmin) {
        return declareQueue(rabbitAdmin, RabbitMqEnum.QueueName.TESTQUEUE.getCode(), "测试队列实例化完成");
    }

    // Queue-to-exchange bindings are defined from here on.

    /**
     * Binds the test queue to the direct exchange.
     *
     * @param queueTest   the test queue bean
     * @param exchange    the direct exchange bean
     * @param rabbitAdmin admin used to declare the binding
     * @return the declared binding
     */
    @Bean
    Binding bindingQueueTest(Queue queueTest, DirectExchange exchange, RabbitAdmin rabbitAdmin) {
        return declareBinding(
                rabbitAdmin,
                BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode()),
                "测试队列与直连型交换机绑定完成");
    }

    /*
     * ----------------------------------- app side: user added / points changed
     */

    /**
     * Topic exchange used for the app-side events.
     *
     * @param rabbitAdmin admin used to declare the exchange
     * @return the declared topic exchange
     */
    @Bean
    TopicExchange appDirectExchange(RabbitAdmin rabbitAdmin) {
        final TopicExchange topic = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode());
        rabbitAdmin.declareExchange(topic);
        logger.debug("完成主题型交换机bean实例化");
        return topic;
    }

    /**
     * queue1: app-side "user added" queue.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueTopicTest1(RabbitAdmin rabbitAdmin) {
        return declareQueue(rabbitAdmin, RabbitMqEnum.QueueName.QUEUE_APP_USERADD.getCode(), "--queue1-- 实例化完成");
    }

    /**
     * queue2: app-side "points changed" queue.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueTopicTest2(RabbitAdmin rabbitAdmin) {
        return declareQueue(rabbitAdmin, RabbitMqEnum.QueueName.QUEUE_APP_INTRAGATLADD.getCode(), "--queue2-- 实例化完成");
    }

    /**
     * queue3: points-mall "user added" queue. No binding for this queue is
     * declared in this class.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueTopicTest3(RabbitAdmin rabbitAdmin) {
        return declareQueue(rabbitAdmin, RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode(), "--queue3-- 实例化完成");
    }

    /**
     * queue4: points-mall "points changed" queue. No binding for this queue
     * is declared in this class.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueTopicTest4(RabbitAdmin rabbitAdmin) {
        return declareQueue(rabbitAdmin, RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode(), "--queue4-- 实例化完成");
    }

    /**
     * Topic binding 1: binds queue1 to the topic exchange with the
     * "user added" routing key.
     *
     * @param queueTopicTest1 the queue1 bean
     * @param exchange        the topic exchange bean
     * @param rabbitAdmin     admin used to declare the binding
     * @return the declared binding
     */
    @Bean
    Binding bindingQueueTopicTest1(Queue queueTopicTest1, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        return declareBinding(
                rabbitAdmin,
                BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode()),
                "队列与话题交换机绑定完成");
    }

    /**
     * Topic binding 2: binds queue2 to the topic exchange with the
     * "points changed" routing key.
     *
     * @param queueTopicTest2 the queue2 bean
     * @param exchange        the topic exchange bean
     * @param rabbitAdmin     admin used to declare the binding
     * @return the declared binding
     */
    @Bean
    Binding bindingQueueTopicTest2(Queue queueTopicTest2, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        return declareBinding(
                rabbitAdmin,
                BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode()),
                "队列与话题交换机绑定完成");
    }

    /** Creates a queue with the given name, declares it and logs {@code doneMessage}. */
    private static Queue declareQueue(RabbitAdmin rabbitAdmin, String queueName, String doneMessage) {
        final Queue declared = new Queue(queueName);
        rabbitAdmin.declareQueue(declared);
        logger.debug(doneMessage);
        return declared;
    }

    /** Declares an already-built binding and logs {@code doneMessage}. */
    private static Binding declareBinding(RabbitAdmin rabbitAdmin, Binding binding, String doneMessage) {
        rabbitAdmin.declareBinding(binding);
        logger.debug(doneMessage);
        return binding;
    }
}

3 创建发送者工具类

代码语言:java
复制
package com.zjxnjz.mall.core.util;

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Utility for publishing messages to RabbitMQ.
 *
 * <p>Every send attaches a fresh {@link CorrelationData} whose id is a random
 * UUID; the id is logged on send and again when the publisher confirm
 * arrives, so the two log lines can be matched.
 *
 * @author daiyy
 * @create 2018-4-12 10:33 AM
 **/
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

    private final RabbitTemplate rabbitTemplate;

    /**
     * Stores the template and registers this instance as its confirm callback.
     *
     * @param rabbitTemplate template used for all sends and receives
     */
    @Autowired
    public RabbitMqSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Logs the outcome of a publisher confirm.
     *
     * @param correlationData correlation data passed on send; may be
     *                        {@code null} when a message was published
     *                        without it
     * @param b               {@code true} for an ack, {@code false} for a nack
     * @param s               optional cause, supplied on a nack when available
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        // Guard against a null correlationData instead of failing inside the callback.
        String id = (correlationData == null) ? null : correlationData.getId();
        if (b) {
            logger.info("confirm: {}", id);
        } else {
            // A nack means the broker did not accept the message; make that visible.
            logger.warn("confirm failed (nack): {}, cause: {}", id, s);
        }
    }

    /**
     * Sends to the direct exchange with the given routing key.
     *
     * @param routeKey routing key
     * @param obj      payload, converted by the template's message converter
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void sendRabbitmqDirect(String routeKey, Object obj) throws Exception {
        publish(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey, obj);
    }

    /**
     * Sends to the topic exchange; the message is routed to every queue whose
     * binding matches the routing key.
     *
     * @param routeKey routing key
     * @param obj      payload, converted by the template's message converter
     */
    public void sendRabbitmqTopic(String routeKey, Object obj) {
        publish(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey, obj);
    }

    /**
     * ----------------------------------start - SHF-----------------------------------
     * Sends a "user added" message using the app exchange and the
     * "user added" routing key.
     *
     * @param obj payload, converted by the template's message converter
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void sendUserAddTopic(Object obj) throws Exception {
        publish(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(),
                RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode(), obj);
    }

    /**
     * Sends a "points changed" message using the app exchange and the
     * "points changed" routing key.
     *
     * @param obj payload, converted by the template's message converter
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void sendIntragatlAddTopic(Object obj) throws Exception {
        publish(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(),
                RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode(), obj);
    }
    //----------------------------------end-----------------------------------

    /**
     * Receives and converts one message from the given queue.
     *
     * @author SHF
     * @version created 2018-07-27 14:17:33
     * @param queueName name of the queue to read from
     * @return whatever {@code RabbitTemplate.receiveAndConvert} returns for that queue
     */
    public Object reciveRabbitMqObject(String queueName) {
        return this.rabbitTemplate.receiveAndConvert(queueName);
    }

    /** Publishes {@code obj} with a fresh UUID correlation id and logs that id. */
    private void publish(String exchange, String routeKey, Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: {}", correlationData.getId());
        this.rabbitTemplate.convertAndSend(exchange, routeKey, obj, correlationData);
    }

}

4 创建 消费者工具类 (监听)

代码语言:java
复制
package com.zjxnjz.mall.restfulapi.modular.intragatl;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zjxnjz.mall.core.config.RabbitMqConfig;
import com.zjxnjz.mall.core.util.RabbitMqEnum;

/**
 * Registers the listener containers that consume the points-mall
 * "user added" and "points changed" queues.
 *
 * <p>Both containers use {@link AcknowledgeMode#MANUAL}: each listener acks a
 * delivery itself after the handler returns. If decoding or handling fails,
 * the listener nacks the delivery before rethrowing, so a failed message is
 * not left unacknowledged.
 *
 * <p>NOTE(review): {@code @AutoConfigureAfter} is documented as ordering
 * auto-configuration classes - confirm this class is registered as one.
 */
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class UserIntragatlListener {

    @Autowired
    private UserSynchronization userSynchronization;

    @Autowired
    private IntragatlSynchronization intragatlSynchronization;

    /**
     * ************************* "user added" listener container ***************************
     *
     * @author SHF
     * @version created 2018-07-30 16:11:50
     * @param connectionFactory connection factory the container uses
     * @return container consuming {@code QUEUE_BUSS_USERADD} with manual acks
     */
    @Bean("userAddContainer")
    public MessageListenerContainer userAddListenerContainer(ConnectionFactory connectionFactory) {
        return manualAckContainer(connectionFactory,
                RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode(), userAddListener1());
    }

    /**
     * Listener that decodes a "user added" message and forwards it to
     * {@code userSynchronization.reciveUserAddFromBuliss}.
     *
     * @return the listener
     */
    @Bean("userAddListener")
    public ChannelAwareMessageListener userAddListener1() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                try {
                    Map map = decodeBody(message);
                    System.out.println("QUEUE_BUSS_USERADD ===" + map);
                    userSynchronization.reciveUserAddFromBuliss(map); // invoke the user-add handler
                } catch (Exception e) {
                    rejectFailedDelivery(message, channel, e);
                    throw e; // let the container's error handling see the failure
                }
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        };
    }

    /**
     * ************************* "points changed" listener container ***************************
     *
     * @author SHF
     * @version created 2018-07-30 16:12:15
     * @param connectionFactory connection factory the container uses
     * @return container consuming {@code QUEUE_BUSS_INTRAGATLADD} with manual acks
     */
    @Bean("IntragatlChangeContainer")
    public MessageListenerContainer IntragatlChangeListenerContainer(ConnectionFactory connectionFactory) {
        return manualAckContainer(connectionFactory,
                RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode(), IntragatlChangeListener2());
    }

    /**
     * Listener that decodes a "points changed" message and forwards it to
     * {@code intragatlSynchronization.bqylIntragatlChange}.
     *
     * @return the listener
     */
    @Bean("IntragatlChangeListener")
    public ChannelAwareMessageListener IntragatlChangeListener2() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                try {
                    Map map = decodeBody(message);
                    System.out.println("QUEUE_BUSS_INTRAGATLADD ===" + map);
                    intragatlSynchronization.bqylIntragatlChange(map); // invoke the points-change handler
                } catch (Exception e) {
                    rejectFailedDelivery(message, channel, e);
                    throw e; // let the container's error handling see the failure
                }
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        };
    }

    /** Builds a manual-ack container for one queue with the given listener. */
    private static MessageListenerContainer manualAckContainer(ConnectionFactory connectionFactory,
            String queueName, ChannelAwareMessageListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listener);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

    /**
     * Parses the message body as a JSON object.
     *
     * <p>The bytes are decoded explicitly as UTF-8 rather than with the
     * platform default charset. NOTE(review): assumes the producer encodes
     * JSON as UTF-8 - confirm against the sender's message converter.
     * The raw {@code Map} type is kept because the handlers' parameter types
     * are not visible here.
     */
    private static Map decodeBody(Message message) {
        return JSON.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
    }

    /**
     * Negatively acknowledges a delivery whose processing failed.
     *
     * <p>A first-time delivery is requeued for one more attempt; a delivery
     * already flagged as redelivered is rejected without requeue, so a
     * message that can never be processed does not loop forever.
     * NOTE(review): without a dead-letter exchange on the queue, a rejected
     * message is discarded - confirm this policy suits these queues.
     */
    private static void rejectFailedDelivery(Message message, Channel channel, Exception failure) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
        if (!requeue) {
            System.err.println("Rejecting redelivered message without requeue, deliveryTag=" + deliveryTag);
        }
        try {
            channel.basicNack(deliveryTag, false, requeue);
        } catch (Exception nackFailure) {
            // Keep the original failure as the primary exception.
            failure.addSuppressed(nackFailure);
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档