前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习总结

RabbitMQ学习总结

作者头像
蓝夏
发布2018-06-14 15:03:26
9660
发布2018-06-14 15:03:26
举报
文章被收录于专栏:bluesummer

关于RabbitMQ是什么以及它的概念,不了解的可以先查看一下下面推荐的几篇博客

https://blog.csdn.net/whoamiyang/article/details/54954780

https://www.cnblogs.com/frankyou/p/5283539.html

https://blog.csdn.net/mx472756841/article/details/50815895

官网介绍:http://www.rabbitmq.com/getstarted.html

本文github源码:http://www.cnblogs.com/bluesummer/p/8992225.html

因为之前不了解交换机及AMQP协议,上来就研究RabbitMQ,很多概念都有点蒙圈,所以建议大家在学习RabbitMQ之前先对一些概念有基本的了解

安装与配置:

服务相关命令

  • rabbitmq-plugins enable rabbitmq_management //开启管理插件
  • rabbitmq-service.bat start //开启服务
  • rabbitmq-service.bat stop //关闭服务
  • rabbitmqctl list_queues //查看任务

注意在执行命令rabbitmqctl list_queues时若报错unable to perform an operation on node。。。。,可将C:\Users\用户名\.erlang.cookie.erlang.cookie文件拷贝到C:\Windows\System32\config\systemprofile\.erlang.cookie中替换,然后重启服务

至此RabbitMQ服务我们已经安装好了

后台管理

开启管理插件后我们重启rabbitmq服务,打开http://localhost:15672/后台管理界面, 用户名和密码均为guest

guest账户在最新版本只能通过localhost登陆了,如果想要通过ip来登陆需要设置一下配置文件:

找到/rabbitmq_server-x.x.x/ebin下面的rabbit.app文件文件: 找到:loopback_users将里面的<<”guest”>>删除。 删除后的内容为:{loopback_users, []},然后重启服务

关于用户密码管理的操作我们都可以在管理页面中设置

默认端口:

  1. client端通信口5672
  2. 管理口15672
  3. server间内部通信口25672
  4. erlang发现口:4369

想要修改默认端口可修改 安装目录下 etc/rabbitmq.config文件,有个默认的example,改一改就可以了

发送消息

我们先构建一个应用程序,建议创建一个winform或wpf程序,控制台在这里并不太好用。 项目中引用nuget包:RabbitMQ.Client

接下来我们编写一个发送消息和接收消息的代码:

代码语言:javascript
复制
  public void SendMsg(string message)
    {
        //这里的端口及用户名都是默认的,可以直接设置一个hostname=“localhost”其他的不用配置
        var factory = new ConnectionFactory() { HostName = "192.168.1.15",Port=5672,UserName= "guest",Password= "guest" };
        //创建一个连接,连接到服务器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                //创建一个名称为hello的消息队列
                //durable:队列持久化,为了防止RabbitMQ在退出或者crash等异常情况下数据不会丢失,可以设置durable为true
                //exclusive:排他队列,只对首次声明它的连接(Connection)可见,不允许其他连接访问,在连接断开的时候自动删除,无论是否设置了持久化
                //autoDelete:自动删除,如果该队列已经没有消费者时,该队列会被自动删除。这种队列适用于临时队列。
                channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null);
                //channel.BasicConsume("hello", autoAck: true);
               
                var props = channel.CreateBasicProperties();
                //消息持久化,若启用durable则该属性启用
                props.Persistent = true;
                //封装消息主体
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: props, body: body);
                Console.WriteLine(" 发送消息{0}", message);
            }
        }
    }
    
     public class Consumer : IDisposable
    {
        public static int _number;
        private static ConnectionFactory factory;
        private static IConnection connection;
        static Receive()
        {
            factory = new ConnectionFactory() { HostName = "localhost" };
        }
        public Receive()
        {
            _number++;
        }
        public void ReceiveMsg(Action<string> callback)
        {
            if(connection==null||!connection.IsOpen)
                connection = factory.CreateConnection();
            IModel _channel = connection.CreateModel();
            _channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: true, arguments: null);
            _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
            // 创建事件驱动的消费者
            var consumer = new EventingBasicConsumer(_channel); 
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                callback($"number:{_number}.message:{message}");
                //模拟消息处理需要两秒
                Thread.Sleep(2000);
                //显示发送ack确认接收并处理完成消息,只有在前面进行启用显示发送ack机制后才奏效。
                _channel.BasicAck(ea.DeliveryTag, false);
            };
            //指定消费队列,autoAct是否自动确认
            string result = _channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

            //设置后当所有的channel都关闭了连接会自动关闭
            //connection.AutoClose = true;
        }
        public void Dispose()
        {
            if (connection != null && connection.IsOpen)
                connection.Dispose();
        }
    }

