
在安装完 RabbitMQ 之后,我们接下来学习如何去使用 RabbitMQ。
在上一个篇幅,我们讲了 RabbitMQ 的安装,并安装了管理界面。

界面上的导航栏共分6部分,这6部分分别是什么意思呢,我们先看看 RabbitMQ 的工作流程。

RabbitMQ 是一个消息中间件,也是一个生产者消费者模型。它负责接收,存储并转发消息。
消息传递的过程类似邮局。当你要发送一个邮件时,你把你的邮件放到邮局,邮局接收邮件,并通过邮递员送到收件人的手上。
按照这个逻辑,Producer 就类似邮件发件人。Consumer 就是收件人,RabbitMQ 就类似于邮局。

Producer 和 ConsumerProducer:生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息。
Consumer:消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息。
Broker:其实就是 RabbitMQ Server,主要是接收和收发消息。

RabbitMQ 中。在实际应用中,消息通常是一个带有一定业务逻辑结构的数据,比如 JSON 字符串。消息可以带有一定的标签,RabbitMQ 会根据标签进行路由,把消息发送给感兴趣的消费者(Consumer)。RabbitMQ 服务器,就可以消费消息了,消费的过程中,标签会被丢掉。消费者只会收到消息,并不知道消息的生产者是谁,当然消费者也不需要知道。RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。Connection 和 ChannelConnection:连接。是客户端和 RabbitMQ 服务器之间的一个 TCP 连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息。
Channel:通道,信道。Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel 的。

通道的主要作用:将消息的读写操作复用到同一个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能。
VirtualhostVirtualhost:虚拟主机。这是一个虚拟概念。它为消息队列提供了一种逻辑上的隔离机制。
对于 RabbitMQ 而言,一个 Broker Server 上可以存在多个 Virtual Host。当多个不同的用户使用同一个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
类似 MySQL 的 database,是一个逻辑上的集合。一个 MySQL 服务器可以有多个 database。
QueueQueue:队列,是 RabbitMQ 的内部对象,用于存储消息。

多个消费者,可以订阅同一个队列;反过来,一个消费者也可以订阅多个队列。
ExchangeExchange:交换机。message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中。
Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息。

类似于发快递之后,物流公司怎么处理呢,根据咱们的地址来分派这个快递到不同的站点,然后再送到收件人手里。这个分配的工作,就是交换机来做的。
理解了上面的概念之后,再来回顾一下这个图,来看 RabbitMQ 的工作流程

Producer 生产了一条消息Producer 连接到 RabbitMQ Broker,建立一个 Connection,开启一个信道 ChannelProducer 声明一个交换机(Exchange),路由消息Producer 声明一个队列(Queue),存放信息Producer 发送消息至 RabbitMQ BrokerRabbitMQ Broker 接收消息,并存入相应的队列(Queue)中,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。AMQP(Advanced Message Queuing Protocol)是一种高级消息队列协议,AMQP 定义了一套确定的消息交换功能,包括交换器(Exchange),队列(Queue)等。这些组件共同工作,使得生产者能够将消息发送到交换器。然后由队列接收并等待消费者接收。AMQP 还定义了一个网络协议,允许客户端应用通过该协议与消息代理和 AMQP 模型进行交互通信
RabbitMQ 是 AMQP 协议的 Erlang的具体实现。

