RabbitMQ 是一个开源的消息代理。它支持 AMQP 协议,用于在分布式系统中发送和接收消息。它通过队列管理消息,支持多种通信方式(如点对点、发布/订阅)。它常用于微服务之间传递数据。
在 ASP.NET Core 中使用 RabbitMQ 有以下好处
常见用途包括
RabbitMQ 管理界面的地址为 http://localhost:15672,默认用户名和密码为 guest/guest。
可以使用以下 Docker 命令启动 RabbitMQ:
docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
5672 是 RabbitMQ 的主端口,15672 是管理界面的端口。rabbitmq:3-management 镜像可以开启 Web 控制台。
打开浏览器访问 http://localhost:15672,输入默认账号登录,查看是否有队列或交换机信息。
创建一个新项目
dotnet new webapi -n RabbitMQExample
cd RabbitMQExample
添加 RabbitMQ 客户端库
dotnet add package RabbitMQ.Client
如果想用更高级的功能,也可以安装 MassTransit,可以参考昨日发布的文章
MassTransit :一个开源免费的 .NET 应用程序框架
在 appsettings.json 文件中添加如下内容:
{
"RabbitMQ":{
"HostName":"localhost",
"Port":5672,
"UserName":"guest",
"Password":"guest",
"QueueName":"my-queue"
}
}
新建一个类 RabbitMQService.cs:
/// <summary>
/// Abstraction over opening connections to the RabbitMQ broker.
/// </summary>
public interface IRabbitMQService
{
    /// <summary>
    /// Opens a connection to the broker.
    /// </summary>
    /// <returns>An <see cref="IConnection"/>; callers in this file dispose it themselves.</returns>
    IConnection CreateConnection();
}
/// <summary>
/// Creates RabbitMQ connections from the <c>RabbitMQ</c> configuration section
/// (<c>HostName</c>, <c>Port</c>, <c>UserName</c>, <c>Password</c>).
/// </summary>
public class RabbitMQService : IRabbitMQService
{
    // Port used when "RabbitMQ:Port" is not configured (the broker's main port).
    private const int DefaultPort = 5672;

    private readonly IConfiguration _config;

    /// <summary>Stores the configuration the connection settings are read from.</summary>
    /// <param name="config">Application configuration containing the <c>RabbitMQ</c> section.</param>
    public RabbitMQService(IConfiguration config)
    {
        _config = config;
    }

    /// <summary>
    /// Builds a <see cref="ConnectionFactory"/> from configuration and opens a new connection.
    /// </summary>
    /// <returns>A newly opened connection; the caller is responsible for disposing it.</returns>
    /// <exception cref="System.FormatException">
    /// <c>RabbitMQ:Port</c> is present but is not a valid integer.
    /// </exception>
    public IConnection CreateConnection()
    {
        // A missing port setting used to crash int.Parse with ArgumentNullException;
        // fall back to the default port instead. Parse with the invariant culture so the
        // result does not depend on the server's regional settings.
        var portText = _config["RabbitMQ:Port"];
        var port = string.IsNullOrWhiteSpace(portText)
            ? DefaultPort
            : int.Parse(portText, System.Globalization.CultureInfo.InvariantCulture);

        var factory = new ConnectionFactory
        {
            HostName = _config["RabbitMQ:HostName"],
            Port = port,
            UserName = _config["RabbitMQ:UserName"],
            Password = _config["RabbitMQ:Password"]
        };

        return factory.CreateConnection();
    }
}
注册服务:
// Register RabbitMQService as the singleton implementation of IRabbitMQService.
builder.Services.AddSingleton<IRabbitMQService, RabbitMQService>();
创建一个类 ProducerService.cs:
/// <summary>
/// Publishes text messages to the queue named by <c>RabbitMQ:QueueName</c>.
/// </summary>
public class ProducerService
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly string _queueName;

    /// <summary>Captures the connection factory and the target queue name.</summary>
    /// <param name="service">Service used to open broker connections.</param>
    /// <param name="config">Configuration providing <c>RabbitMQ:QueueName</c>.</param>
    public ProducerService(IRabbitMQService service, IConfiguration config)
    {
        _rabbitMQService = service;
        _queueName = config["RabbitMQ:QueueName"];
    }

    /// <summary>
    /// Declares the queue (durable, non-exclusive, no auto-delete) and publishes
    /// <paramref name="message"/> to it as UTF-8 bytes via the default exchange.
    /// </summary>
    /// <param name="message">Text to publish.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void PublishMessage(string message)
    {
        // Reject null before opening a connection (same exception type that
        // Encoding.GetBytes would have thrown later).
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // NOTE(review): a new connection and channel are opened for every message;
        // consider reusing a long-lived connection if publish volume is significant.
        using var connection = _rabbitMQService.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        // The queue is durable, so mark the message persistent as well; otherwise
        // the message itself is not kept across a broker restart.
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "", routingKey: _queueName, basicProperties: properties, body: body);

        Console.WriteLine($"发送消息:{message}");
    }
}
注册服务:
// Register ProducerService as a singleton so it can be injected into controllers.
builder.Services.AddSingleton<ProducerService>();
在控制器中调用:
/// <summary>
/// API controller that exposes an endpoint for publishing messages through
/// <see cref="ProducerService"/>.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
    private readonly ProducerService _producer;

    /// <summary>Receives the producer used to publish messages.</summary>
    /// <param name="producer">Injected message producer.</param>
    public MessageController(ProducerService producer)
    {
        _producer = producer;
    }

    /// <summary>
    /// Publishes the request body as a message and returns a confirmation.
    /// </summary>
    /// <param name="message">Message text bound from the request body.</param>
    /// <returns>An OK result carrying a confirmation string.</returns>
    [HttpPost]
    public IActionResult Post([FromBody] string message)
    {
        _producer.PublishMessage(message);
        return Ok("消息已发送");
    }
}
创建一个后台服务 ConsumerService.cs:
/// <summary>
/// Background service that consumes messages from the queue named by
/// <c>RabbitMQ:QueueName</c>, writes each one to the console, and acknowledges it.
/// </summary>
public class ConsumerService : BackgroundService
{
    private readonly IRabbitMQService _rabbitMQService;
    private readonly string _queueName;

    /// <summary>Captures the connection factory and the queue name to consume from.</summary>
    /// <param name="service">Service used to open broker connections.</param>
    /// <param name="config">Configuration providing <c>RabbitMQ:QueueName</c>.</param>
    public ConsumerService(IRabbitMQService service, IConfiguration config)
    {
        _rabbitMQService = service;
        _queueName = config["RabbitMQ:QueueName"];
    }

    /// <summary>
    /// Opens a connection and channel, starts a manual-ack consumer on the queue, and
    /// keeps both open until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Cancelled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var connection = _rabbitMQService.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($"收到消息:{message}");

            // autoAck is false below, so each delivery is acknowledged explicitly.
            channel.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

        // Returning here would dispose the channel and connection (using declarations)
        // right after the consumer is registered, so no message would ever be delivered.
        // Stay in this method until shutdown is requested.
        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; fall through so the channel and connection are disposed.
        }
    }
}
注册服务:
// Register ConsumerService as a hosted (background) service.
builder.Services.AddHostedService<ConsumerService>();
·············· END ··············