rabbitMQ学习笔记
2017年12月31日星期日 Lee
环境:centos7
版本:rabbitmq-server-3.7.2-1
准备了3台主机做实验。先配置hosts如下。
cat /etc/hosts
192.168.5.71 node1
192.168.5.72 node72
192.168.5.73 node73
建议看下美团分享的rabbitmq基础:http://mp.weixin.qq.com/s/OABseRR0BnbK9svIPyLKXw
AMQP三个组件:交换器、队列、绑定
处理流程:消息 ---> 交换器 ---> Client
交换器的4种类型:
direct 根据路由键投递消息
fanout 广播方式,绑定到同一个exchange上的queue都能收到同样的msg。不需要路由键,如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃
topic 根据queue定义的topic名称来接收相应的msg
header (很少用,直接忽略它)
虚拟主机vhost:
每个vhost相当于一个mini的rabbitmq服务器,有独立的权限控制机制。不同的vhost可以存在同名的交换器和队列名。
在rabbitmq中,权限控制是以vhost为单位的。
rabbitmqctl [add_vhost|delete_vhost] vhost1 即可添加/删除一个名为vhost1的虚拟主机
消息确认机制:
RabbitMQ提供了transaction、confirm两种消息确认机制。
transaction即事务机制,手动提交和回滚;
confirm机制提供了Confirmlistener和waitForConfirms两种方式。
confirm机制效率明显会高于transaction机制,但transaction的优势在于强一致性。如果没有特别的要求,建议使用conrim机制。
1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
basicAck:成功消费,消息从队列中删除
basicNack:requeue=true,消息重新进入队列,false被删除
basicReject:等同于basicNack
basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer
发送方确认机制:
客户端发送请求(消息)时,在消息的属性(Message Properties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)。服务器端收到消息处理完后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
erlang节点:
erlang类似JVM,erlang节点能自动尝试启动所在节点的应用程序。
在node1上执行如下的操作步骤:
yum localinstall rabbitmq-server-3.7.2-1.el7.noarch.rpm erlang-19.3.6.5-1.el7.centos.x86_64.rpm -y
rpm -qpl rabbitmq-server-3.7.2-1.el7.noarch.rpm | less 可以列出rabbitmq的软件释放文件路径,如下:
/etc/rabbitmq
/usr/lib/rabbitmq/
/var/lib/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.7.2/rabbitmq.config.example /etc/rabbitmq/
cd /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
vim rabbitmq.config 开启部分注释后,具体取消注释的地方如下:
[
{rabbit,
[
{tcp_listeners, [5672]},
{loopback_users, []}
]}
].
systemctl start rabbitmq-server 或者使用rabbitmq-server -detached 这种启动方式
常用命令:
rabbitmqctl status 列出rabbitmq的运行状态,版本信息等非常多的运行参数
列出队列情况
[root@node1 /root ]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
查看当前exchanges的情况
[root@node1 /root ]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.fanout fanout
amq.match headers
amq.direct direct
amq.rabbitmq.log topic
amq.topic topic
amq.rabbitmq.trace topic
amq.headers headers
direct
查看bingding,刚安装服务的时候也没有binding
[root@node1 /root ]# rabbitmqctl list_bindings
Listing bindings for vhost /...
列出用户
[root@node1 /root ]# rabbitmqctl list_users
Listing users ...
guest [administrator]
列出vhosts
[root@node1 /root ]# rabbitmqctl list_vhosts
Listing vhosts ...
/
添加用户
[root@node1 /root ]# rabbitmqctl add_user ops 123456
Adding user "ops" ...
修改密码
[root@node1 /root ]# rabbitmqctl change_password ops 12345678
设置用户权限
[root@node1 /root ]# rabbitmqctl set_user_tags ops administrator
Setting tags for user "ops" to [administrator] ...
删除用户(会连带这个账户相关的访问控制条目都删掉)
[root@node1 /root ]# rabbitmqctl delete_user ops
Deleting user "ops" ...
列出有哪些插件
[root@node1 /root ]# rabbitmq-plugins list
安装管控页面【web管理页面】
rabbitmq-plugins enable rabbitmq_management
卸载插件
rabbitmq-plugins disable xxxxx
停止rabbitmq程序及节点(erlang)
rabbitmqctl stop
单独停止rabbitmq
rabbitmqctl stop_app
停止远程节点:
rabbitmqctl stop -n rabbit@[remote_hostname]
启动rabbitmq-server后执行rabbitmq-plugins enable rabbitmq_management安装web管理插件,然后在浏览器访问
http://192.168.5.71:15672/
默认用户名和密码都是guest
单节点很少用,因此这里一些命令就不说了, 统一放到后面集群环境下说。
在node72和node73上安装rabbitmq,启动rabbitmq进程,然后安装好rabbitmq_management插件。
rpm包安装的rabbitmq在 /var/lib/rabbitmq/ 路径下,有一个 .erlang.cookie的文件。
在node1上执行:
cd /var/lib/rabbitmq/
chmod u+w .erlang.cookie
scp .erlang.cookie root@node72:/var/lib/rabbitmq/
scp .erlang.cookie root@node73:/var/lib/rabbitmq/
然后执行 rabbitmqctl cluster_status 可以看到cluster_name 集群的名称。
在3台主机都上执行:
rabbitmqctl stop // 执行这个命令确保rabbitmq进程是停止状态的
rabbitmq-server -detached // 使用-detached参数后台启动tabbitmq
然后在node72和node73上执行如下命令,将其加入到node1的集群环境中:
rabbitmqctl stop_app
rabbitmqctl reset // (可选)第一次加入集群的新主机的话,建议执行下reset这个命令 清空节点的状态
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
这时候,再到node1上执行rabbitmqctl cluster_status 可以看到集群running node信息
再web节目也有显示了:
3.1 配置镜像队列:
在任意一个节点上执行:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' -p /
这个命令会将/ 这个vhost的所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致。
rabbitmqctl add_vhost vh4
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' -p vh4
这个命令会将vh4 这个vhost的所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致。
至此,RabbitMQ 高可用集群就已经搭建好了,最后一个步骤就是搭建均衡器
3.2 查看vhost名为vh3的全部镜像队列是否同步完成:
rabbitmqctl list_queues name pid slave_pids synchronised_slave_pids -p vh3
如上图,两个红色框内的信息一模一样,说明所有的slave队列里的数据都同步了。这样的话,我们才能放心的剔除某个节点且保证数据不丢失。
3.3 镜像队列节点宕机的情况:
如果镜像队列丢失了一个从节点,则对任何消费者都不会有影响。
主节点宕机,集群内部会重新选主。消费者需要重新附加到主队列。
rabbitmq要求集群中至少一个磁盘节点【为了提高性能,不需要全部节点都是disc的节点】,所以我们可以启动部分节点为RAM模式
当节点加入或者离开集群时,他们必须通知到至少一个磁盘节点。
如果集群内就1个磁盘节点,其余都是内存节点。如果这个唯一的一个磁盘节点宕机了,那么集群还是可以路由消息的。但是这个时候不支持如下操作:
创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点
以node72、node73为例。
step1、将node72 、node73从集群中移除
在node72上执行:
rabbitmqctl stop_app
rabbitmqctl reset
然后,执行下面命令,将节点再次加回集群中:
rabbitmqctl join_cluster --ram rabbit@node1 // 说明:我们也可以在一开始搭建集群的时候,就用 --ram参数添加节点
rabbitmqctl start_app
再次查看集群状态,如下图:
可以使用Haproxy、Nginx做4层负载均衡,如果使用的是阿里云环境 还可以直接使用使用阿里云的SLB服务。
配置文件: /etc/rabbitmq/rabbitmq.config
部分参数:
{vm_memory_high_watermark, 0.4} 控制rabbitmq允许消耗的内存占宿主机内存的的百分比。
{msg_store_file_size_limit, 16777216} 控制rabbitmq垃圾收集存储内容之前,消息存储数据库的最大大小。默认16MB
{queue_index_max_journal_entries, 32768} 在转储到消息存储数据库并提交前,消息存储日志中的最大条目数,默认32768条
具体的运行参数还有很多,官方文档都有很详细的说明,其实我们一般要改的参数也不多。
例子:
[root@node1 /root ]# rabbitmqctl add_user ops 123456 -- 添加一个普通账号ops
[root@node1 /root ]# rabbitmqctl add_vhost vhost1 -- 添加一个虚拟主机vhost1
[root@node1 /root ]# rabbitmqctl set_permissions -p vhost1 ops ".*" ".*" ".*" -- 给ops账户的vhost1虚拟主机授权:配置、写、读 的权限。.*表示匹配任意队列和交换器
[root@node1 /root ]# rabbitmqctl list_permissions -p vhost1 -- 列出vhost1当前开放的权限:
Listing permissions for vhost "vhost1" ...
ops .* .* .*
[root@node1 /root ]# rabbitmqctl set_permissions -p vhost2 ops "coupons-queue-.*" ".*" ".*"
[root@node1 /root ]# rabbitmqctl list_permissions -p vhost2
Listing permissions for vhost "vhost2" ...
ops coupons-queue-.* .* all
在rabbitmq的web控制台,看到的ops账户的权限如下:
[root@node1 /root ]# rabbitmqctl clear_permissions -p vhost1 ops -- 删除vhost1虚拟主机上ops账号的权限
[root@node1 /root ]# rabbitmqctl clear_permissions -p vhost2 ops -- 删除vhost2虚拟主机上ops账号的权限
说明:下面的命令实验的结果,是因为我在rabbitmq的web界面执行了一些入队、bind等系列操作后的结果
列出vhost1虚拟主机的全部队列情况(name、messages、consumers、memory情况)
[root@node1 /root ]# rabbitmqctl list_queues -p vhost1 name messages consumers memory
Timeout: 60.0 seconds ...
Listing queues for vhost vhost1 ...
msg-inbox-errors 7 0 55760
检查vhost1虚拟主机的队列属性
[root@node1 /root ]# rabbitmqctl list_queues -p vhost1 name durable auto_delete
Timeout: 60.0 seconds ...
Listing queues for vhost vhost1 ...
msg-inbox-errors true false -- 可以看出队列是持久化的,未设置自动删除
查看交换器和绑定情况
[root@node1 /root ]# rabbitmqctl list_exchanges -p vhost1
Listing exchanges for vhost vhost1 ...
amq.topic topic
amq.headers headers
amq.match headers
amq.direct direct
amq.rabbitmq.trace topic
amq.fanout fanout
direct
查看rabbitmq默认交换器的属性设置
[root@node1 /root ]# rabbitmqctl list_exchanges name type durable auto_delete
Listing exchanges for vhost / ...
amq.fanout fanout true false
direct true false
amq.rabbitmq.trace topic true false
amq.rabbitmq.log topic true false
amq.topic topic true false
amq.headers headers true false
amq.direct direct true false
amq.match headers true false
查看绑定信息
[root@node1 /root ]# rabbitmqctl list_bindings -p vhost1
Listing bindings for vhost vhost1...
exchange msg-inbox-errors queue msg-inbox-errors []
exchange q1 queue q1 []
amq.fanout exchange q1 queue []
test_change exchange msg-inbox-errors queue []
## 上面的q1、msg-inbox-errors队列是我在rabbitmq管理界面手动绑定的。test_change是我人工创建的一个交换器
显示broker的状态
rabbitmqctl environment
查看rabbitmq的运行详情(结果特别细致)
rabbitmqctl status
列出consumer
list_consumers
列出channel
list_channels
列出连接信息
list_connections
默认日志文件的路径是 /var/log/rabbitmq/
rabbitmq的启动日志类似下图这样:
此外,这个文件还记录了执行了哪些的管理操作,连接建立及断开等很多运行状态情况。
切割日志命令:
rabbitmqctl rotate_logs
rabbitmqctl reset命令:
reset 命令在节点为单机状态和是集群的一部分时行为有点不太一样。
节点单机状态时,reset 命令将清空节点的状态,并将其恢复到空白状态。当节点是集群的一部分时,该命令也会和集群中的磁盘节点通信,告诉他们该节点正在离开集群。
这很重要,不然,集群会认为该节点出了故障,并期望其最终能够恢复回来,在该节点回来之前,集群禁止新的节点加入。
官方文档:http://www.rabbitmq.com/heartbeats.html
https://www.cnblogs.com/Tommy-Yu/p/5775852.html
http://blog.csdn.net/jiao_fuyou/article/details/23186407
http://www.linuxidc.com/Linux/2014-01/95715.htm
心跳案例:http://holys.im/2016/09/20/why-rabbitmq-message-not-consumed/
心跳超时间隔
The heartbeat timeout value defines after what period of time the peer TCP connection should be considered dead by RabbitMQ and client libraries. This value is negotiated between the client and RabbitMQ server at the time of connection. The client must be configured to request heartbeats. In RabbitMQ versions 3.0 and higher, the broker will attempt to negotiate heartbeats by default (although the client can still veto them). The timeout is in seconds, and default value is 60 (580 prior to release 3.5.5).
心跳超时值决定了tcp相互连接的最大时间, 超过了这个时间, 该连接即被RMQ和客户端视为丢失(dead)。 这个值在客户端和服务器建立连接的时候协商确定。客户端需配才能发心跳包。 RMQ3.0及以上版本, RMQ将试着将beatheart协调为默认值(客户端可以否决这个值)。 超时时间单位为秒,默认值为60( 3.5.5发布版之前是580)。
Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable. Different clients manifest this differently but the TCP connection will be closed. When a client detects that RabbitMQ node is unreachable due to a heartbeat, it needs to re-connect.
心跳包每半个超时时间发送一次。 丢失了两个心跳包, 连接被认为不可抵达。 不同的客户端有不同的提示, 但tcp连接都会被关闭。 当客户端检测到RMQ节点不可抵达(根据心跳判定), 它需要重新连接(到服务器)。
Heartbeats can be disabled by setting the timeout interval to 0. This is not a recommended practice.
心跳机制可以被禁用:设定超时间隔为0。但是不建议这样设置。
级联复制:rabbitmq_shovel插件(用在跨IDC间的复制,一般情况下,用的很少)
安装方法:
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
官方介绍很详细。这里略过。
官方文档:http://www.rabbitmq.com/management-cli.html
wget https://raw.githubusercontent.com/rabbitmq/rabbitmq-management/v3.7.2/bin/rabbitmqadmin
chmod +x rabbitmqadmin
mv rabbitmqadmin /usr/local/bin
rabbitmqadmin --help
rabbitmqadmin -V "/" list queues
rabbitmqadmin -V "/" list exchanges
在默认的vhost / 下面 创建一个名为cli_test的exchange
rabbitmqadmin -V "/" -u guest -p guest declare exchange name=cli_test type=topic
列出连接列表
rabbitmqadmin list connections names
关闭消费者
rabbitmqadmin close connection name="127.0.0.1:11111"
1、使用 zabbix监控。
2、prometheus的监控的配置部署稍有复杂(需要在rabbitmq上额外的安装plugin)。
3、使用rabbitmq自带的REST API来采集监控(监控配置项的正确性,如是否配置了持久化、是否配置了镜像队列)