首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【RabbitMQ】RabbitMQ核心概念 && AMQP && RabbitMQ快速入门

【RabbitMQ】RabbitMQ核心概念 && AMQP && RabbitMQ快速入门

原创
作者头像
lirendada
发布2026-01-10 17:13:43
发布2026-01-10 17:13:43
3640
举报
文章被收录于专栏:中间件中间件

Ⅰ. RabbitMQ核心概念

在安装完 RabbitMQ 之后,我们接下来学习如何去使用 RabbitMQ

在上一个篇幅,我们讲了 RabbitMQ 的安装,并安装了管理界面。

界面上的导航栏共分6部分,这6部分分别是什么意思呢,我们先看看 RabbitMQ 的工作流程。

RabbitMQ 是一个消息中间件,也是一个生产者消费者模型。它负责接收,存储并转发消息。

消息传递的过程类似邮局。当你要发送一个邮件时,你把你的邮件放到邮局,邮局接收邮件,并通过邮递员送到收件人的手上。

按照这个逻辑,Producer 就类似邮件发件人。Consumer 就是收件人,RabbitMQ 就类似于邮局。

一、ProducerConsumer

Producer:生产者,是 RabbitMQ Server 的客户端,RabbitMQ 发送消息

Consumer:消费者,也是 RabbitMQ Server 的客户端,RabbitMQ 接收消息

Broker其实就是 RabbitMQ Server,主要是接收和收发消息。

  • 生产者(Producer)创建消息,然后发布到 RabbitMQ 中。在实际应用中,消息通常是一个带有一定业务逻辑结构的数据,比如 JSON 字符串。消息可以带有一定的标签,RabbitMQ 会根据标签进行路由,把消息发送给感兴趣的消费者(Consumer)。
  • 消费者(Consumer)连接到 RabbitMQ 服务器,就可以消费消息了,消费的过程中,标签会被丢掉。消费者只会收到消息,并不知道消息的生产者是谁,当然消费者也不需要知道。
  • 对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

二、ConnectionChannel

Connection:连接。是客户端和 RabbitMQ 服务器之间的一个 TCP 连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息。

Channel:通道,信道。Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel

通道的主要作用:将消息的读写操作复用到同一个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能

三、Virtualhost

Virtualhost:虚拟主机。这是一个虚拟概念。它为消息队列提供了一种逻辑上的隔离机制。

对于 RabbitMQ 而言,一个 Broker Server 上可以存在多个 Virtual Host。当多个不同的用户使用同一个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

类似 MySQLdatabase,是一个逻辑上的集合一个 MySQL 服务器可以有多个 database

四、Queue

Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。

多个消费者,可以订阅同一个队列;反过来,一个消费者也可以订阅多个队列。

五、Exchange

Exchange:交换机。message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中。

Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息

类似于发快递之后,物流公司怎么处理呢,根据咱们的地址来分派这个快递到不同的站点,然后再送到收件人手里。这个分配的工作,就是交换机来做的。

六、RabbitMQ工作流程

理解了上面的概念之后,再来回顾一下这个图,来看 RabbitMQ 的工作流程

  1. Producer 生产了一条消息
  2. Producer 连接到 RabbitMQ Broker,建立一个 Connection,开启一个信道 Channel
  3. Producer 声明一个交换机(Exchange),路由消息
  4. Producer 声明一个队列(Queue),存放信息
  5. Producer 发送消息至 RabbitMQ Broker
  6. RabbitMQ Broker 接收消息,并存入相应的队列(Queue)中,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。

Ⅱ. AMQP

AMQPAdvanced Message Queuing Protocol)是一种高级消息队列协议,AMQP 定义了一套确定的消息交换功能,包括交换器(Exchange),队列(Queue)等。这些组件共同工作,使得生产者能够将消息发送到交换器。然后由队列接收并等待消费者接收。AMQP 还定义了一个网络协议,允许客户端应用通过该协议与消息代理和 AMQP 模型进行交互通信

RabbitMQAMQP 协议的 Erlang的具体实现

Ⅲ. RabbitMQ快速入门

步骤如下所示:

  1. 引入依赖
  2. 编写生产者代码
  3. 编写消费者代码

一、引入依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

二、编写生产者代码

1. 创建连接

RabbitMQ 默认的用于客户端连接的 TCP 端口号是 5672,需要提前进行开放

代码语言:javascript
复制
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 2. 设置参数
factory.setHost("113.45.137.183");   // 设置ip
factory.setPort(5672);               // 设置端口
factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
factory.setUsername("admin");        // 设置用户名
factory.setPassword("123123");       // 设置密码

// 3. 创建连接Connection
Connection connection = factory.newConnection();

2. 创建Channel

注意:生产者和消费者创建的 channel 并不是同一个

代码语言:javascript
复制
// 4. 创建channel通道
Channel channel = connection.createChannel();

3. 声明一个队列Queue

代码语言:javascript
复制
/* queueDeclare(String queue, 
                boolean durable, 
                boolean exclusive, 
                boolean autoDelete, 
                Map<String, Object> arguments)
*/
channel.queueDeclare("hello", true, false, false, null);
  1. queue
    1. 队列的名字。这里叫 "hello"
    2. RabbitMQ 里所有操作(发送消息、消费消息)都是基于队列名来进行的。
  2. durable
    1. 是否持久化队列。
    2. true:队列在 RabbitMQ 服务重启 后仍然存在。
    3. false:服务一重启,队列就没了。
    4. 注意:队列持久化 ≠ 消息持久化,消息要额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN 才能持久保存。
  3. exclusive
    1. 是否为 排他队列
    2. true:该队列只允许 当前连接 使用,并且连接关闭后自动删除
    3. false:队列可以被多个连接/信道访问。
    4. 常用于临时队列(比如 RPC 返回队列)。
  4. autoDelete
    1. 是否 自动删除
    2. true当最后一个消费者取消订阅后,队列会自动删除
    3. false:不自动删除。
    4. 常用于临时性的工作队列。
  5. arguments
    1. 额外参数,用来对队列做一些高级配置。
    2. 比如:
      • 设置消息的 TTL(存活时间)x-message-ttl
      • 设置 最大长度x-max-length
      • 配置 死信队列x-dead-letter-exchange
    3. 这里写 null,表示没有额外配置。

