RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收 件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型 。 如图:
Producer: 生产者,就是投递消息的 一方。
生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签 CLabel) 。消息体也可以称之为
payload ,在实际应用中,消 息体 一般是一个带有业务逻辑结构 的数据,比如一个 JSON 字符串。当然可以进一步对这个
消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键生产者把消息交由 RabbitMQ ,
RabbitMQ 之后会根据标签把消息发送给感兴趣 的消费者(Consumer)。
public class Procuder {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
//2 通过连接工厂创建连接
Connection connection = ConnectionUtils.getConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 通过Channel发送数据
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//发送一条消息包括消息体和标签
//1 exchange 2 routingKey 用来表述这条消息 , 比如一个交换器的名称和一个路由键
channel.basicPublish("", "test001", null, msg.getBytes());
}
//5 记得要关闭相关的连接
channel.close();
connection.close();
}
}
public class ConnectionUtils {
private static final String IP_ADDRESS = "192.168.1.188";
private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672
/**
* 获取mq的链接 定义一个链接工厂
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException{
//定义一个连接工厂
ConnectionFactory factory =new ConnectionFactory();
//设置服务地址
factory.setHost(IP_ADDRESS);
//AMQP 5672
factory.setPort(PORT);
//vhost
factory.setVirtualHost("/sunfeng");
//用户名
factory.setUsername("sunfeng");
//密码
factory.setPassword("sunfeng");
return factory.newConnection();
}
}
Consumer: 消费者,就是接收消息的 一方。
消费者连接到 RabbitMQ 服务器,并订阅到队列上 。 当消费者消费一条消息时 , 只是消费
消息的消息体 C payload ) 。 在消息路由的过程中 , 消息的标签会丢弃 , 存入到队列中的消息只
有消息体,消费者也只会消费到消息体 , 也就不知道消息的生产者是谁,当然消费者也不需要
知道 。
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
//2 通过连接工厂创建连接
Connection connection = ConnectionUtils.getConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5 创建消费者
//6 设置Channel
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
System.out.println(" recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
启动消费者 然后启动生产者 即可以看到如下消息
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/repository/ch/qos/logback/logback-classic/1.1.11/logback-classic-1.1.11.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
recv message: Hello RabbitMQ!
recv message: Hello RabbitMQ!
recv message: Hello RabbitMQ!
recv message: Hello RabbitMQ!
recv message: Hello RabbitMQ!
Broker: 消息中间件的服务节点 。
对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可 以简单地看作一个 RabbitMQ 服务节点 ,
或者 RabbitMQ 服务实例 。 大多数情况下也可 以将一个 RabbitMQ Broker 看作一 台 RabbitMQ
服务器 。
图 2-2 展示 了 生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整 个流程。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。