基本介绍
消息会发送到所有与该交换机绑定的队列中
图2-1
消息通过RoutingKey精准匹配对应的队列中
图2-2
消息通过RoutingKey模糊匹配到对应的队列中
不依赖于路由键的匹配规则路由消息,根据发送的消息内容headers属性进行完全匹配(键值对形式)。性能差,基本不使用。
队列参数
channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除
参数名称 | 描述 | Features |
---|---|---|
x-message-ttl | 队列中的消息的生存周期,单位毫秒 | TTL |
x-expires | 队列在指定的时间内没有被使用(访问)就会被删除 | Exp |
x-max-length | 设置队列最大长度(先进先丢) | Lim |
x-max-length-bytes | 队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃 | Lim B |
x-overflow | 队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息 | Ovfl |
x-single-active-consumer | 一次只能有一个消费者消费消息 | SAC |
x-dead-letter-exchange | 设置当前队列的死信交换机 | DLX |
x-dead-letter-routing-key | 设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列 | DLK |
x-max-priority | 队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息 | Pri |
x-queue-mode | 懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。 | Args |
x-queue-master-locator | master queue host 的分配策略:min-masters、client-local和random |
消息参数
参数名称 | 描述 |
---|---|
content_type | 消息内容的类型 |
content_encoding | 消息内容的编码格式 |
priority | 消息的优先级 |
correlation_id | 用于将RPC响应与请求相关联 |
reply_to | 回调队列 |
expiration | 消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期 |
message_id | 消息id |
timestamp | 消息的时间戳 |
type | 类型 |
user_id | 用户id |
app_id | 应用程序id |
cluster_id | 集群id |
basicGet
方法拉取多条消息,处理完毕一次性返回ACKredis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)
生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID
配置死信队列
public static void SendMessage()
{
var exchangeA = "exchange";
var routeA = "routekey";
var queueA = "queue";
var exchangeD = "dlx.exchange";
var routeD = "dlx.route";
var queueD = "dlx.queue";
var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
// 创建死信交换机
channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false);
// 创建死信队列
channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false);
// 绑定死信交换机和队列
channel.QueueBind(queueD, exchangeD, routeD);
channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false);
channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments:
new Dictionary<string, object> {
{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
// { "x-message-ttl",10000}, //设置消息的存活时间,即过期时间
{ "x-max-length",5}//设置队列最大长度
});
channel.QueueBind(queueA, exchangeA, routeA);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息
channel.BasicPublish(exchange: exchangeA,
routingKey: routeA,
basicProperties: properties,
body: Encoding.UTF8.GetBytes("hello rabbitmq message"));
}
}
}
重试失败特定次数后放入死信队列
private static string _exchangeNormal = "Exchange.Normal"; //定义一个用于接收 正常 消息的交换机
private static string _exchangeRetry = "Exchange.Retry"; //定义一个用于接收 重试 消息的交换机
private static string _exchangeFail = "Exchange.Fail"; //定义一个用于接收 失败 消息的交换机
private static string _queueNormal = "Queue.Noraml"; //定义一个用于接收 正常 消息的队列
private static string _queueRetry = "Queue.Retry"; //定义一个用于接收 重试 消息的队列
private static string _queueFail = "Queue.Fail"; //定义一个用于接收 失败 消息的队列
public static void Test()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
//声明交换机
channel.ExchangeDeclare(_exchangeNormal, "topic", true, false, null);
channel.ExchangeDeclare(_exchangeRetry, "topic", true, false, null);
channel.ExchangeDeclare(_exchangeFail, "topic", true, false, null);
//定义队列参数
var queueNormalArgs = new Dictionary<string, object>();
{
queueNormalArgs.Add("x-dead-letter-exchange", _exchangeFail); //指定死信交换机,用于将 Normal 队列中失败的消息投递给 Fail 交换机
}
var queueRetryArgs = new Dictionary<string, object>();
{
queueRetryArgs.Add("x-dead-letter-exchange", _exchangeNormal); //指定死信交换机,用于将 Retry 队列中超时的消息投递给 Normal 交换机
queueRetryArgs.Add("x-message-ttl", 6000); //定义 queueRetry 的消息最大停留时间 (原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机) //定义最大停留时间为防止一些 待重新投递 的消息、没有定义重试时间而导致内存溢出
}
var queueFailArgs = new Dictionary<string, object>();
{
}
//声明队列
channel.QueueDeclare(queue: _queueNormal, durable: true, exclusive: false, autoDelete: false, arguments: queueNormalArgs);
channel.QueueDeclare(queue: _queueRetry, durable: true, exclusive: false, autoDelete: false, arguments: queueRetryArgs);
channel.QueueDeclare(queue: _queueFail, durable: true, exclusive: false, autoDelete: false, arguments: queueFailArgs);
//为队列绑定交换机
channel.QueueBind(queue: _queueNormal, exchange: _exchangeNormal, routingKey: "#");
channel.QueueBind(queue: _queueRetry, exchange: _exchangeRetry, routingKey: "#");
channel.QueueBind(queue: _queueFail, exchange: _exchangeFail, routingKey: "#");
#region 创建一个普通消息消费者
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var _sender = (EventingBasicConsumer)sender; //消息传送者
var _channel = _sender.Model; //消息传送通道
var _message = (BasicDeliverEventArgs)e; //消息传送参数
var _headers = _message.BasicProperties.Headers; //消息头
var _content = Encoding.UTF8.GetString(_message.Body.ToArray()); //消息内容
var _death = default(Dictionary<string, object>); //死信参数
if (_headers != null && _headers.ContainsKey("x-death"))
_death = (Dictionary<string, object>)(_headers["x-death"] as List<object>)[0];
try
#region 消息处理
{
Console.WriteLine();
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.0)消息接收:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");
throw new Exception("模拟消息处理失败效果。");
//处理成功时
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.1)处理成功:\r\n\t[deliveryTag={_message.DeliveryTag}]");
//消息确认 (销毁当前消息)
_channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
}
#endregion
catch (Exception ex)
#region 消息处理失败时
{
var retryCount = (long)(_death?["count"] ?? default(long)); //查询当前消息被重新投递的次数 (首次则为0)
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.2)处理失败:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[retryCount={retryCount}]");
if (retryCount >= 2)
#region 投递第3次还没消费成功时,就转发给 exchangeFail 交换机
{
//消息拒绝(投递给死信交换机,也就是上边定义的 ("x-dead-letter-exchange", _exchangeFail))
_channel.BasicNack(deliveryTag: _message.DeliveryTag, multiple: false, requeue: false);
}
#endregion
else
#region 否则转发给 exchangeRetry 交换机
{
var interval = (retryCount + 1) * 10; //定义下一次投递的间隔时间 (单位:秒)
//定义下一次投递的间隔时间 (单位:毫秒)
_message.BasicProperties.Expiration = (interval * 1000).ToString();
//将消息投递给 _exchangeRetry (会自动增加 death 次数)
_channel.BasicPublish(exchange: _exchangeRetry, routingKey: _message.RoutingKey, basicProperties: _message.BasicProperties, body: _message.Body);
//消息确认 (销毁当前消息)
_channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
}
#endregion
}
#endregion
};
channel.BasicConsume(queue: _queueNormal, autoAck: false, consumer: consumer);
}
#endregion
}
public static void ConsumerMessage()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
var exchangeArgumets = new Dictionary<string, object>
{
{ "x-delayed-type", "topic" } //延迟交换机的类型
};
channel.ExchangeDeclare("delay_exchange", "x-delayed-message", true, false, exchangeArgumets);
// 创建队列
string queueName1 = "delay_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "delay_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "delay_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
channel.QueueBind(queue: queueName1, exchange: "delay_exchange", routingKey: "delayed-direct1");
channel.QueueBind(queue: queueName2, exchange: "delay_exchange", routingKey: "delayed-direct2");
channel.QueueBind(queue: queueName3, exchange: "delay_exchange", routingKey: "delayed-direct3");
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 标记消息持久化
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
var _headers = ea.BasicProperties.Headers; //消息头
int delay = 0;
if (_headers == null)
{
ea.BasicProperties.Headers = new Dictionary<string, object>();
}
else if ( _headers.ContainsKey("x-delay"))
{
delay = Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
delay = delay + 20000;
}
ea.BasicProperties.Headers["x-delay"] = delay; //消息头设置消息延迟的时间
Console.WriteLine($" {DateTime.Now}=={delay}");
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
channel.BasicPublish(ea.Exchange, ea.RoutingKey, basicProperties: ea.BasicProperties, body);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName3,
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}