4. 发送消息

当一个新的 RabbitMQ 节点启动时,它会预声明(declare)几个内置的交换机,内置交换机名称是空字符串("")。生产者发送的消息会根据队列名称直接路由到对应的队列。

例如:如果有一个名为 "hello" 的队列,生产者可以直接发送消息到 "hello" 队列,而消费者可以从 "hello" 队列中接收消息,而不需要关心交换机的存在。这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是一对一的。

代码语言:javascript
复制
// 6. 通过channel发送消息到队列中
/*
basicPublish(String exchange, 
            String routingKey, 
            AMQP.BasicProperties props,
            byte[] body)
*/
String msg = "Hello World";
// 使用的是内置交换机. 使用内置交换机时, routingKey要和队列名称一样, 才可以路由到对应的队列上去
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println(msg + "消息发送成功");
  • exchange → 消息发到哪个交换机
  • routingKey → 消息怎么路由
  • props → 消息的附加属性(持久化、格式、headers 等)
  • body → 消息内容(类型为字节数组 byte[]

5. 释放资源

代码语言:javascript
复制
// 显式地关闭Channel是个好习惯,但这不是必须的,Connection关闭的时候,Channel也会自动关闭。
channel.close();
connection.close();

三、编写消费者代码

消费者代码和生产者前三步都是一样的,第四步改为消费当前队列:

  1. 创建连接
  2. 创建Channel
  3. 声明一个队列Queue
  4. 消费消息
  5. 释放资源(实际上消费者相当于是一个监听程序,不需要关闭资源)

下面只讲消费消息的内容,因为其它部分和生产者是一样的!

代码语言:javascript
复制
/*
  参数:
    1. queue: 队列名称
    2. autoAck: 是否自动确认。true会直接将消息删除,false需要消费者手动删除消息
    3. callback: 收到消息后进行具体操作的回调对象
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

其中 Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息时,需要提供一个实现了 Consumer 接口的对象。

DefaultConsumer 是 RabbitMQ 提供的一个默认消费者实现了 Consumer 接口

我们需要实现这个默认消费者中的核心方法:

  • handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body):从队列接收到消息时,会自动调用该方法。
  • 参数说明如下:
    • consumerTag:消费者标签,通常是消费者在订阅队列时指定的。
    • envelope:包含消息的封包信息,如队列名称,交换机等。
    • properties:一些配置信息
    • body:消息的具体内容

在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等。

代码语言:javascript
复制
// 6. 接收消息, 并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息: " + new String(body));
    }
};
channel.basicConsume("hello", true, consumer);

四、完整代码

生产者代码

代码语言:javascript
复制
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2. 设置参数
        factory.setHost("113.45.137.183");   // 设置ip
        factory.setPort(5672);               // 设置端口
        factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
        factory.setUsername("admin");        // 设置用户名
        factory.setPassword("123123");       // 设置密码

        // 3. 创建连接Connection
        Connection connection = factory.newConnection();

        // 4. 通过连接Connection创建通道Channel
        Channel channel = connection.createChannel();

        // 5. 声明一个队列
        channel.queueDeclare("test", true, false, false, null);

        // 6. 发送消息(当使用内置交换机的时候,routingKey必须和队列名称保持一致)
        String text = "hello lirendada!";
        channel.basicPublish("", "test", null, text.getBytes(StandardCharsets.UTF_8));
        System.out.println(text + "消息发送成功");

        // 7. 释放资源
        connection.close();
    }
}

消费者代码

代码语言:javascript
复制
public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂(同生产者)
        ConnectionFactory factory = new ConnectionFactory();

        // 2. 设置参数(同生产者)
        factory.setHost("113.45.137.183");   // 设置ip
        factory.setPort(5672);               // 设置端口
        factory.setVirtualHost("lirendada"); // 设置虚拟主机名称
        factory.setUsername("admin");        // 设置用户名
        factory.setPassword("123123");       // 设置密码

        // 3. 创建连接Connection(同生产者)
        Connection connection = factory.newConnection();

        // 4. 通过连接Connection创建通道Channel(同生产者)
        Channel channel = connection.createChannel();

        // 5. 声明一个队列(同生产者)(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
        channel.queueDeclare("test", true, false, false, null);

        // 6. 接收消息,进行消费💥
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume("test", true, defaultConsumer);
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Ⅰ. RabbitMQ核心概念
    • 一、Producer 和 Consumer
    • 二、Connection 和 Channel
    • 三、Virtualhost
    • 四、Queue
    • 五、Exchange
    • 六、RabbitMQ工作流程
  • Ⅱ. AMQP
  • Ⅲ. RabbitMQ快速入门
    • 一、引入依赖
    • 二、编写生产者代码
      • 1. 创建连接
      • 2. 创建Channel
      • 3. 声明一个队列Queue
      • 4. 发送消息
      • 5. 释放资源
    • 三、编写消费者代码
    • 四、完整代码
      • 生产者代码
      • 消费者代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档