消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不同平台之间进行通信,常用来屏蔽各种平台协议之间的特性,实现应用程序之间的协同。优点在于能够在客户端和服务器之间进行同步和异步的连接,并且在任何时刻都可以将消息进行传送和转发,是分布式系统中非常重要的组件,主要用来解决应用耦合、异步通信、流量削峰等问题。
P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。
P2P的特点:
Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点:
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
消息队列的使用场景是怎样的?
对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如: 1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何防止丢失? 2)如何降低发送者和接收者的耦合度? 3)如何让Priority高的接收者先接到数据? 4)如何做到load balance?有效均衡接收者的负载? 5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。 6)如何做到可扩展,甚至将这个通信模块发到cluster上? 7)如何保证接收者接收到了完整,正确的数据? AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。
RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
下载地址:http://www.rabbitmq.com/download.html
下载erlang:http://www.erlang.org/download/otp_win64_17.3.exe
安装:
erlang安装完成。
RabbitMQ安装完成。
启动、停止、重新安装等。
第一步:点击打开RabbitMQ的命令窗口。如图:
第二步:输入命令rabbitmq-plugins enable rabbitmq_management
这个命令的意思是安装RabbitMQ的插件。
第三步:测试是否安装成功。
方法:访问地址:http://127.0.0.1:15672/
默认账号:guest/guest
添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -ivh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
yum install erlang
1、用 yum 安装 RabbitMQ
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# this example assumes the CentOS 7 version of the package
yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
2、用 rpm 手动安装
下载:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
上传rabbitmq-server-3.7.13-1.el7.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm
几个常用命令:
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
chkconfig rabbitmq-server on //设置开机自启
设置配置文件:
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
设置用户远程访问:
vim /etc/rabbitmq/rabbitmq.config
去掉后面的逗号
开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
防火墙开放15672端口(CentOS7 不用操作)
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
1、界面的介绍
注意设置虚拟主机与添加用户这块。
命令行添加用户,设置tags
rabbitmqctl list_users
rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
关于虚拟主机,Virtual Host,其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
用户角色有下面几种:
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
无法登陆管理控制台,通常就是普通的生产者和消费者。
在Mac下安装RabbitMQ是非常简单的,一般默认RabbitMQ服务器依赖的Erlang已经安装,只需要用下面两个命令就可以完成RabbitMQ的安装(前提是homebrew已经被安装):
brew update
brew install rabbitmq
耐心等待,安装完成后需要将/usr/local/sbin添加到$PATH,可以将下面这两行加到~/.bash_profile:
# RabbitMQ Config
export PATH=$PATH:/usr/local/sbin
编辑完后:wq保存退出,使环境变量立即生效。
source ~/.bash_profile
上面配置完成后,需要关闭终端窗口,重新打开,然后输入下面命令即可启动RabbitMQ服务:
rabbitmq-server
浏览器输入localhost:15672
,账号密码全输入guest即可登录。
启动监控管理器:rabbitmq-plugins enable rabbitmq_management 关闭监控管理器:rabbitmq-plugins disable rabbitmq_management 启动rabbitmq:rabbitmq-service start 关闭rabbitmq:rabbitmq-service stop 查看所有的队列:rabbitmqctl list_queues 清除所有的队列:rabbitmqctl reset 关闭应用:rabbitmqctl stop_app 启动应用:rabbitmqctl start_app
添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username “.*” “.*” “.*”
(后面三个”*”代表用户拥有配置、写、读全部权限)
消息中间件RabbitMQ,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息异步。以下将介绍RabbitMQ+HA方式进行部署。
RabbitMQ是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署Rabbitmq分布式集群时要先安装Erlang,并把其中一个服务的cookie复制到另外的节点。
RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,一般的,均应建立为磁盘节点,为了防止机器重启后的消息消失;
RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制。
普通模式下,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。
镜像模式下,将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。
多台机器部署RabbitMQ的cluster,
1、所有节点需要再同一个局域网内;
2、所有节点需要有相同的 erlang cookie,否则不能正常通信,为了实现cookie内容一致,采用scp的方式进行。
3、准备三台虚拟机,配置相同
rabbitmq01 192.168.101.11
rabbitmq02 192.168.101.12
rabbitmq03 192.168.101.13
操作系统:centos7.5
node1 192.168.101.11
node2 192.168.101.12
node3 192.168.101.13
1、安装erlang
安装依赖包
yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
编译安装
wget http://erlang.org/download/otp_src_21.3.tar.gz
tar -zxvf otp_src_21.3.tar.gz
cd otp_src_21.3
./configure --prefix=/usr/local/bin/erlang --without-javac
make && make install
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile
出现 erl 命令则说明安装成功;
2、安装rabbitmq
编译安装
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz
yum install -y xz
xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.15.tar -C /usr/local/bin/
echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile
source /etc/profile
3、导入 rabbitmq 的管理界面
rabbitmq-plugins enable rabbitmq_management
4、设置 erlang
找到erlang cookie文件的位置,官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie
;第二个地方就是/var/lib/rabbitmq/.erlang.cookie
。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie
。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq
目录下。
这里将 node1 的该文件复制到 node2、node3,注意这个文件的权限是 400(默认即是400),因此采用scp的方式只拷贝内容即可;
可以通过cat $home/.erlang.cookie
来查看三台机器的cookie是否一致,设置erlang的目的是要保证集群内的cookie内容一致。
使用-detached参数运行各节点
rabbitmqctl stop
rabbitmq-server -detached
然后可以通过 rabbitmqctl cluster_status
查看节点状态。
注意:要先拷贝cookie到另外两台机器上,保证三台机器上的cookie是一致的,然后再启动服务。
由于guest这个用户,只能在本地访问,所以我们要新增一个用户并赋予权限:
添加用户并设置密码:
rabbitmqctl add_user admin 123456
添加权限(使admin用户对虚拟主机“/” 具有所有权限):
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
修改用户角色(加入administrator用户组)
rabbitmqctl set_user_tags admin administrator
然后就可以远程访问了,然后可直接配置用户权限等信息。到此,就可以通过http://ip:15672 使用admin 123456 进行登陆了。
到这里的话,每个节点是作为单独的一台RabbitMQ存在的,也可以正常提供服务了
rabbitmq-server 启动时,会一起启动节点和应用,它预先设置RabbitMQ应用为standalone模式。要将一个节点加入到现有的集群中,你需要停止这个应用,并将节点设置为原始状态。如果使用./rabbitmqctl stop,应用和节点都将被关闭。所以使用rabbitmqctl stop_app仅仅关闭应用。
1、将 node2、node3与 node1 组成集群,这里以node2为例
node2# rabbitmqctl stop_app
node2# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字一定不要写错了
node2# rabbitmqctl start_app
2、将node3重复上述操作,也加入node1的集群。
node3# rabbitmqctl stop_app
node3# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字一定不要写错了
node3# rabbitmqctl start_app
则此时 node2 与 node3 也会自动建立连接,集群配置完毕;
#使用内存节点加入集群
node2 # rabbitmqctl join_cluster --ram rabbit@node1
3、在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status
来查看是否集群配置成功。
node3# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
{running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
{cluster_name,<"rabbit@node1">}, #集群的名称默认为 rabbit@node1
{partitions,[]},
{alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
4、也可通过在web页面上的“Queues”的列表中,查看有如下显示为“同步镜像到node2”,则也表示集群配置成功
5、设置镜像队列策略
在任意一个节点上执行如下操作(这里在node1上执行)
首先,在web界面,登陆后,点击“Admin–Virtual Hosts(页面右侧)”,在打开的页面上的下方的“Add a new virtual host”处增加一个虚拟主机,同时给用户“admin”和“guest”均加上权限(在页面直接设置、点点点即可);
然后,在linux中执行如下命令
rabbitmqctl set_policy -p coresystem ha-all "^" '{"ha-mode":"all"}'
“coresystem” vhost名称, “^”匹配所有的队列, ha-all 策略名称为ha-all, ‘{“ha-mode”:”all”}’ 策略模式为 all 即复制到所有节点,包含新增节点。
则此时镜像队列设置成功。(这里的虚拟主机coresystem是代码中需要用到的虚拟主机,虚拟主机的作用是做一个消息的隔离,本质上可认为是一个rabbitmq-server,是否增加虚拟主机,增加几个,这是由开发中的业务决定,即有哪几类服务,哪些服务用哪一个虚拟主机,这是一个规划)。
6、镜像队列策略设置说明
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。完成这 6 个步骤后,RabbitMQ 高可用集群搭建完成,最后一个步骤就是搭建均衡器。
7、安装并配置负载均衡器HA
注意:如果使用阿里云,可以使用阿里云的内网slb来实现负载均衡,不用自己搭建HA。
1、在192.168.101.11安装HAProxy
yum -y install HAProxy
2、修改 /etc/haproxy/haproxy.cfg
vim /etc/haproxy/haproxy.cfg
global
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/run/haproxy.pid
maxconn 4000
user haproxy
group haproxy
daemon
stats socket /var/lib/haproxy/stats
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 120s
srvtimeout 120s
listen rabbitmq_cluster 192.168.101.11:5670
mode tcp
balance roundrobin
server rabbit1 192.168.101.11:5672 check inter 5000 rise 2 fall 2
server rabbit2 192.168.101.12:5672 check inter 5000 rise 2 fall 2
3、重启HAProxy
service haproxy restart
登录浏览器输入地址http://192.168.101.11:8100/rabbitmqstats查看HAProxy的状态
三、常见问题
常见错误:
1、使用 rabbitmq-server -detached命令启动rabbitmq时,出现以下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。
2、由于更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概如下
Cluster status of node rabbit@web2 ...
Error: unable to connect to node rabbit@web2: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@web2]
rabbit@web2:
* connected to epmd (port 4369) on web2
* epmd reports node 'rabbit' running on port 25672
* TCP connection succeeded but Erlang distribution failed
* Hostname mismatch: node "rabbit@mq2" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@mq2"
current node details:
- node name: 'rabbitmq-cli-11@web2'
- home dir: /root
- cookie hash: SGwxMdJ3PjEXG1asIEFpBg==
此时先ps aux | grep mq
,然后kill -9
该进程,然后再rabbitmq-server -detached
即可解决。(即先强杀,再重新启动)
3、使用rabbitmqctl stop
,rabbitmq-server -detached
重新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还需要重新添加。
采用脚本启动,在脚本中写好启动好需要加载的各配置项(创建admin用户并授权,创建虚拟主机并授权,配置镜像队列)。
4、命令
rabbitmqctl stop_app #仅关闭应用,不关闭节点
rabbitmqctl start_app #开启应用
rabbitmq--server -detached #启动节点和应用
rabbitmqctl stop #关闭节点和应用
4、常用命令:
Rabbitmq服务器的主要通过rabbitmqctl和rabbimq-plugins两个工具来管理,以下是一些常用功能。
1、 服务器启动与关闭
启动: rabbitmq-server –detached
关闭: rabbitmqctl stop
若单机有多个实例,则在rabbitmqctl后加 –n 指定名称
2、插件管理
开启某个插件:rabbitmq-plugins enable xxx
关闭某个插件:rabbitmq-plugins disable xxx
注意:重启服务器后生效。
3、virtual_host管理
新建virtual_host:rabbitmqctl add_vhost xxx
撤销virtual_host:rabbitmqctl delete_vhost xxx
4、用户管理
新建用户:rabbitmqctl add_user xxxpwd
删除用户: rabbitmqctl delete_user xxx
查看用户:rabbitmqctl list_users
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctlset_user_tags {username} {tag ...}
Tag可以为 administrator,monitoring, management
5、 权限管理
权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read}
Vhostpath: Vhost路径
user: 用户名
Conf: 一个正则表达式match哪些配置资源能够被该用户访问。
Write: 一个正则表达式match哪些配置资源能够被该用户读。
Read: 一个正则表达式match哪些配置资源能够被该用户访问。
6、获取服务器状态信息
服务器状态:rabbitmqctl status ##其中可查看rabbitmq的版本信息
7、获取集群状态信息
rabbitmqctl cluster_status