RocketMQ基本架构图如下
从这个架构图上我们可以知道,RocketMQ有4块核心部分:
NameServer
:管理Broker的信息,让使用MQ的系统感知到集群里面的broker
Broker
:主从架构实现数据多副本存储和高可用
producer
:生产者
consumer
:消费者
每台broker机器需要向所有的NameServer机器上注册自己的信息,防止单台NameServer挂掉导致Broker信息不全,保证NameServer的集群高可用。
基于Netty的网络通信。
broker会每隔30秒向NameServer发送一个的心跳 ,NameServer收到一个心跳会更新对应broker的最近一次心跳事件,然后NamServer会每隔十秒运行一个任务,去检查一下各个broker的最近一次心跳的时间,如果超过120s没有收到相应broker的心跳,则判定对应的broker已经挂掉。
为了保证MQ的数据不丢失而且具备一定的高可用性,我们采用的是主从复制模式。
RocketMQ自身的Master-Slave模式主采取的是Slave主动从Master拉取消息。
对系统会存在一点影响,但是影响不大,只不过少了Slave Broker,会导致所有的读写压力都集中在Master Broker上
选举方式这里不做重点介绍。
我们先看看Topic、Broker、Message之间的关系。
如图比如说一个TopicA有n条消息,然后一个TopicA中的n条数据分配放入给4个MessageQueue1-4。
所以本质上来说就是一个数据分片机制,通过MessageQueue将一个Topic的数据拆分为很多数据分片,在每个Broker机器上都存储一些MessageQueue。通过这个方法可以实现分布式存储。
因为从前面我们知道,生产者会跟NameServer通信获取相应Topic的路由数据,从而知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上,通过对应的规则写入对应的MessageQueue。
当MasterBroker宕机,此时SlaveBroker正在切换过程中,有一组Broker就没有Master可以写入。
此时我们可以打开Producer的自动容错机制开关:sendLatencyFaultEnable,比如说访问其中一个Broker发现网络延迟有1000ms还无法访问,我们会自动回避这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker。
过一段时间之后,MasterBroker修复好了,或者说SlaveBroker选举成功了,就可以提供给别人访问了。
Broker数据存储实际上是MQ最核心的环节:
首先,Producer发送消息给Broker,Broker接收到消息后,把这个消息直接顺序写入写入到磁盘上的一个日志文件,叫做CommitLog。
在Broker中,每一个Topic下的每一个MessageQueue都会有对应一系列的ConsumeQueue文件。
Broker磁盘存储类似于文件树的形式存在:
ConsumeQueue中存储着对应MessageQueue中的消息在CommitLog中的物理偏移量地址offset。
如图:
即:Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,这些ConsumeQueue共同组成保存了MessageQueue的所有消息在CommitLog文件中的物理offset偏移量。
磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略
如图:
这样的话基本上可以让消息写入CommitLog的性能跟直接写入内存里面是差不多的,所以Broker才能具有高吞吐量。
对于日志类型这种场景,可以允许数据的丢失,但是要求比较高的吞吐量,可以采用异步刷盘的方式。另外非核心的业务场景,不涉及重要核心数据变更的场景,也可以使用异步刷盘,比如订单支付成功,发送短信这种场景。但是对于涉及到核心的数据变更的场景,就需要使用同步刷盘,比如订单支付成功后扣减库存。
原则:一个Consumer机器可以消费处理多个MessageQueue,一个MessageQueue只能被一个相同ConsumerGroup中的同一个Consumer消费。
Broker收到消息拉取请求后,会找到对应的MessageQueue中开始消费的位置,在ConsumeQueue读取里面对应位置的的消息在CommitLog中的offset
如图:
生产者是基于os cache提升写性能的,broker收到一条消息,会写入CommitLog文件,但是会先把CommitLog文件中的数据写入os cache(操作系统管理的缓存中),然后os开启后台线程,异步的将os cache缓存中的CommitLog文件的数据刷入磁盘。
在消费者消费信息的时候:
第一步,我们会去读取ConsumeQueue中的offset偏移量,此时大量的读取压力全部都在ConsumeQueue,ConsumeQueue文件的读性能是很大程度上会影响消息拉取的性能和吞吐量。
所以,Broker对ConsumeQueue文件也是基于os cache来进行优化的。
实际上,ConsumeQueue主要只是存放消息的offset,所以每个文件很小,占不了多少磁盘空间,完全可以被os缓存在内存里。所以几乎可以说消息的读取性能达到内存级别。
第二步,根据读取到的offset去CommitLog里读取消息的完整数据。此时会有两种可能
这两种状态很好区分,比如说消费者一直在快速的拉取和消费处理,跟上了broker的消息写入速率,这么来说os cache中每次CommitLog的消息还没来得及被刷入磁盘中的时候就被消费者消费了;但是比如说broker负载很高,拉取消息的性能很低,跟不上生产者的速率,那么数据会保存在磁盘中进行读取。
根据以上,我们可以判断了什么时候Master Broker负载会高,也就是当消费者读取消息的时候,要从磁盘中加载大量的数据出来,此时Master Broker就会知道本次的负载会比较高,通知消费者下次从Slave Broker去拉取数据。
本质上就是对比当前没有拉取消息的数量和大小,以及最多可以存放在os cache内存里的消****息的大小,如果没有拉取的消息超过了最大能使用的内存的量,那么之后会频繁的从磁盘加载数据,此时就让你从slave broker去加载数据了!
举一个简单的例子作为分析的入口,将从各个环节可能发生的问题进行深入分析,如图:
消息发送失败的原因多种多样,存在于多个环节,我们一一分析。
第一个环节就是,订单系统推送消息到MQ的过程中,由于网络等因素导致消息丢失。
为了解决系统推送消息丢失问题,RocketMQ有一个非常强悍的功能就是事务消息,能够确保我们消息一定会成功写入MQ里面,不会半路搞丢。
如图是在本系统中的一个基本事务消息的流程图。
如果此阶段,half消息发送给MQ失败,会执行一系列回滚操作,关闭这个订单的状态,因为后续的消息都操作不了
如图解释如下:
上面这么复杂的事务消息机制可能导致整体的性能比较差,而且吞吐量会比较低,我们一定要用事务消息吗?
可以基于同步发送消息+反复多次重试的方案
我们可以分析的到,事务消息能够保证我们的消息从生产者成功发送到broker中对应的消费者需要消费的Topic中,我们认为他的消息推送成功。
问题一:
但是这个消息推送仅仅先是推送到os cache缓存中,仅仅只是可以被消费系统看到,由于消息积压等原因,还没来得及去获取这条消息,还没来得及刷到ConsumeQueue的磁盘文件中去,此时万一机器突然宕机,os cache中的数据全部丢失,此时消息必然丢失,消费系统无法读到这条消息。
如图示意:
解决:
为了解决这个问题,一定要确保消息零丢失的话,我们的解决办法就是将异步刷盘调整为同步刷盘。
放弃了异步刷盘的高吞吐量,确保消息数据的零丢失,也就是说只要MQ返回响应half消息发送成功了,此时消息就已经进入了磁盘文件了。
问题二:
就算os cache的消息写入ConsumeQueue的磁盘文件了,红包没来得及消费这条消息的时候,磁盘突然就坏了,一样会导致消息丢失。
所以说,无论是通过同步发送消息+反复多次重试的方案,还是事务消息的方案,哪怕保证写入MQ成功了,消息未必不会丢失。
解决:
对Broker使用主从架构的模式,每一个MasterBroker至少有一个SlaveBroker去同步他的数据,而且一条消息写入成功,必须让SlaveBroker也写入成功,保证数据有多个副本的冗余。
不一定。
问题分析:
因为当红包系统拿到消息数据进内存里时,此时还没有执行发红包的逻辑,然后此时红包系统就已经提交了这条消息的offset到broker中告诉broker已经消费掉了这条消息,消息位置会往后移。然后此时红包系统宕机,这条消息就会丢失,永远执行不了发红包的逻辑。
RocketMQ解决方案: 利用消息监听器同步处理消息
在RocketMQ的Consumer的默认消费模式下,我们在消息监听器中接收到一批消息之后,会执行处理消息的逻辑,处理完成之后才会返回SUCCESS状态提交offset到broker中,如果处理时宕机,不会返回SUCCESS状态给broker,broker会继续将这个消息给下一个Consumer消费。
首先,消息零丢失方案会必然的导致从头到尾的性能下降和MQ的吞吐量下降
一般和金钱、交易以及核心数据相关的系统和核心链路,可以用这套零消息丢失方案:比如支付系统、订单系统等。
重复发红包等问题
如图:
有类似很多这种消息重试,接口重试的情况都会有消息重复发送的可能性,还比如说当你发送消息成功到MQ,MQ返回的SUCCESS的响应由于网络原因未收到,重试机制会再次发送消息,导致消息重复。
解决方案:幂等性机制
Redis缓存思想也比较简单,只需要根据对应的订单信息去缓存里面查询一下是否已经发送给MQ了。
但是这种解决方案也不是绝对的安全,因为你消息发送成功给MQ了还没来得及写Redis系统就挂了,之后也会被重复调用。
总结以上两种解决方案,我们不建议在消息的发送环节保证消息的不重复发送,会影响接口性能。
解决方案:
通过以上的学习,我已经基本解决了MQ消息不丢失以及不会重复处理消息的问题,在正常流程下基本上没有什么问题。但是会出现以下问题。
我们一直都是假设的一个场景就是红包系统的MessageListener监听回调函数接收到消息都能顺利的处理好消息逻辑,发送红包,落库等操作,返回SUCCESS,提交offset到broker中去,然后从broker中获取下一批消息来处理。
如图:
问题:
但是如果在我们MessageListener处理消息逻辑时候,红包数据库宕机了,没办法完成发红包的逻辑,此时出现对消息处理的异常,我们应该怎么处理呢?
解决方案:
在MessageListener中,除了返回SUCCESS状态,我们还可以返回RECONSUME_LATER状态,也就是用try-cache包裹住我们的业务代码,成功则返回SUCCESS状态,顺利进行后续操作,如果出现异常则返回RECONSUME_LATER状态。
当RocketMQ收到我们返回的RECONSUME_LATER状态之后,会将这批消息放到对应消费组的重试队列中。
重试队列里面的消息会再次发给消费组,默认最多重试16次,如果重试16次失败则进入死信队列。
死信队列:
对于死信队列,一般我们可以专门开一个后台线程,订阅这个死信队列,对死信队列中的消息,一直不停的尝试。
大数据团队要获取到订单系统的binlog,然后保存一份在自己的大数据存储系统中
数据库binlog:记录数据库的增删改查操作。
大数据团队不能直接跑复杂的大SQL在订单系统的数据库中跑出来一些数据报表,这样会严重影响订单系统的性能,为了优化方案,我们采用类似基于Canal这样的中间件去监听订单数据的binlog,然后把这个binlog发到MQ中去,然后我们的大数据系统自己用MQ里获取binlog,自己在自己的大数据存储中执行增删改查操作,得到我们需要的报表,如图下:
出现上面问题的原因,根本问题就是一个订单binlog分别进入了两个MessageQueue中,解决这个问题的方法其实非常简单,就是得想办法让同一个订单的binlog进入到一个MessageQueue里面去。
方法很简单:我们可以根据订单id对MessageQueue的数量取模来对应每个订单究竟去哪个MessageQueue。
消息乱序解决方案不能和重试队列混用。
大量订单点击提交未支付,超过30min需要自动退款,我们研究需要定时退款扫描问题。
如图:
当一个订单下单之后,没有支付会进入订单数据库保存,如果30分钟内没有支付,就必须订单系统自动关闭这个订单。
可能我们就需要有一个后台的线程,不停的去扫描订单数据库里所有的未支付状态的订单,超过30分钟了就必须把订单状态更新为关闭。这里会有一个问题,订单系统的后台线程必须不停的扫描各种未支付的订单,可能每个未支付的订单在30分钟之内会被扫描很多遍。这个扫描订单的服务是一个麻烦的问题。
针对这种场景,RocketMQ的延迟消息就登场了。
如图:
所以RocketMQ的延迟消息,是非常常用并且非常有用的一个功能
在一些真正的生产项目中,我们需要合理的规划Topic和里面的tags,一个Topic代表了某一类的业务消息类型数据,我们可以通过里面的tags来对同一个topic的一些消息进行过滤。
我们在消息零丢失方案中,万一消息真的丢失了,我们怎么去排查呢?在RocketMQ中我们可以给消息设置对应的Key值,比如设置一个订单ID:message.setKeys(orderId),这样这个消息就和一个key绑定起来,当这个消息发送到broker中去,会根据对应message的数量构建hash索引,存放在IndexFile索引文件中,我们可以通过MQ提供的命令去查询。
在我们这种大型的金融级的系统,或者跟钱有关的支付系统等等,需要有超高级别的高可用保障机制,所以对于零消息丢失解决方案来说,万一一整个MQ集群彻底崩溃了,我们需要有更完善的措施来保证我们消息不会丢失。
此时生产者发送不了消息到MQ,所以我们生产者就应该把消息在本地进行持久化,可以是存在本地磁盘,也可以是在数据库里去存起来,MQ恢复之后,再把持久化的消息投递到MQ中去。
最简单的方法去提高消费者的吞吐量,就是提高消费者的并行度,比如说部署更多的Consumer机器去消费消息。但是我们需要明确的一点就是对应的MessageQueue也要增加,因为一个MessageQueue只能被一个Consumer机器消费。
第二个办法是我们可以增加Consumer的线程数量,消费线程乐队,消费速度越快。
第三个办法是我们可以开启消费者的批量消费功能(有对应的参数设置)。
Consumer是支持设置在哪里开始消费的,常见的有两种:从Topic的第一条数据消费(CONSUME_FROM_LAST_OFFSET),或者是从最后一次消费过的消息之后开始消费(CONSUME_FROM_FIRTST_OFFSET),我们一般都是设置选择后者。
如图所示:在一个系统中,由生产者系统和消费者系统两个环节组成,生产者不断的向MQ里面写入消息,消费者不断的从MQ中消费消息。突然有一天消费者依赖的一些数据库挂了,消费者就处理不了当下的业务逻辑,消息也不能正常的被消费,此时生产者还在正常的向MQ中写入消息,结果在高峰期内,就往MQ中写入了百万条消息,都积压在了MQ里面了。
第一, 最简单粗暴的方法,如果我们的消息能够容忍百万消息的丢失,那么我们可以直接修改消费者系统的代码,丢弃所有的消息,那么百万消息很快就被处理完了,但是往往对于绝大多数系统而言,我们不能使用这种办法。
第二, 我们需要等待消费者依赖的数据库恢复之后,根据线上的Topic的MEssageQueue来判断后续如何处理。
MessageQueue数量多:
如图:
MessageQueue数量少:
如图:
要做MQ的集群迁移,不是简单的粗暴的把Producer更新停机,新的代码重新上线发到新的MQ中去。
一般来说,我们需要做到两件事情:
好了各位,以上就是这篇文章的全部内容了,我后面会每周都更新几篇高质量的大厂面试和常用技术栈相关的文章。感谢大伙能看到这里,如果这个文章写得还不错, 求三连!!! 感谢各位的支持和认可,我们下篇文章见!
我是 九灵
,有需要交流的童鞋可以关注公众号:Java 补习课
! 如果本篇博客有任何错误,请批评指教,不胜感激 !