
先以 java 应用程序的方式连接 RabbitMQ ,后期再考虑与 Spring Boot 整合工作。
生产者和消费者、RabbitMQ 可以不用位于同一个主机,一个应用程序即可以生产者,也可以是消费者。
交换器类型
direct:根据路由键匹配队列发送消息。 topic:根据路由键匹配队列,可以全匹配和模糊匹配。 headers: fanout:消息广播。
pom文件
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>竞争者的消费模式。
流程图
这里虽然交换器是空的,但是 RabbitMQ 有一个默认的交换器,如果不设置则会发送到默认的交换器上去。
特点
如果存在多个消费者,那么 mq 将会以轮询的方式发送消息,并且消息只能被消费一次。
代码演示
生产者
public class Producer {
public static final String TEST_QUEUE = "test_queue";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setUsername("XX");
connectionFactory.setPassword("CC$");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(TEST_QUEUE, true, false, false, null);
String message = "Hello World!";
// 发送消息
channel.basicPublish("", TEST_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}解释一下以上部分方法的参数作用:
【队列名称】
【是否持久化,如果为true,则 mq 重启后,消息队列还在。】
【是否是排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。】
【自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。】
【附加参数】
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments);【交换器名称】
【路由键】
【当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。默认是false】
【当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。默认是false】
【消息的附加属性】
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);设置消息的持久化,deliveryMode = 1 为不持久,deliveryMode = 2 为持久,mq重启后消息不会丢失。
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);消费者
public class Consumer {
public static final String TEST_QUEUE = "test_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("XXX");
factory.setPassword("XXX$");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 确保该队列存在
channel.queueDeclare(TEST_QUEUE, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 消费端能接收的消息数
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
// 手动消息应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TEST_QUEUE, false, deliverCallback, consumerTag -> {});
}
}ack 机制:
RabbitMQ 分手动 ack 和自动 ack,如果设置为自动 ack,那么 mq 在发送消息后会立即删除消息,如果消费者消费消息出现异常则消息不会重新发送,因为消息已经被删除了,同时如果消费者被异常终止,则消费者所接收到未处理的消息则会丢失。而手动 ack 则会等待 ack 确认之后才会删除消息,如果 channel 被关闭,或者 Connection 被关闭,或者 TCP 连接丢失,则消息会进行重新排队。
方法参数作用:
【消息内容限制,单位是字节】
【接收mq的消息数量限制。假设有两个消费者,一个定义当前参数未设置为 A,一个设置 2 为 B,当生产者发送 100 条消息给消费者时,B 轮询接收到了 2 条消息后将停止消息接收,直到轮询时 B 可再次接收消息时再次接收】
【是否将当前设置应用于整个 channel 而不是每个消费者】
void basicQos(int prefetchSize, int prefetchCount, boolean global);basicQos 配合 autoAck = falsechannel.basicConsume(TEST_QUEUE, false, deliverCallback, consumerTag -> {});使用。如果不将 autoAck 设置为 false,那么 basicQos 的设置是无效的,因为 mq 不会查看消费者未确认的消息数,它只会不停的发送消息给消费者。
RabbitMQ 消息传递的核心是生产者从不将消息直接发送到队列中,实际上生产者根本不知道是否将消息传递到其他队列中。生产者只能将消息发送到交换机。
一次向多个消费者发送消息。
向多个消费者发送消息
如果交换机上没有队列绑定,则发送消息后消息会丢失。
fanout 交换器对于 routingKey 的值将会忽略。
public class Producer {
private static final String EXCHANGE_NAME = "log";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setUsername("XX");
connectionFactory.setPassword("CC$");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "send message 3";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
public class Consumer {
private static final String EXCHANGE_NAME = "log";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("cc");
factory.setPassword("cc$");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 确保该交换器存在
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 获取一个非持久、排他的、自动删除的随时队列
String queueName = channel.queueDeclare().getQueue();
// 队列绑定交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}我们先启动 Producer 然后再启动 Consumer 发现消费者那边没有接收到消息,说明在没有队列监听的时候,消息是丢失了。
如果我们将队列修改成这个 channel.queueDeclare(QUEUE_NAME,false,false,false,null);,然后启动两个相同的消费者发现,消息是存在竞争机制的,因为消息被轮询发给了不同的消费者。
根据路由键匹配发送消息到队列。
一个路由键可以绑定多个队列,一个队列也可以绑定多个路由键。
public class Producer {
private static final String EXCHANGE_NAME = "direct_log";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setUsername("cc");
connectionFactory.setPassword("cc$");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "send message 4-error";
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
public class Consumer {
private static final String EXCHANGE_NAME = "direct_log";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("cc");
factory.setPassword("cc$");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 确保该交换器存在
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取一个非持久、排他的、自动删除的随时队列
String queueName = channel.queueDeclare().getQueue();
// 队列绑定交换机
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}更改消费者的队列绑定的路由键,启动不同的客户端,然后更改生产者的路由键发送多条消息可以看到生产者发送到了相匹配路由键的队列中去了。同时如果在交换机中不存在绑定队列,则消息会丢失。
* 可以匹配一个单词,# 匹配多个单词,路由键以 “.” 分割,比如 test.log 的路由键可以进入 *.log 的队列,test.log.user 可以进入 test.# 的队列而匹配不了 test.*的队列。
如果不填 *、#那么就和 direct 交换器一样了。
注意是消费者的路由键绑定模糊匹配的key。
public class Producer {
private static final String EXCHANGE_NAME = "topic_log";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setUsername("rr");
connectionFactory.setPassword("rr$");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "send message 5-error";
channel.basicPublish(EXCHANGE_NAME, "test.error.log", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
public class Consumer {
private static final String EXCHANGE_NAME = "topic_log";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("ff");
factory.setPassword("ff$");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 确保该交换器存在
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 获取一个非持久、排他的、自动删除的随时队列
String queueName = channel.queueDeclare().getQueue();
// 队列绑定交换机
channel.queueBind(queueName, EXCHANGE_NAME, "test.*.log");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}以上与 direct 的区别是只将交换器类型换成了 topic,并且消费者的路由键加了一个模糊匹配的规则。
几种工作模式差不多就是对应几种交换器,并且如果一个队列里面存在多个消费者,那么都会有竞争消费的情况。交换器的匹配功能分别为广播、全匹配路由键与模糊匹配路由键。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。