上面一个很简单的消息队列的发送者和消费者,解释下基本的流程:

Publisher中调用send函数先创建一个连接到服务器,然后用该连接创建了一个channel,接着用该channel声明了一个hello的队列,最后向默认的交换机发送了一条消息。(exchange: "") 空字符串即为默认的交换机 ,消息的路由为hello ,默认的交换机是direct类型,根据路由名称完全匹配队列的名称。所有的队列都会绑定到默认的交换机上,路由名称就是队列的名称。所以默认的交换机将消息发送到名声为hello 的队列。紧接着Consumer中调用ReceiveMsg 函数从hello 队列获取消息,获取到消息后调用act函数通知broker该消息已经被成功地消费,broker将这条消息删除,如下图

网上有部分示例是使用QueueingBasicConsumer来创建消费者的,我发现在新版本中已经过时了,原因是它容易造成内存溢出性能降低等一系列的问题,简单说一下QueueingBasicConsumer的处理流程,它接收到消息之后会把消息塞到一个Queue队列中,然后用户来循环这个队列处理消息,但是如果你一个消息处理的很慢,而消息又发送过来的很快很大,就会造成队列里面存的消息越来越多,最终造成内存溢出。所以现在推荐使用EventingBasicConsumer或者继承DefaultBasicConsumer来创建消费者,事件驱动就不会有这个问题了

上面的代码需要注意以下几点:

  1. 想要通过guest账户指定ip连接需要修改loopback_users配置
  2. 我们调用QueueDeclare函数声明一个队列,如果设置了队列持久化,即使重启服务队列仍然在。如果不是持久化,即使消息全都被消费了,只要服务没有重启,队列仍然存在。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,所以要么删除队列要么重新命名一个队列,删除队列可以通过管理界面来删除或者调用QueueDelete函数。
  3. 队列如果存在声明一次就够了,如果多次声明了一样的队列将不会有任何异常,但是如果消费者绑定了一个不存在的队列是会发生异常的:**_channel.BasicConsume**,所以习惯是在Woker中将需要监听的队列先声明一遍
  4. 排他队列:大概意思就是通过连接connectionA声明一个排他队列之后,以后也只能通过连接connectionA来访问该队列,其他连接一旦访问就会报队列被锁定的错误,这个实在想不到应用场景
  5. 队列持久化代表的是重启服务后队列仍然在,想要队列里的消息仍然存在需要同时设置消息持久化,但是如果只设置消息持久化不设置队列持久话也没有意义。但这也并不一定能保证消息一定不会丢失。首先必须要有消息确认机制来保证消息一定被正确消费了。最主要的问题是消息写入到磁盘需要一定的时间,如果服务接收到消息没有来得及写入磁盘就挂掉了,那么这个消息就丢失了,对于这一点可以查询一下RabbitMQ集群相关的文章
  6. 默认发送的消息都需要消费者确认,可以通过设置autoAct为true来自动确认消息,也可以调用BasicAck函数确认,总之如果消息需要确认,一定要在消息处理完成之后进行确认,否则当消费者连接关闭之后未被确认的消息很快就会被退回。
  7. 我上面定义的消费者原本是想要多次实例化Receive来模拟多个消费者的,然而事实证明并不好用,想要模拟多个消费者还是需要打开多个程序
  8. EventingBasicConsumer的监听会创建一个前台线程一直在运行,所以在winform中如果关闭程序需要dispose掉connection占用的线程

轮询调度

轮询调度就是同时运行多个消费者,当任务数量很多的时候RabbitMQ会将消息分发给不同的消费者(Worker)来减轻压力,想要让RabbitMQ公平的分发任务,需要在worker中用以下代码来设置一个worker的最大未确认消息数量

代码语言:javascript
复制
channel.BasicQos(0, 1, false);

参数1就代表此Worker同时只会处理一条消息,如果当前的消息没有处理完毕(没有act),rabbitmq就会把剩下的任务发送给其他的worker,如果所有的worker都很忙,就需要排队了

绑定

上面的一个示例中我们用的是默认的交换机发送消息,我们可以通过给exchange赋值来使用指定的交换机,通过QueueBind将交换机与队列进行绑定

