前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Net中RabbitMq.Client7.0通过依赖注入DI来管理RabbitMQ客户端的生命周期

Net中RabbitMq.Client7.0通过依赖注入DI来管理RabbitMQ客户端的生命周期

原创
作者头像
Net分享
修改2024-12-04 16:14:23
修改2024-12-04 16:14:23
2090
举报
文章被收录于专栏:Net分享Net分享

在 RabbitMQ.Client 7.0.0 版本中,

IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel,

IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicProperties 类来创建消息属性。

前言

关于RabbitMq的更多知识点在: https://www.dotnetshare.com

公众号:Net分享,欢迎关注

下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期

1. 安装RabbitMQ客户端库

首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:

代码语言:shell
复制
Install-Package RabbitMQ.Client

2. 配置RabbitMQ连接字符串

在你的appsettings.json文件中,添加RabbitMQ的连接配置:

代码语言:json
复制
{
  "RabbitMQ": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest"
  }
}

3. 创建RabbitMQ服务配置类

创建一个配置类来封装RabbitMQ的连接信息:

代码语言:csharp
复制
public class RabbitMQOptions
{
    public string HostName { get; set; }
    public int Port { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
}

4. 注册RabbitMQ服务

Startup.cs或程序启动时的配置方法中,注册RabbitMQ服务:

代码语言:csharp
复制
// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));

// 注册RabbitMQ连接工厂
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(sp =>
{
    var options = sp.GetRequiredService<IOptions<RabbitMQOptions>>().Value;
    var factory = new ConnectionFactory() { HostName = options.HostName, Port = options.Port, UserName = options.UserName, Password = options.Password };
    return new RabbitMQConnection(factory);
}); 

// 添加RabbitMQService的服务注册
builder.Services.AddSingleton<RabbitMQService>();

5. 创建RabbitMQ连接和通道工厂

创建一个工厂类来管理RabbitMQ的连接和通道:

代码语言:csharp
复制
    public interface IRabbitMQConnection : IDisposable
    {
        Task<IChannel> CreateChannel();
    }

    public class RabbitMQConnection : IRabbitMQConnection
    {
        private readonly ConnectionFactory _factory;
        private readonly IConnection _connection;
        private bool _isDisposed;

        public RabbitMQConnection(ConnectionFactory factory)
        {
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _connection = factory.CreateConnectionAsync().Result;
        }

        public async Task<IChannel> CreateChannel()
        {
            EnsureNotDisposed();
            return await _connection.CreateChannelAsync();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_isDisposed) return;

            if (disposing)
            {
                // Free any other managed objects here.
            }

            // Free any unmanaged objects here.
            _connection.Dispose();

            _isDisposed = true;
        }

        ~RabbitMQConnection()
        {
            Dispose(false);
        }

        private void EnsureNotDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(nameof(RabbitMQConnection));
            }
        }
    }

6. 使用RabbitMQ服务

在你的服务或消费者中,注入IRabbitMQConnection并使用它来创建模型(channel):

代码语言:csharp
复制
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;

public class RabbitMQService
{
    private readonly IRabbitMQConnection _connection;

    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
    }


    public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
    {
         
            try
            {
                using var channel = _connection.CreateChannel();
                var mesjson = JsonSerializer.Serialize(message);
                Console.WriteLine("发送消息:" + mesjson);
                var body = Encoding.UTF8.GetBytes(mesjson);
                var properties = new RabbitMQ.Client.BasicProperties
                {
                    Persistent = true // 设置消息持久化
                };
                channel.BasicPublishAsync(exchange, routingKey, false, properties, body, cancellationToken);

            }
            catch (OperationCanceledException ex)
            {
                Console.WriteLine($"Operation was canceled: {ex.Message}");
                //throw; // Re-throw if you want to propagate the cancellation
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An error occurred: {ex.Message}");
                //throw; // Re-throw if you want to propagate the error
            }  
    }

    public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
    {
        var channel = _connection.CreateChannel();
        await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            try
            {
                // 直接传递 model 和 body 给 callback,不需要转换
                await callback(channel, body);
            }

            finally
            {
                //await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
            }
        };
        await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
        // Prevent the method from returning immediately
        await Task.Delay(-1, cancellationToken);
    }
}

7.生产端和消费端的使用

消费
代码语言:csharp
复制
var app = builder.Build();


var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
    // 处理接收到的消息
    //string message = Encoding.UTF8.GetString(body);
    //Console.WriteLine($"收到消息 message: {message}");
    //// 确认消息
    //await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);

}, cancellationToken);
生产端

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 1. 安装RabbitMQ客户端库
  • 2. 配置RabbitMQ连接字符串
  • 3. 创建RabbitMQ服务配置类
  • 4. 注册RabbitMQ服务
  • 5. 创建RabbitMQ连接和通道工厂
  • 6. 使用RabbitMQ服务
  • 7.生产端和消费端的使用
    • 消费
    • 生产端
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档