步骤如下所示:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>RabbitMQ 默认的用于客户端连接的 TCP 端口号是 5672,需要提前进行开放
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("113.45.137.183"); // 设置ip
factory.setPort(5672); // 设置端口
factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
factory.setUsername("admin"); // 设置用户名
factory.setPassword("123123"); // 设置密码
// 3. 创建连接Connection
Connection connection = factory.newConnection();注意:生产者和消费者创建的 channel 并不是同一个
// 4. 创建channel通道
Channel channel = connection.createChannel();/* queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments)
*/
channel.queueDeclare("hello", true, false, false, null);"hello"。true:队列在 RabbitMQ 服务重启 后仍然存在。false:服务一重启,队列就没了。MessageProperties.PERSISTENT_TEXT_PLAIN 才能持久保存。true:该队列只允许 当前连接 使用,并且连接关闭后自动删除。false:队列可以被多个连接/信道访问。true:当最后一个消费者取消订阅后,队列会自动删除。false:不自动删除。x-message-ttlx-max-lengthx-dead-letter-exchangenull,表示没有额外配置。当一个新的 RabbitMQ 节点启动时,它会预声明(declare)几个内置的交换机,内置交换机名称是空字符串("")。生产者发送的消息会根据队列名称直接路由到对应的队列。

例如:如果有一个名为 "hello" 的队列,生产者可以直接发送消息到 "hello" 队列,而消费者可以从 "hello" 队列中接收消息,而不需要关心交换机的存在。这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是一对一的。

// 6. 通过channel发送消息到队列中
/*
basicPublish(String exchange,
String routingKey,
AMQP.BasicProperties props,
byte[] body)
*/
String msg = "Hello World";
// 使用的是内置交换机. 使用内置交换机时, routingKey要和队列名称一样, 才可以路由到对应的队列上去
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println(msg + "消息发送成功");exchange → 消息发到哪个交换机routingKey → 消息怎么路由props → 消息的附加属性(持久化、格式、headers 等)body → 消息内容(类型为字节数组 byte[])// 显式地关闭Channel是个好习惯,但这不是必须的,Connection关闭的时候,Channel也会自动关闭。
channel.close();
connection.close();消费者代码和生产者前三步都是一样的,第四步改为消费当前队列:
下面只讲消费消息的内容,因为其它部分和生产者是一样的!
/*
参数:
1. queue: 队列名称
2. autoAck: 是否自动确认。true会直接将消息删除,false需要消费者手动删除消息
3. callback: 收到消息后进行具体操作的回调对象
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;其中 Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息时,需要提供一个实现了 Consumer 接口的对象。
DefaultConsumer 是 RabbitMQ 提供的一个默认消费者,实现了 Consumer 接口。
我们需要实现这个默认消费者中的核心方法:
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body):从队列接收到消息时,会自动调用该方法。consumerTag:消费者标签,通常是消费者在订阅队列时指定的。envelope:包含消息的封包信息,如队列名称,交换机等。properties:一些配置信息body:消息的具体内容在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等。
// 6. 接收消息, 并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume("hello", true, consumer);public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("113.45.137.183"); // 设置ip
factory.setPort(5672); // 设置端口
factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
factory.setUsername("admin"); // 设置用户名
factory.setPassword("123123"); // 设置密码
// 3. 创建连接Connection
Connection connection = factory.newConnection();
// 4. 通过连接Connection创建通道Channel
Channel channel = connection.createChannel();
// 5. 声明一个队列
channel.queueDeclare("test", true, false, false, null);
// 6. 发送消息(当使用内置交换机的时候,routingKey必须和队列名称保持一致)
String text = "hello lirendada!";
channel.basicPublish("", "test", null, text.getBytes(StandardCharsets.UTF_8));
System.out.println(text + "消息发送成功");
// 7. 释放资源
connection.close();
}
}public class consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂(同生产者)
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数(同生产者)
factory.setHost("113.45.137.183"); // 设置ip
factory.setPort(5672); // 设置端口
factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
factory.setUsername("admin"); // 设置用户名
factory.setPassword("123123"); // 设置密码
// 3. 创建连接Connection(同生产者)
Connection connection = factory.newConnection();
// 4. 通过连接Connection创建通道Channel(同生产者)
Channel channel = connection.createChannel();
// 5. 声明一个队列(同生产者)(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
channel.queueDeclare("test", true, false, false, null);
// 6. 接收消息,进行消费💥
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume("test", true, defaultConsumer);
}
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。