前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【橙子老哥】C# RabbitMQ-RPC/发布确认/延迟队列(二)

【橙子老哥】C# RabbitMQ-RPC/发布确认/延迟队列(二)

作者头像
郑子铭
发布2024-12-30 10:18:42
发布2024-12-30 10:18:42
7400
代码可运行
举报
运行总次数:0
代码可运行

今天,我们来继续上一期的【橙子老哥】C# 实操RabbitMQ七大模式(一)

1、概述

我们在上一期,实操了前5个模式,因为篇幅有限,以及前5种模式用的比较多,还剩下RPC和发布者确认留在下一期讲

  1. 简单模式(Simple)
  2. 工作队列模式(Work Queue)
  3. 发布订阅模式(Publish/Subscribe)
  4. 路由模式(Routing)
  5. 通配符模式(Topics)
  6. RPC模式(RPC)
  7. 发布确认模式(Publish Confirms)

废话少说,直接来吧!

2、RPC模式(RPC)

看到这个标题,肯定很蒙蔽,大家肯定接触过GRPC,这rabbitmq怎么也和这玩意儿挂上关系了?其实我们把这个单词叫全,大家就清楚了

RPC,全称为远程过程调用(Remote Procedure Call),是一种计算机通信协议,允许程序在不同的地址空间(通常是不同的计算机)上执行代码

有人疑惑了,在web程序里面,我们http接口调用,不都是在远程过程调用吗?其实这类似异步和多线程的区别,不是一个维度的东西

  • Http是一个通讯协议
  • RPC是一种通讯模式

举个最明显的区别,大家熟悉的gRpc还是还是基于Http2.0呢

这里说的是通过Rabbitmq实现RPC模式,是基于Rabbitmq,而不是Http

如果客户端要去调用服务端的一个方法,我们来看看Rabbitmq是怎么做的

上图看起来东西很多,其实不然,我们把上图灰色的先去掉,客户端发送请求,resquest请求丢到一个队列,服务端完成之后再把response响应丢到另一个队列,客户端去接收,不就完成了?

灰色的方块只是对消息的标记,我们给每个消息顶一个(reply_to)服务端回复队列(correlation_id)追踪id,客户端发送消息到rpc_queue中,服务端通过解析reply_to把消息转到对应的队列中,客户端再通过订阅这个订阅,根据correlation_id解析对应的数据即可了

我们直接上代码:客户端:

代码语言:javascript
代码运行次数:0
复制
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Collections.Concurrent;
using System.Text;


public class Rpc
{
    //我们的客户端
    public static async Task Main(string[] args)
    {
        Console.WriteLine("RPC Client");
        string n = args.Length > 0 ? args[0] : "30";
        
        //这里去调用服务端方法
        await InvokeAsync(n);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static async Task InvokeAsync(string n)
    {
        //我们封装一个rpc操作类
        var rpcClient = new RpcClient();
        //初始化一下
        await rpcClient.StartAsync();

        Console.WriteLine(" [x] Requesting fib({0})", n);
        //发送请求,接收response
        var response = await rpcClient.CallAsync(n);
        Console.WriteLine(" [.] Got '{0}'", response);
        //打印结果,是不是很简单了,
    }
}


//因为客户端调用,每次都要写很多重复的代码,特别是对异步操作,所以这里客户端要封装了一个类
public class RpcClient : IAsyncDisposable
{
    private const string QUEUE_NAME = "rpc_queue";
    
    private readonly IConnectionFactory _connectionFactory;
    
    //其他没啥,这里注意客户端放一个TaskCompletionSource的安全字典
    //TaskCompletionSource是c#中异步的类,用于可以手动操作异步的结果
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper
        = new();

    private IConnection? _connection;
    private IChannel? _channel;
    private string? _replyQueueName;

    //初始化一下
    public RpcClient()
    {
        _connectionFactory = new ConnectionFactory { HostName = "localhost" };
    }

