1 加载配置文件
package com.zjxnjz.mall.core.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
/**
*
* @author daiyy
* RabbitMQ配置读取文件
*/
@Configuration
//@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.publisher-confirms}")
private Boolean publisherConfirms;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
// 构建mq实例工厂
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
2 创建实体对象(交换机,队列,俩者的绑定)
package com.zjxnjz.mall.core.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.zjxnjz.mall.core.util.RabbitMqEnum;
/**
* 用于配置交换机和队列对应关系
* 新增消息队列应该按照如下步骤
* 1、增加queue bean,参见queueXXXX方法
* 2、增加queue和exchange的binding
* @author daiyy
* @create 2018-4-12 上午10:33
**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);
/**
* @Author:daiyy
* @Description: 主题型交换机
* @param
* @return
*/
/*@Bean
TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
rabbitAdmin.declareExchange(contractTopicExchange);
logger.debug("完成主题型交换机bean实例化");
return contractTopicExchange;
}*/
/**
* 直连型交换机
*/
@Bean
DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
rabbitAdmin.declareExchange(contractDirectExchange);
logger.debug("完成直连型交换机bean实例化");
return contractDirectExchange;
}
//在此可以定义队列
@Bean
Queue queueTest(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("测试队列实例化完成");
return queue;
}
/*//topic 1
@Bean
Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("话题测试队列1实例化完成");
return queue;
}
//topic 2
@Bean
Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("话题测试队列2实例化完成");
return queue;
}*/
//在此处完成队列和交换机绑定
@Bean
Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("测试队列与直连型交换机绑定完成");
return binding;
}
//topic binding1
/*@Bean
Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("测试队列与话题交换机1绑定完成");
return binding;
}
//topic binding2
@Bean
Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("测试队列与话题交换机2绑定完成");
return binding;
}*/
/**
* -----------------------------------app 端用户添加/和积分变动
*/
// 话题性 交换机
@Bean
TopicExchange appDirectExchange(RabbitAdmin rabbitAdmin){
TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode());
rabbitAdmin.declareExchange(contractTopicExchange);
logger.debug("完成主题型交换机bean实例化");
return contractTopicExchange;
}
// 定义俩个对列 app端用户添加队列 -----queue1
@Bean
Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_APP_USERADD.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("--queue1-- 实例化完成");
return queue;
}
//topic app 端积分变动队列-----queue2
@Bean
Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_APP_INTRAGATLADD.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("--queue2-- 实例化完成");
return queue;
}
// 积分商城用户添加队列-----queue3
@Bean
Queue queueTopicTest3(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("--queue3-- 实例化完成");
return queue;
}
//topic 积分商城积分变动队列-----queue4
@Bean
Queue queueTopicTest4(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("--queue4-- 实例化完成");
return queue;
}
//topic binding1 话题型交换机与队列的绑定
@Bean
Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("队列与话题交换机绑定完成");
return binding;
}
//topic binding2
@Bean
Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("队列与话题交换机绑定完成");
return binding;
}
}
3 创建发送者工具类
package com.zjxnjz.mall.core.util;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* rabbitmq发送消息工具类
*
* @author daiyy
* @create 2018-4-12 上午10:33
**/
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMqSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
logger.info("confirm: " + correlationData.getId());
}
/**
* 发送到 指定routekey的指定queue
* @param routeKey
* @param obj
*/
public void sendRabbitmqDirect(String routeKey,Object obj)throws Exception {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
}
/**
* 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
* @param routeKey
* @param obj
*/
public void sendRabbitmqTopic(String routeKey,Object obj) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
}
/**
* ----------------------------------start - SHF-----------------------------------
* 发送到 用户添加的队列中
* 指定交换机 和 路由规则
* @param obj
*/
public void sendUserAddTopic(Object obj)throws Exception {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(), RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode(), obj, correlationData);
}
/**
* 发送到 积分变动的 queue
* 指定交换机 和 路由规则
* @param obj
*/
public void sendIntragatlAddTopic(Object obj)throws Exception {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(), RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode() , obj, correlationData);
}
//----------------------------------end-----------------------------------
/**
* 从指定队列中接受消息
* @author SHF
* @version 创建时间:2018年7月27日 下午2:17:33
* @param queueName
* @return
*/
public Object reciveRabbitMqObject(String queueName) {
Object object = this.rabbitTemplate.receiveAndConvert(queueName);
return object;
}
}
4 创建 消费者工具类 (监听)
package com.zjxnjz.mall.restfulapi.modular.intragatl;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zjxnjz.mall.core.config.RabbitMqConfig;
import com.zjxnjz.mall.core.util.RabbitMqEnum;
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class UserIntragatlListener {
@Autowired
private UserSynchronization userSynchronization;
@Autowired
private IntragatlSynchronization intragatlSynchronization;
/**
* *************************用户添加 监听***************************
* @author SHF
* @version 创建时间:2018年7月30日 下午4:11:50
* @param connectionFactory
* @return
*/
@Bean("userAddContainer")
public MessageListenerContainer userAddListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode());
container.setMessageListener(userAddListener1());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean("userAddListener")
public ChannelAwareMessageListener userAddListener1(){
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Map map= JSON.parseObject(new String(message.getBody()));
System.out.println("QUEUE_APP_USERADD ===" + map);
userSynchronization.reciveUserAddFromBuliss(map); //调用 用户添加函数
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
};
}
/**
* *************************积分变动 监听***************************
* @author SHF
* @version 创建时间:2018年7月30日 下午4:12:15
* @param connectionFactory
* @return
*/
@Bean("IntragatlChangeContainer")
public MessageListenerContainer IntragatlChangeListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode());
container.setMessageListener(IntragatlChangeListener2());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean("IntragatlChangeListener")
public ChannelAwareMessageListener IntragatlChangeListener2(){
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Map map= JSON.parseObject(new String(message.getBody()));
System.out.println("QUEUE_APP_USERADD ===" + map);
intragatlSynchronization.bqylIntragatlChange(map); //调用积分变动函数
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
};
}
}