消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。是我们开发过程中经常使用的中间件,有必要对其进行一定的了解,方便在使用中快速定位解决问题。
消息队列一般拥有以下作用:
我们先来看下RocketMq的领域模型
服务器上部署的RocketMq进程一般称之为Broker
,Broker会接收Producer的消息,持久化到本地,然后push给Consumer,通常使用集群部署。主从之间会有数据同步
路由服务,类似与dubbo中的注册中心zk,它存储了Broker的路由信息,供Producer和Consumer使用,不然Producer怎么知道往哪个Broker发送消息。多个NameSever之间没有通信,每个NameSever都会保存所有路由信息。
生产者,即发送消息的一方,往Broker中写入数据
消费者,即消费消息的一方,从Broker中获取数据
Topic
翻译过来就是主题的意思,但它其实是个抽象概念,我们可以理解成数据集合
,比如订单系统有一个Topic叫topic_order_info
,这个Topic里面就是订单系统投递的订单信息,如果其他系统想要获取订单信息,就可以从这个Topic中获取。
MessageQueue即消息队列,在创建Topic的时候会让我们指定MessageQueue的数量,简单来说就是指定Topic中的队列数量。 那么MessageQueue到底是什么呢?这个问题要和Topic、Broker一起来看,大家想一想Topic在Broker中是如何存储的?要知道Broker是集群部署的,如果我们有2个Broker,那Topic中的数据哪些存储在这个Broker,哪些存储在另一个Broker呢?所以RocketMq引入了MessageQueue的概念,本质上是一个数据分片机制。 比如一个Topic指定了4个MessageQueue,该Topic有1W个消息,那么这1W个消息会均匀分配在4个MessageQueue中(实际是根据分配策略),而这4个MessageQueue又是放在Broker上的,一个Broker上存储2个MessageQueue。
了解了RocketMq的基本概念之后,我们再来看下它的架构,如图所示:
这是一个常见的RocketMq部署架构,所有节点都使用集群部署,Broker还是用了主从架构。 相信看到这里大家一定会有很多疑问,没关系,我们接着一步步分析。
看了这个架构图大家想必肯定会有很多疑问,没关系,我们来逐步分析。
首先是NamerServer,它的作用是注册Broker的路由信息,那么它是怎么和Producer、Broker、Consumer之间通信的呢?
Broker会和每个NameSever建立TCP长链接,每隔30s发送心跳到所有NamerServer,每个NameSever会每隔10s检查一次有哪些Broker超过120s没有发送心跳的,如果有,就认为该Broker已宕机,从路由信息中删除。
Producer如果要向Broker发送信息,必须要知道Broker的路由信息如(ip.port),所以需要和NameSever(随机选择)建立一个TCP长链接来获取Broker路由信息。然后就可以根据Topic知道要向哪几台Broker发送消息,再根据负载均衡算法选出一台Broker。
前面说到Broker是我们部署在服务器上的RocketMq进程,它起到了存储消息,接收、分发消息的作用,非常非常重要,所以我们要保重它的高可用。 一般Broker会采用主从+集群部署,如下图:
上图中我们有2个Broker服务,每个Broker又分为主从Broker,主从之间会有数据同步。 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。 Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。Producer完全无状态,可集群部署。
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Producer首先通过NameSever获取指定Topic的Broker路由信息,比如一个Topic有哪些MessageQueue,MessageQueue在哪几台Broker上,Broker的ip.port等等。
上图中,一个Topic有4个MessageQueue,分别分布在2台Broker上,然后Producer会把消息均匀发送(实际根据负载均衡策略)消息到4个MessageQueeu中。 这样就可以把生产者的请求分散给不同的Broker,假设一台Broker可以抗7W并发,那2台Broker就可以实现RocketMq集群抗下每秒10W+的并发。
如上图中,如果第二个Master Broker宕机了,虽然Slave Broker会自动切换成Master,但在切换的过程中该Broker服务是不可用的。
对于这个问题,在Producer中有一个开关sendLatencyFaultEnable
,这个开启后会有一个容错机制,比如某次访问一个Broker有500ms延迟还无法访问,那么接下来就会回避访问
该Broker一段时间,比如3000ms内不再访问该Broker,避免消息打到故障的Broker上。
参考:生产者
消息持久化是RocketMq最核心的环节,它决定了生产者消息写入的吞吐量,决定了消息会不会丢失,消费者获取消息的吞吐量。
Broker的消息持久化依赖于两个文件CommitLog
和ConsumeQueue
。
当Broker收到一条消息后,首先会把该消息写入磁盘文件CommitLog
,顺序写入哦。
CommitLog里的内容如下:
这个CommitLog
是很多磁盘文件,每个文件最多1GB,当一个文件写满之后,就新建一个。
现在我们的消息已经持久化在了磁盘上,但是有一个问题,当消费者要消费一条消息时,它怎么知道从CommitLog
中具体获取哪个消息呢?
这时就用到另一个磁盘文件ConsumeQueue
,在Broker中,每个MessageQueue
都有一系列ConsumeQueue文件,如:
$HOME/store/consumequeue/{topic}/{queueid}/{filename}
queueid就是对应MessageQueue,这个ConsumeQueue文件存储的就是一条消息在CommitLog中的偏移量,看到这里是不是有的懵逼,到底什么意思呢? 其实就是当Broker收到一条消息后,会把消息在CommitLog中的物理位置,也就是一个文件偏移量,记录在对应的MessageQueue的ConsumeQueue文件中。(可以理解为List中的角标索引)
ConsumeQueue文件里面的内容长这样:
ConsumeQueue文件里一条数据是20个字节,最多存放 30W条数据,文件最大5.72MB,存放满了就新建一个。
所以实际上Topic的每个 MessageQueue 都对应了 Broker 机器上的多个 ConsumeQueue,保存了 MessageQueue 的消息在 CommitLog 中的物理位置,也就是偏移量。
所以最终的存储模型大概是这样的:
RocketMq 的性能是非常高的,久经考验。那么它是怎么保证的呢? 上面我们说到过,消息持久化决定了 RocketMq 的吞吐量,但它是个磁盘写入操作,为什么性能还会这么好呢?
其实 Broker 是基于IO mmap技术,也就是 PageCache 和顺序写 两个机制来实现的。 首先 Broker 是顺序把消息写入 CommitLog 的,每次只在末尾追加,要比随机写性能高很多。 另外,数据写入 CommitLog 的时候,不是直接写入磁盘的,而是先写入OS的 PageCache 内存缓存中,然后由OS另起一个后台线程,异步将 PageCache 的数据刷入磁盘中。
所以采用 PageCache + 顺序写 +异步刷盘 的方式才能保证消息写入 CommitLog 的性能跟写入内存的性能是差不多的。
其他方面的性能优化可以参考这篇文章:阿里中间件博客 参考:持久化
首先我们要了解一个消费组的概念,即一群消费者组成的组。
上图中,一个Topic有两个消费组来消费,Broker 会分别发送消息到这两个组中,根据订阅规则(集群模式、广播模式)来决定是组里的每台机器都消费还是只有一台来消费。
RocketMq有两种订阅规则:集群模式、广播模式。 集群模式就是一条消息一个消费组里只会有一台机器会去消费。 广播模式就是一条消息一个消费组里每一台机器都会去消费。
集群模式的原理就是:RocketMq有一个 MessageQueue 分配算法,默认会把 MessageQueue 平均分配给每个Consumer,并且一个 MessageQueue 只会分配给一个 Consumer,来保证一个消息只能有一个 Consumer 消费。
RocketMq提供了2种消费方式:PUSH和PULL,但实际上,这两种方式本质上是一样的,都是消费者主动去 Broker 拉取消息。 Push模式其实也是消费者主动去 Broker 拉取消息,只不过它实时性很高,就像 Broker 在 Push 一样。它是通过长轮询来实现的,当消费者发起请求到 Broker ,如果没有消息的话,就会把线程挂起(默认15秒),在此期间会有一个后台线程每隔一段时间就去检查一下是否有新的消息,如果有,就唤起线程。
第一次消费,Broker 先通过 ConsumerQueue 文件找到第一个消息的offset,然后通过 offset 去CommitLog中获取具体的消息。并且消费者向 Broker 提交一个消费进度。 第二次,Broker 根据消费进度,从ConsumerQueue 文件获取 offset,再通过 offset 获取CommitLog。
参考:消费者
同步、异步刷盘
Broker(主从同步是怎么做的?消费者从master还是slave获取?会有主从不一致问题吗?如何存储到磁盘上的?) Producer (如何向Broke发消息的?) Consumer (如何拉取消息的?)
哪些情况会导致消息重复? 生产者重复投递 消费者同步消费进度出问题 哪些情况会导致消息丢失?怎么排查?怎么保证0丢失?