“ RockeMQ是阿里巴巴自主研发的消息中间件,实现业务削峰,分布式事务等,已捐献给Apache,成为MQ家族的重要成员。”
01
—
安装RocketMQ
RocketMQ集群的部署可以分为很多种模式:
nameserver可以多个;
broker也可以部署多个:两主两从、两主无从等,本文采用两主无从的模式;
# 下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
# 解压
unzip rocketmq-all-4.6.0-bin-release.zip
# 重命名
mv rocketmq-all-4.6.0-bin-release/ rocketmq
# 配置环境变量,三个节点都需要配置!!!
vim .bash_profile
export ROCKETMQ=/opt/install/rocketmq/
export PATH=$PATH:$ROCKETMQ/bin
source .bash_profile
# 修改日志文件位置
sed 's#${user.home}#/opt/install/rocketmq#g' *.xml -i
# 分发到其他节点
scp -P 12324 -r rocketmq node02:$PWD
scp -P 12324 -r rocketmq node03:$PWD
在node01上启动nameserver
nohup sh /opt/install/rocketmq/bin/mqnamesrv >/dev/null 2>&1 &
# broker配置
vim runbroker.sh
# 默认 ${JAVA_OPT} -server -Xms 8g -Xmx2 8g -Xmn 4g
# 可能机器内存不足,导致启动不起来,根据机器配置调小点
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
# 修改配置文件,这里修改conf/2m-noslave/.properties
# 注:另一个节点是node03
brokerName=node02
# 0 (master) 非0 slave
brokerId=0
brokerRole=ASYNC_MASTER
#所属集群名字
brokerClusterName=rocketmq-cluster
#nameServer地址,分号分割
namesrvAddr=10.255.20.147:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/opt/install/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/opt/install/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/opt/install/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/install/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/install/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/opt/install/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#刷盘方式
flushDiskType=ASYNC_FLUSH
# 启动broker node02
nohup sh /opt/install/rocketmq/bin/mqbroker -c /opt/install/rocketmq/conf/2m-noslave/node02.properties >/dev/null 2>&1 &
# 启动broker node03
nohup sh /opt/install/rocketmq/bin/mqbroker -c /opt/install/rocketmq/conf/2m-noslave/node03.properties >/dev/null 2>&1 &
# rocketmq 控制台安装
# 注:需要安装git、maven
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals-maste/rocketmq-console/src/main/resources/
vim application.properties
# 设置后台访问端口
server.port=8080
# nameserver配置
rocketmq.config.namesrvAddr=10.255.20.147:9876
cd rocketmq-externals\rocketmq-console
# maven打包
mvn clean package -Dmaven.test.skip=true
前台启动:
java -jar rocketmq-console-ng-1.0.1.jar
后台启动:
nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=10.255.20.147:9876 >/dev/null 2>&1 &
02
—
RockMQ架构
RocketMQ是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
由四个组成部分:
NameServer:提供轻量级的服务发现和路由。每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速的存储扩展;
Broker:提供轻量级的 TOPIC 和 QUEUE 机制来照顾消息存储。它们支持“推和拉”模型,包含容错机制(2个或3个副本),并提供强大的峰值填充功能和按其原始时间顺序累积数千亿条消息的能力。此外,代理提供故障恢复,丰富的指标统计信息和警报机制;
每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有Name Server;
Broker 分为 Master 与 Slave;
Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId 为0 表示Master,非0 表示Slave;
Producer:支持分布式部署。分布式生产者通过多种负载均衡模式将消息发送到Broker集群。发送过程支持快速失败(如何避免消息丢失)并且延迟低。
Consumer:消费者也支持Push和Pull模型中的分布式部署。它还支持集群使用和消息广播。它提供了实时消息订阅机制,可以满足大多数消费者的需求。
Topic是生产者在发送消息和消费者在拉取消息的类别。Topic与生产者和消费者之间的关系非常松散。具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息。类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。
Tag,换句话的意思就是子主题,为用户提供了额外的灵活性。有了Tag,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能。
GroupName,代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。
Queue,是数据分片的产物,首先,一个Topic分布在一个Broker上的子集定义为一个Topic分片,将每个分片在划分成多个Queue,每个Topic分片等分的Queue的数量可以不同,由用户在创建Topic时指定。
RocketMQ的负载均衡策略规定:Consumer数量应该小于等于Queue数量,如果Consumer超过Queue数量,那么多余的Consumer 将不能消费消息。
在一个Consumer Group内,Queue和Consumer之间的对应关系是一对多的关系:一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue。这样的分配规则,每个Queue只有一个消费者,可以避免消费过程中的多线程处理和资源锁定,有效提高各Consumer消费的并行度和处理效率。
Queue是Topic在一个Broker上的分片等分为指定份数后的其中一份,是负载均衡过程中资源分配的基本单元。
03
—
关键特性
消息持久化
刷盘策略
RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。
顺序消息查询
可以根据key查询和根据msgId查询 顺序消息
MQ提供一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:
分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费;
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费;
以上是RocketMQ的顺序消息的实现,将不同订单路由到不同的分区中,而消费端一个分区只有一个线程进行消费,从而保证消费的顺序。不过,也存在一些缺陷,如下:
1、发送顺序消息无法利用集群 FailOver 特性;
2、消费顺序消息的并行度依赖于队列数量;
3、队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
4、遇到消息失败的消息,无法跳过,当前队列消费暂停;
发送消息负载均衡
订阅消息负载均衡
并行消费:指定滑动窗口,可以有多个线程并行消费; 消息重试:默认16次,进入死信队列; HA,Master和Slave之间复制,支持同步、异步方式;