    //准备调用,进行声明,这里将服务端要返回的响应队列,进行订阅
    public async Task StartAsync()
    {
        //创建连接
        _connection = await _connectionFactory.CreateConnectionAsync();
        //创建信道
        _channel = await _connection.CreateChannelAsync();

        // 声明一个队列,并且将队列名进行保存,方便下面call方法直接调用,,一个客户端一个
        QueueDeclareOk queueDeclareResult = await _channel.QueueDeclareAsync();
        _replyQueueName = queueDeclareResult.QueueName;
        var consumer = new AsyncEventingBasicConsumer(_channel);

        //订阅消息
        consumer.ReceivedAsync += (model, ea) =>
        {
            //这里我们可以从消息中的BasicProperties,获取到CorrelationId 追踪id
            string? correlationId = ea.BasicProperties.CorrelationId;

            if (false == string.IsNullOrEmpty(correlationId))
            {
                //如果消息响应到了,这将存储的TaskCompletionSource设置一个数据和状态,并在集合中移除即可
                if (_callbackMapper.TryRemove(correlationId, out var tcs))
                {
                    var body = ea.Body.ToArray();
                    var response = Encoding.UTF8.GetString(body);
                    tcs.TrySetResult(response);
                }
            }

            return Task.CompletedTask;
        };
        //订阅
        await _channel.BasicConsumeAsync(_replyQueueName, true, consumer);
    }

    //客户端真正去调用服务端方法
    public async Task<string> CallAsync(string message,
        CancellationToken cancellationToken = default)
    {
        if (_channel is null)
        {
            throw new InvalidOperationException();
        }

        //创建一个追踪id
        string correlationId = Guid.NewGuid().ToString();
        
        //这里我们发送消息的时候,可以给消息加个基础信息,就是CorrelationId和ReplyTo
        var props = new BasicProperties
        {
            CorrelationId = correlationId,
            ReplyTo = _replyQueueName
        };

        //创建TaskCompletionSource,结果是string
        var tcs = new TaskCompletionSource<string>(
                TaskCreationOptions.RunContinuationsAsynchronously);
        //加入集合,需要start的消息消费
        _callbackMapper.TryAdd(correlationId, tcs);

        //发送消息,这里的routingKey写死,并且传入basicProperties
        var messageBytes = Encoding.UTF8.GetBytes(message);
        await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: QUEUE_NAME,
            mandatory: true, basicProperties: props, body: messageBytes);

        //通过CancellationTokenRegistration进行管理cancellationToken的注销
        //CancellationTokenRegistration这个也是c#中异步的类,用于管理cancellationToken
        using CancellationTokenRegistration ctr =
            cancellationToken.Register(() =>
            {
                _callbackMapper.TryRemove(correlationId, out _);
                tcs.SetCanceled();
            });

        //对TaskCompletionSource进行异步等待,返回结果即可
        return await tcs.Task;
    }

    public async ValueTask DisposeAsync()
    {
        if (_channel is not null)
        {
            await _channel.CloseAsync();
        }

        if (_connection is not null)
        {
            await _connection.CloseAsync();
        }
    }
}

服务端:

代码语言:javascript
代码运行次数:0
复制
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

const string QUEUE_NAME = "rpc_queue";

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

//服务端订阅订阅,就是客户端发送的那个队列QUEUE_NAME
await channel.QueueDeclareAsync(queue: QUEUE_NAME, durable: false, exclusive: false,
    autoDelete: false, arguments: null);

//消费者设置,当有1个未确认的消息,不允许再接收,设置仅影响当前通道上的消费者,而不是全局设置
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) =>
{
    AsyncEventingBasicConsumer cons = (AsyncEventingBasicConsumer)sender;
    IChannel ch = cons.Channel;
    string response = string.Empty;

    byte[] body = ea.Body.ToArray();
    //这里,我们从刚刚客户端消息的BasicProperties,拿出CorrelationId

    //将CorrelationId进行一个透传
    IReadOnlyBasicProperties props = ea.BasicProperties;
    var replyProps = new BasicProperties
    {
        CorrelationId = props.CorrelationId
    };

    try
    {
        var message = Encoding.UTF8.GetString(body);
        int n = int.Parse(message);
        Console.WriteLine($" [.] Fib({message})");
        
        //运行服务端的方法
        response = Fib(n).ToString();
    }
    catch (Exception e)
    {
        Console.WriteLine($" [.] {e.Message}");
        response = string.Empty;
    }
    finally
    {
        var responseBytes = Encoding.UTF8.GetBytes(response);
        
        //服务端客户端给的CorrelationId进行区分,找到对应的回复id要回复到哪个队列上,props.ReplyTo
        await ch.BasicPublishAsync(exchange: string.Empty, routingKey: props.ReplyTo!,
            mandatory: true, basicProperties: replyProps, body: responseBytes);
        
        //消息确认
        await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
    }
};