代码语言:javascript
复制
_channel.QueueBind("log1", "logs", "info");

声明一个交换机的代码如下

代码语言:javascript
复制
_channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false);

我们将队列log1绑定到了交换机:logs上,路由为info,交换机的类型为Direct,Direct代表的是路由完全匹配,现在我们向logs交换机发送一条消息,路由为info,队列log1就会接收到消息了

代码语言:javascript
复制
channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body);

队列和交换机的关系是多对多的,交换机的类型常用的有三个:Direct,Fanout,Topic,Headers

Direct:要求路由键完全匹配 Fanout:忽略路由键,给所有绑定到交换机上的队列都发送消息 Topic:模糊匹配,通过字母配合符号“*”和“#”来设置路由键 Headers:Headers类型用的比较少,它也忽略路由键,而是匹配交换机的headers,headers为键值对的hashtable,对publisher和consumer两边设置的header进行匹配,需要指定匹配的方式是 all还是any,具体代码可看github

下面展示了一个使用direct类型交换机的相关代码

代码语言:javascript
复制
public class LogDirectPub
{
    public void SendMsg(string message)
    {
        var factory = new ConnectionFactory() { HostName = "192.168.1.15", Port = 5672, UserName = "guest", Password = "guest" };
        //创建一个连接,连接到服务器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var props = channel.CreateBasicProperties();
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "logs", routingKey: "info", basicProperties: props, body: body);
                channel.BasicPublish(exchange: "logs", routingKey: "error", basicProperties: props, body: body);
                Console.WriteLine("发送消息{0}", message);
            }
        }
    }
}


public class LogDirectConsumer : IDisposable
{
    private static ConnectionFactory factory;
    private static IConnection connection;
    static LogDirectConsumer()
    {
        factory = new ConnectionFactory() { HostName = "localhost" };
    }
    public void ReceiveMsg(Action<string> callback)
    {
        if (connection == null || !connection.IsOpen)
            connection = factory.CreateConnection();
        IModel _channel = connection.CreateModel();
        _channel.ExchangeDeclare("logs", ExchangeType.Direct, false, false);
        _channel.QueueDeclare(queue: "log1", durable: false, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind("log1", "logs", "info");
        _channel.QueueBind("log1", "logs", "error");
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            callback($"log1Write.message:{ea.RoutingKey}:{message}");
            //模拟消息处理需要两秒
            Thread.Sleep(2000);
            _channel.BasicAck(ea.DeliveryTag, false);
        };
        string result = _channel.BasicConsume(queue: "log1", autoAck: false, consumer: consumer);
    }
    public void Dispose()
    {
        if (connection != null && connection.IsOpen)
            connection.Dispose();
    }
}

RabbitMQ Management HTTP API

RabbitMQ有一套自己的http/api,地址为http://192.168.1.15:15672/api,可以查询你想查的所有信息配置,通过这些api,我们可以自己实现RabbitMQ的监控管理,英文看的头痛,这里有一篇中文的翻译文档:http://www.blogjava.net/qbna350816/archive/2016/08/13/431575.html

这是一个获取所有队列的简单示例:

代码语言:javascript
复制
    string username = "guest";
    string password = "guest";
    string queuesUrl = "http://localhost:15672/api/queues";
    /// <summary>
    /// 查询所有队列
    /// </summary>
    /// <returns></returns>
    public string GetAllQuenes()
    {
        string jsonContent = GetApiResult(queuesUrl).Result;
        List<QueueModel> queues = JsonConvert.DeserializeObject<List<QueueModel>>(jsonContent);
        return JsonConvert.SerializeObject(queues);
    }

    private async Task<string> GetApiResult(string Url)
    {
        var client = new HttpClient();
        var passByte = Encoding.UTF8.GetBytes(string.Format("{0}:{1}", username, password));
        client.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(passByte));
        using (HttpResponseMessage response = await client.GetAsync(Url).ConfigureAwait(false))
        {
            string result = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
            return result;
        }
    }

自定义Consumer

之前说过用QueueingBasicConsumer会有性能问题,但是eventconsumer无法阻塞线程,对于某些需要阻塞线程的功能用起来不太方便,这时我们就可以自定义一个Consumer继承DefaultBasicConsumer,只需要实现其中的HandleBasicDeliver函数就可以了,下面是我定义的一个consumer,用来实现后面的Rpc客户端

