前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >13-RabbitMQ高级特性-Confirm确认消息

13-RabbitMQ高级特性-Confirm确认消息

作者头像
彼岸舞
发布2022-10-06 08:38:37
3720
发布2022-10-06 08:38:37
举报
文章被收录于专栏:java开发的那点事

Confirm确认消息

理解Confirm消息确认机制

  • 消息的确认, 是指投递消息后, 如果Broker收到消息, 则会给我们生产者一个应答
  • 生产者进行接收应答用来确定这条消息是否正常的发送到Broker, 这种方式也是消息的可靠性投递的核心保障

Confirm确认消息流程解析

Confirm确认消息实现

  • 在Channel上开启确认模式: channel.confirmSelect()
  • 在Channel上添加监听: addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送, 记录日志或者等后续处理

代码实现

消费者

代码语言:javascript
复制
package com.dance.redis.mq.rabbit.confirm;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver4ConfirmListener {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_confirmlistener_exchange";
        String queueName = "test_confirmlistener_queue";
        String routingKey = "confirm.#";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        RabbitMQHelper.queueDeclare(channel,queueName);
        channel.queueBind(queueName, exchangeName, routingKey);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

代码语言:javascript
复制
package com.dance.redis.mq.rabbit.confirm;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Sender4ConfirmListener {
 
    
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_confirmlistener_exchange";
        String routingKey1 = "confirm.save";
        String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            // 处理失败
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- error ---------");
            }
            // 处理成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- ok ---------");
            }
        });
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
    
}

测试

启动消费者

启动生产者

收到ok

查看消费者

消费成功

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Confirm确认消息
    • 理解Confirm消息确认机制
      • Confirm确认消息流程解析
        • Confirm确认消息实现
          • 代码实现
            • 测试
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档