await channel.BasicConsumeAsync(QUEUE_NAME, false, consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

//这段方法,是服务端的,客户端要远程调用这个方法
static int Fib(int n)
{
    if (n is 0 or 1)
    {
        return n;
    }

    return Fib(n - 1) + Fib(n - 2);
}

3、发布确认模式(Publish Confirms)

在传统的非确认模式下,发布者发送消息后并不能确认消息是否成功发送到 RabbitMQ 的队列中,可能会导致消息丢失。

而开启发布者确认模式后,RabbitMQ 会在成功接收消息后,向发布者发送确认,确保消息已被正确接收

或者说,生产者发布消息,有没有到队列中不知道啊,为了提高生产者发布的消息到队列中的正确率

当然,通常这种情况非常的极端

RabbitMQ 的发布者确认模式(Publisher Confirms)主要用于解决发布消息(message)到队列(queue)的可靠传输问题。当你启用发布者确认模式时,发布者会等待消息被RabbitMQ接收,或者通知发布者消息未能成功接收

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展, 因此,默认情况下不会启用它们。

我们只需要在生产者创建连接的时候,进行声明下即可

代码语言:javascript
代码运行次数:0
复制
  var channelOpts = new CreateChannelOptions(
    //设置开启发布者消息确认
    publisherConfirmationsEnabled: true,
    //开启发布者确认追踪
    publisherConfirmationTrackingEnabled: true,
    //限流
    outstandingPublisherConfirmationsRateLimiter: new  ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

这里确认方式,又有很多了,官方提供了3种,性能是从低到高,但是对应的实现难度也是指数倍增加

  • 单个确认
  • 批量确认
  • 异步确认

生产者代码实现:

代码语言:javascript
代码运行次数:0
复制
using System.Buffers.Binary;
using System.Diagnostics;
using System.Text;
using RabbitMQ.Client;

const ushort MAX_OUTSTANDING_CONFIRMS = 256;

const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions(
    //设置开启发布者消息确认
    publisherConfirmationsEnabled: true,
    //开启发布者确认追踪
    publisherConfirmationTrackingEnabled: true,
    //限流
    outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

var props = new BasicProperties
{
    Persistent = true
};

string hostname = "localhost";
if (args.Length > 0)
{
    if (false == string.IsNullOrWhiteSpace(args[0]))
    {
        hostname = args[0];
    }
}


//单个确认
await PublishMessagesIndividuallyAsync();

//批量确认
await PublishMessagesInBatchAsync();

//异步确认
await HandlePublishConfirmsAsynchronously();

Task<IConnection> CreateConnectionAsync()
{
    var factory = new ConnectionFactory { HostName = hostname };
    return factory.CreateConnectionAsync();
}

//单个单个确认
//这种技术非常简单,但也有一个主要缺点: 它会显著减慢发布速度,因为消息的确认会阻止发布 的所有后续消息。这种方法不会提供 每秒发布几百条消息。不过,这可能是 对于某些应用程序来说已经足够好了。
async Task PublishMessagesIndividuallyAsync()
{
    Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");

    await using IConnection connection = await CreateConnectionAsync();
    //将定义的channelOpts配置信道
    await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

    // 声明队列
    QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
    string queueName = queueDeclareResult.QueueName;
    
    var sw = new Stopwatch();
    sw.Start();

    for (int i = 0; i < MESSAGE_COUNT; i++)
    {
        byte[] body = Encoding.UTF8.GetBytes(i.ToString());
        try
        {
            //发送5w条持久消息,设置消息mandatory失败告诉生产者
            await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, basicProperties: props, mandatory: true);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}");
        }
    }

    sw.Stop();

    Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
}

//批量确认
//一个缺点是,在发生故障时,我们不知道到底出了什么问题
async Task PublishMessagesInBatchAsync()
{
    Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

    await using IConnection connection = await CreateConnectionAsync();
    await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
    
    QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
    string queueName = queueDeclareResult.QueueName;

    int batchSize = MAX_OUTSTANDING_CONFIRMS / 2;
    int outstandingMessageCount = 0;

    var sw = new Stopwatch();
    sw.Start();

   //将生产消息装到这里
    var publishTasks = new List<ValueTask>();
    for (int i = 0; i < MESSAGE_COUNT; i++)
    {
        byte[] body = Encoding.UTF8.GetBytes(i.ToString());
        //这里注意,并没一个一个去执行,而是将一批的消息异步执行
        publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
        outstandingMessageCount++;

        //当循环到需要确认的值的时候(现在设置的是一半)
        if (outstandingMessageCount == batchSize)
        {
            foreach (ValueTask pt in publishTasks)
            {
                try
                {
                    //等到批量的确认结果
                    await pt;
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
                }
            }
            publishTasks.Clear();
            outstandingMessageCount = 0;
        }
    }

    //再做一次将消息清空
    if (publishTasks.Count > 0)
    {
        foreach (ValueTask pt in publishTasks)
        {
            try
            {
                await pt;
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
            }
        }
        publishTasks.Clear();
        outstandingMessageCount = 0;
    }

    sw.Stop();
    Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
}

//异步确认
//这里性能最佳,带代码也是最繁琐的
async Task HandlePublishConfirmsAsynchronously()
{
    Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");

    await using IConnection connection = await CreateConnectionAsync();

    channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
    await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

    // declare a server-named queue
    QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
    string queueName = queueDeclareResult.QueueName;

    var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var outstandingConfirms = new LinkedList<ulong>();
    var semaphore = new SemaphoreSlim(1, 1);
    int confirmedCount = 0;
    async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
    {
        if (debug)
        {
            Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
                DateTime.Now, deliveryTag, multiple);
        }

        await semaphore.WaitAsync();
        try
        {
            if (multiple)
            {
                do
                {
                    LinkedListNode<ulong>? node = outstandingConfirms.First;
                    if (node is null)
                    {
                        break;
                    }
                    if (node.Value <= deliveryTag)
                    {
                        outstandingConfirms.RemoveFirst();
                    }
                    else
                    {
                        break;
                    }

                    confirmedCount++;
                } while (true);
            }
            else
            {
                confirmedCount++;
                outstandingConfirms.Remove(deliveryTag);
            }
        }
        finally
        {
            semaphore.Release();
        }

        if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
        {
            allMessagesConfirmedTcs.SetResult(true);
        }
    }

    channel.BasicReturnAsync += (sender, ea) =>
    {
        ulong sequenceNumber = 0;

        IReadOnlyBasicProperties props = ea.BasicProperties;
        if (props.Headers is not null)
        {
            object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
            if (maybeSeqNum is not null)
            {
                sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
            }
        }

        Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
        return CleanOutstandingConfirms(sequenceNumber, false);
    };

    channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
    channel.BasicNacksAsync += (sender, ea) =>
    {
        Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
        return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
    };

    var sw = new Stopwatch();
    sw.Start();

    var publishTasks = new List<ValueTuple<ulong, ValueTask>>();
    for (int i = 0; i < MESSAGE_COUNT; i++)
    {
        string msg = i.ToString();
        byte[] body = Encoding.UTF8.GetBytes(msg);
        ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync();
        if ((ulong)(i + 1) != nextPublishSeqNo)
        {
            Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
        }
        await semaphore.WaitAsync();
        try
        {
            outstandingConfirms.AddLast(nextPublishSeqNo);
        }
        finally
        {
            semaphore.Release();
        }

        string rk = queueName;
        if (i % 1000 == 0)
        {
            // This will cause a basic.return, for fun
            rk = Guid.NewGuid().ToString();
        }
        (ulong, ValueTask) data =
            (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
        publishTasks.Add(data);
    }

    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    // await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
    foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
    {
        try
        {
            await datum.PublishTask;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'");
        }
    }

    try
    {
        await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token);
    }
    catch (OperationCanceledException)
    {
        Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
    }
    catch (TimeoutException)
    {
        Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
    }

    sw.Stop();
    Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
}