代码语言:javascript
复制
public class QueueingConsumer : DefaultBasicConsumer
{
    private IModel _channel;
    private BasicDeliverEventArgs args = new BasicDeliverEventArgs();

    private AutoResetEvent argResetEvent = new AutoResetEvent(false);
    public QueueingConsumer(IModel channel)
    {
        _channel = channel;
    }
    public override void HandleBasicDeliver(string consumerTag,
       ulong deliveryTag,
       bool redelivered,
       string exchange,
       string routingKey,
       IBasicProperties properties,
       byte[] body)
    {
        args = new BasicDeliverEventArgs
        {
            ConsumerTag = consumerTag,
            DeliveryTag = deliveryTag,
            Redelivered = redelivered,
            Exchange = exchange,
            RoutingKey = routingKey,
            BasicProperties = properties,
            Body = body
        };
        argResetEvent.Set();
    }

    public void GetResult(Action<BasicDeliverEventArgs> callback)
    {
        argResetEvent.WaitOne();
        callback(args);
    }

}

RPC实现

Rpc是什么不用多说了,反正我也就知道他是远程过程调用嘛。用RabbitMQ来实现Rpc,官网有一篇简单的示例,但个人感觉RabbitMQ并不太适合做Rpc。不过用这个示例作为对RabbitMQ的一个学习成果实践还是蛮不错的,下面请看代码:

代码语言:javascript
复制
public class RpcPub
{
    public async Task<string> SendMsg(string message)
    {
        ConnectionFactory factory = RabbitMQHelper.ConFactory;
        //创建一个连接,连接到服务器:
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                //定义一个临时的队列,用来接收返回的消息
                string replyQueueName = channel.QueueDeclare().QueueName;
                var consumer = new QueueingConsumer(channel);
                //监听该临时队列,自动act消息
                channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);


                string corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                //定义ReplyTo让服务端知道返回消息给哪个路由
                props.ReplyTo = replyQueueName;
                //定义CorrelationId作为消息的唯一关联ID
                props.CorrelationId = corrId;

                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
                Task<string> result = new Task<string>(() =>
                {
                    while (true)
                    {
                        string replystr = string.Empty;
                        consumer.GetResult((args) =>
                        {
                            if (args.BasicProperties.CorrelationId == corrId)
                            {
                                replystr = Encoding.UTF8.GetString(args.Body);
                            }
                        });
                        if (replystr != string.Empty)
                            return replystr;
                    }
                });
                result.Start();
                return await result;
            }
        }
    }
}


public class RpcConsumer : IDisposable
{

    private ConnectionFactory factory = RabbitMQHelper.ConFactory;
    private IConnection connection;


    public void ReceiveMsg(Action<string> callback)
    {
        if (connection == null || !connection.IsOpen)
            connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();

        channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
        //channel.BasicQos(0, 1, false);
        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, arg) =>
        {
            var props = arg.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;
            callback($"接收到消息:{Encoding.UTF8.GetString(arg.Body)}");
            var responseBytes = Encoding.UTF8.GetBytes($"成功接收你的消息:{ Encoding.UTF8.GetString(arg.Body)}");
            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
            channel.BasicAck(deliveryTag: arg.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

    }
    public void Dispose()
    {
        if (connection != null && connection.IsOpen)
            connection.Dispose();
    }

}
基本流程:

  1. 当客户端发送消息之前,创建一个匿名的回调队列channel.QueueDeclare(),并监听该队列。
  2. 客户端获取匿名队列的名称,在请求中设置2个属性:replyTo=回调队列名称;CorrelationId=请求关联的唯一id
  3. 客户端发送请求到rpc_queue队列中。
  4. RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理消息,返回结果发送到replyTo指定的队列,在请求中设置1个属性:CorrelationId=请求过来的CorrelationId
  5. 客户端监听的队列收到消息,检查correlationId是否与之前生成的匹配,匹配成功返回结果。
  6. 对于为什么要验证correlationId这一项,有两个原因,1.消息可能并不是rpc服务器发送的 2.rpc服务如果在某个阶段突然挂掉,可能会发送一个不包含correlationId的消息
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-05-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装与配置:
    • 服务相关命令
      • 后台管理
        • 默认端口:
          • 基本流程:
      • 发送消息
      • 轮询调度
      • 绑定
      • RabbitMQ Management HTTP API
      • 自定义Consumer
      • RPC实现
      相关产品与服务
      云服务器
      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档