RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
RabbitMQ提供了6种工作模式,6中模式如下:
<!-- 引入 rabbitmq 的相关依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
生产者发送消息到队列,有一个或多个消费者,当多个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息,即一个队列中一条消息,只能被一个消费者消费。
代码如下所示,无论是生产者还是消费者都需要先调用getChannel()
方法获取连接,获取连接的步骤是固定的:
首先启动消费者时刻监听hello
队列,一旦有消息传来立即消费,并打印消息到控制台,这里的消费者可以创建多个。然后运行producer()
生产者方法,向队列发送信息。
代码
package cn.com.codingce.test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
public class CodingceHello {
// 队列名称
private final static String QUEUE_NAME = "codingceHelloWorld";
public static void main(String[] args) throws Exception {
// 创建生产者
Channel channel = getChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
///////////////////////////////////////////////////////////
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingceHello Consumer customer1收到消息:{}", message);
}
};
// 创建消费者
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingceHello Consumer customer2收到消息:{}", message);
}
};
// 监听队列并消费消息
channel.basicConsume(QUEUE_NAME, true, consumer);
channel.basicConsume(QUEUE_NAME, true, consumer2);
}
/**
* 获取RabbitMQ连接对象
*/
public static Channel getChannel() throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
return connection.createChannel();
}
}
Log
10:54:23.526 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceHello - CodingceHello Consumer customer1收到消息:Hello World!
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
direct
直连交换机:最简单的交换机类型,它将消息直接路由到与消息中的路由键完全匹配的队列。当路由键与绑定时指定的路由键完全匹配时,消息将被投递到对应的队列。fanout
扇形交换机:扇形交换机将消息广播到绑定到该交换机的所有队列,它忽略消息的路由键,只需将消息发送到所有绑定的队列。topic
主题交换机:主题交换机根据消息的路由键和绑定的模式进行匹配,将消息路由到一个或多个队列。模式可以使用通配符进行匹配,例如*
代表一个单词,#
代表零个或多个单词。headers
头交换机:头交换机根据消息的头部属性进行匹配和路由。消息中的头部属性与绑定时指定的头部属性进行匹配,如果匹配成功,则消息被路由到对应的队列。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列,有两种匹配规则:fanout
只和交换机有关,只要消费者绑定在此交换机就会收到消息;而Direct的消费者只有当交换机和路由键同时匹配才能收到消息;Topic提供更为强大的通配符来表示路由,类似MySQL的like模糊查询功能;Headers与路由键无关,匹配消息头中的属性信息,用的较少。fanout
,有多个消费者消费,也就是发布消息后,所有订阅此交换机的都会收到消息进行消费。direct
,绑定交换机后再通过路由Key确定消费队列。生产者,一个交换机(fanoutExchange),没有路由规则,多个队列,多个消费者。生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。
在发布订阅模式下可以实现一个生产者发送的消息,可以被多个消费者多次消费,之前的消息只能消费一次。来看下面代码,生产者加了交换机名称和路由Key,在本案例中,路由Key等于没用,因为交换机类型设置为fanout
,后文有说明。
而消费者创建了两个q1和q2队列,绑定到my_exchange
队列上进行消费,当发送消息时,两个队列的消费者会同时接收到消息。如果q1有多个消费者,那么只会有一个q1的消费者接收到消息。
代码
package cn.com.codingce.test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
public class CodingcePubSub {
// 交换机名称
private static final String EXCHANGE_NAME = "codingce_exchange";
// 路由Key
private static final String ROUTING_KEY = "codingce_key";
public static void main(String[] args) throws Exception {
// 创建生产者
Channel channel = CodingceHello.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Hello World EXCHANGE_NAME ROUTING_KEY !";
// 比之前多了一个交换机名称,发送6条消息方便测试
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
log.info("CodingcePubSub basicPublish done");
////////////////////////////////////////////////
// 声明一个交换机,并设置其类型为"fanout"
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明一个队列并绑定交换机和路由Key
String queueName = channel.queueDeclare("q1", false, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
String queueName2 = channel.queueDeclare("q2", false, false, false, null).getQueue();
channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingcePubSub Consumer handleDelivery q1 customer收到消息:{}", message);
}
};
Consumer consumer1 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingcePubSub Consumer handleDelivery q11 1customer收到消息:{}", message);
}
};
// q1队列有两个消费者,但每次只会有一个q1的消费者收到消息
channel.basicConsume("q1", true, consumer);
channel.basicConsume("q1", true, consumer1);
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingcePubSub Consumer handleDelivery q2 customer收到消息:{}", message);
}
};
// q2只绑定一个消费者,所以这个消费者100%会收到消息。
channel.basicConsume("q2", true, consumer2);
}
}
Log
10:51:56.868 [main] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub basicPublish done
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.877 [pool-1-thread-4] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q1 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-6] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-7] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
10:51:56.884 [pool-1-thread-7] INFO cn.com.codingce.test.CodingcePubSub - CodingcePubSub Consumer handleDelivery q2 customer收到消息:Hello World EXCHANGE_NAME ROUTING_KEY !
生产者分别向ROUTING_KEY1和ROUTING_KEY2发送一条消息,两个Customer都只监听ROUTING_KEY1,控制台只打印ROUTING_KEY1的两条消息。
package cn.com.codingce.test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
public class CodingceRouting {
// 交换机名称
private static final String EXCHANGE_NAME = "codingce_exchange_routing";
// 路由Key
private static final String ROUTING_KEY1 = "routing1";
private static final String ROUTING_KEY2 = "routing2";
public static void main(String[] args) throws Exception {
// 创建生产者
Channel channel = CodingceHello.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "ROUTING_KEY1 我是1的消息";
String message2 = "ROUTING_KEY2 我是2的消息";
// 比之前多了一个交换机名称
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY1, null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY2, null, message2.getBytes());
log.info("CodingceRouting basicPublish done");
/////////////////////////////////////////////////////////////////
// 声明一个交换机,并设置其类型为"fanout"
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明一个队列并绑定交换机和路由Key
String queueName = channel.queueDeclare("r1", false, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
String queueName2 = channel.queueDeclare("r2", false, false, false, null).getQueue();
channel.queueBind(queueName2, EXCHANGE_NAME, ROUTING_KEY1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingceRouting r1 customer收到消息:{}", message);
}
};
channel.basicConsume("r1", true, consumer);
Consumer consumer2 = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("CodingceRouting r2 customer收到消息:{}", message);
}
};
channel.basicConsume("r2", true, consumer2);
}
}
Log
11:01:05.262 [main] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting basicPublish done
11:01:05.270 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting r1 customer收到消息:ROUTING_KEY1 我是1的消息
11:01:05.271 [pool-1-thread-4] INFO cn.com.codingce.test.CodingceRouting - CodingceRouting r2 customer收到消息:ROUTING_KEY1 我是1的消息
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。