4、延迟队列

最后,就是rabbitmq中说烂了的延迟队列,主要用于需要延迟执行的消息,例如培训机构最喜欢的电商系统中,取消订单的操作 rabbitmq要实现这个,有两种方式

  • 安装插件rabbitmq_delayed_message_exchange
  • 使用TTL(Time-To-Live)和队列的死信交换机(Dead Letter Exchange)来实现
4.1 使用插件:

在申明交换机的时候,选择x-delayed-message类型即可

代码语言:javascript
代码运行次数:0
复制
await channel.ExchangeDeclareAsync("my-exchange", "x-delayed-message",durable:true,autoDelete:false,
    new Dictionary<string, object?>()
    {
        {"x-delayed-type", "direct"}
    });
4.2 TTL和DLX

TTL:说白了,就是可以设置消息的过期时间,如果超出时间没有消费,就是死信

DLX:当一个队列中的消息成为死信后,如果这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机。通过这种方式,可以将各种原因导致的失败消息集中到一个特定的地方,以便于分析和处理。

这里大家可能有个误区,认为死信交换机是一种特殊的交换机,其实并不是,它之所以叫做死信交换机,是因为有其他队列把死信消息绑定给了它

消息先到了TTL队列,等5秒过期之后,就到了死信队列,消费者即时消费这个死信队列即可

请注意,声明的这个TTL严禁有任何的消费者,或者TTL都过期不了直接被消费了

生产者:

代码语言:javascript
代码运行次数:0
复制
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
    const string QueueName = "normal_queue";
    const string DlxExchangeName = "dlx_exchange";
    const string DlxQueueName = "delay_queue";

    
    static async Task Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "xxx",Port = 5672,Password = "xxx",UserName = "xxx"};
        await using var connection = await factory.CreateConnectionAsync();
        await using var channel =await connection.CreateChannelAsync();
        
        // 声明死信交换机,其实和正常交换机没有任何区别
        await  channel.ExchangeDeclareAsync(DlxExchangeName, ExchangeType.Fanout,durable:true,autoDelete:false);
            
        // 声明死信队列,其实和正常交换机没有任何区别
        await channel.QueueDeclareAsync(DlxQueueName,exclusive:false, durable: true, autoDelete: false);
        await channel.QueueBindAsync(DlxQueueName, DlxExchangeName,string.Empty);

        // 声明TTL队列,和死信队列绑定
        var arguments = new Dictionary<string, object?>
        {
            {"x-dead-letter-exchange",DlxExchangeName},
            {"x-message-ttl", 5000 } // 设置 TTL为5000ms (5秒)
        };
        await channel.QueueDeclareAsync(QueueName, durable: true,exclusive:false, autoDelete: false,arguments:arguments);
        

        // 发送消息到延迟队列
        var message = "这是一个延迟消息";
        var body = Encoding.UTF8.GetBytes(message);
        await  channel.BasicPublishAsync(exchange: string.Empty,routingKey:QueueName, body);
        Console.WriteLine(" [x] Sent {0}", message);
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

消费者直接正常消费死信队列即可

Rabbitmq 只有这些内容吗?当然不止!各各组件特性搭配,根据业务,玩出花都可以

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-12-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 今天,我们来继续上一期的【橙子老哥】C# 实操RabbitMQ七大模式(一)
  • 1、概述
  • 2、RPC模式(RPC)
  • 3、发布确认模式(Publish Confirms)
  • 4、延迟队列
    • 4.1 使用插件:
    • 4.2 TTL和DLX
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档