前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ 是如何发送消息

RocketMQ 是如何发送消息

作者头像
云扬四海
发布于 2022-11-30 07:36:54
发布于 2022-11-30 07:36:54
1.1K00
代码可运行
举报
文章被收录于专栏:云扬四海云扬四海
运行总次数:0
代码可运行

创建Topic的时候为何要指定MessageQueue数量?

  • 简单来说,就是你要指定你的这个Topic对应了多少个队列,也就是多少个MessageQueue。
  • MessageQueue就是RocketMQ中非常关键的一个数据分片机制,他通过MessageQueue将一个Topic的数据拆分为了很多个数据分片,然后在每个Broker机器上都存储一些MessageQueue。
  • Topic是一个逻辑上的概念,实际上在每个broker上以queue的形式保存,也就是说每个topic在broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。

生产者发送消息的时候写入哪个MessageQueue?

  • 生产者会跟NameServer进行通信获取Topic的路由数据, 以生产者从NameServer中就会知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上
  • 让一个Topic中的数据分散在多个MessageQueue中,进而分散在多个Broker机器上,实现RocketMQ集群分布式存储海量的消息数据了

如果某个Broker出现故障该怎么办?

  • 如果某个Broker临时出现故障了,比如Master Broker挂了,此时正在等待的其他Slave Broker自动热切换为Master Broker,那么这个时候对这一组Broker就没有Master Broker可以写入了
  • 如果你还是按照之前的策略来均匀把数据写入各个Broker上的MessageQueue,那么会导致你在一段时间内,每次访问到这个挂掉的Master Broker都会访问失败,这个似乎不是我们想要的样子。
  • 对于这个问题,通常来说建议大家在Producer中开启一个开关,就是sendLatencyFaultEnable
  • 一旦打开了这个开关,那么他会有一个自动容错机制,比如如果某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了

RocketMQ 是如何持久化消息的

1、为什么Broker数据存储是最重要的一个环节

  • roker数据存储实际上才是一个MQ最核心的环节,他决定了生产者消息写入的吞吐量,决定了消息不能丢失,决定了消费者获取消息的吞吐量,这些都是由他决定的

2、CommitLog消息顺序写入机制

  • 当生产者的消息发送到一个Broker上的时候,他接收到了一条消息,接着他会对这个消息做什么事情?首先第一步,他会把这个消息直接写入磁盘上的一个日志文件,叫做CommitLog,直接顺序写入这个文件
  • 这个CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接追加写入这个文件的末尾,就跟上面的图里一样。如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。

RocketMq是如何写入数据的

设定一个topic -> 根据设定的MessageQueue个数 -> 分不在不同的master Broker里边 -> 每个MessageQueue是由多个 CommitLog组成 -> Commit是采用顺序读写。加上OS PageCache来保证写入性能 -> 首先。OS PageCache是基于内存的缓冲池。采用异步刷盘或者同步刷盘顺序写入磁盘 (异步刷盘宕机是会有可能导致数据丢失

  • DLedger 相当于替换了 CommitLog
  • DLedger CommitLog 来构建出机器上的MessageQueue
  • Broker机器刚刚启动的时候,就是靠这个DLedger基于Raft协议实现的leader选举机制,互相投票选举出来一个Leader,其他人就是Follower,然后只有Leader可以接收数据写入,Follower只能接收Leader同步过来的数据
  • DLedger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DLedgerServer组件把这个uncommitted数据发送给Follower Broker的DLedgerServer。
  • 接着Follower Broker的DLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DLedgerServer,然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态。
  • 也就是说。当leaderBroker收到消息之后会同步给 FollowerBroker 节点。当节点响应ack之后主节点才会返回给生产者ack

源码索引

  • 消息发送
  • producer.send(msg);
  • -> defaultMQProducerImpl.sendDefaultImpl
    • -> this.tryToFindTopicPublishInfo 从 NameService 获取 Topic路由信息(本地有缓存就从缓存中获取)
    • -> this.selectOneMessageQueue 选择一个消息队列 queue
    • -> this.sendKernelImpl 调用发送核心方法
    • -> mQClientFactory.getMQClientAPIImpl().sendMessage 进行发送
  • -> MQClientAPIImpl#sendMessageSync
  • -> remotingClient.invokeSync 调用netty方法发送 RequestCode.SEND_MESSAGE 消息
  • broker接受到消息的处理
  • -> NettyServerHandler#channelRead0
  • -> NettyRemotingAbstract#processMessageReceived
  • -> NettyRemotingAbstract#processRequestCommand 处理客户端的请求消息
  • -> processor.asyncProcessRequest 客户端发送的是异步消息,不需要同步返回成功
  • -> SendMessageProcessor#asyncProcessRequest 进入消息处理
  • -> AbstractSendMessageProcessor#parseRequestHeader 解析请求
  • -> SendMessageProcessor#asyncSendMessage 异步保存发送的消息
  • -> this.brokerController.getMessageStore().asyncPutMessage(msgInner) MessageStore存储消息
  • -> MessageStore#asyncPutMessage 异步保存发送的消息
  • -> MessageStore#putMessage 保存发送的消息
  • -> DefaultMessageStore#asyncPutMessages DefaultMessageStore保存消息默认实现
  • -> this.commitLog.asyncPutMessages(messageExtBatch) 保存发送的消息
  • -> CommitLog#asyncPutMessages 保存发送的消息
  • -> mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext) mappedFile
  • -> CommitLog#submitFlushRequest 提交刷盘 (异步 / 同步)
  • -> CommitLog#submitReplicaRequest 将消息同步到从节点。可配置备份多少个
  • -> 消息保存完毕

rocketMq 同步消息的原理

  • netty调用方会触发的动作
  • RemotingClient#invokeSync
  • RemotingClient#invokeSyncImpl 发送同步方法的实现
  • this.responseTable.put(opaque, responseFuture) 这个很关键,将opaque 存到响应的 responseTable里边
  • 然后下方 responseFuture.waitResponse(timeoutMillis) 会阻塞当前请求
  • netty被调用方会触发的动作
  • NettyRemotingAbstract 然后我们看此处的方法。
  • RemotingResponseCallback callback = new RemotingResponseCallback() 构建了一个远程
  • 如果请求是同步请求的话,一定会触发 callback.callback(response);
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final RemotingResponseCallback callback = new RemotingResponseCallback() {
    @Override
    public void callback(RemotingCommand response) {
        doAfterRpcHooks(remoteAddr, cmd, response);
        if (!cmd.isOnewayRPC()) {
            if (response != null) {
                response.setOpaque(opaque);
                response.markResponseType();
                try {
                    ctx.writeAndFlush(response);
                } catch (Throwable e) {
                    log.error("process request over, but response failed", e);
                    log.error(cmd.toString());
                    log.error(response.toString());
                }
            } else {
            }
        }
    }
};
  • 请观察一下这个的实现。 response.setOpaque(opaque); 想当于将请求的 opaque 塞入到了response里边。
  • 然后将 ctx.writeAndFlush(response); 到调用方
  • 然后回到调用方
  • NettyRemotingAbstract#processMessageReceived 检查到是 RESPONSE_COMMAND 响应的请求
  • responseFuture.putResponse 会设置 responseCommand 并且 countDownLatch.countDown 释放之前阻塞的请求
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-11-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
分布式消息队列 RocketMQ 源码分析 —— RPC 通信(一)
摘要: 原创出处 https://mp.weixin.qq.com/s/V_nOevq_2cbrH2_zgOSP-w 「胡宗棠」欢迎转载,保留摘要,谢谢!
芋道源码
2019/10/29
7870
分布式消息队列 RocketMQ 源码分析 —— RPC 通信(一)
消息中间件—RocketMQ的RPC通信(一)
文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键
用户2991389
2018/09/05
1.6K0
消息中间件—RocketMQ的RPC通信(一)
RocketMQ原理—1.RocketMQ整体运行原理
这个介绍顺序就基本涵盖了RocketMQ的整体运行流程,接下来首先分析RocketMQ生产者的工作原理。
东阳马生架构
2025/04/01
1450
面试必备(背)--RocketMQ八股文系列
担任路由消息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表分别进行发送消息和消费消息。nameServer由多个无状态的节点构成,节点之间无任何信息同步。 broker会定期向NameServer以发送心跳包的方式,轮询向所有NameServer注册以下元数据信息:
微客鸟窝
2022/11/07
1.1K0
面试必备(背)--RocketMQ八股文系列
RocketMQ实践问题
这个是在面试时,关于MQ,面试官最喜欢问的问题。这个问题是所有MQ都需要面对的一个共性问题。大致的解决思路都是一致的,但是针对不同的MQ产品又有不同的解决方案。分析这个问题要从以下几个角度入手:
Java廖志伟
2022/03/07
5210
RocketMQ实践问题
RocketMQ 生产者 Producer 发送消息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
java404
2018/12/25
2.2K0
面试官:生产环境中使用RocketMQ常见问题
然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。
程序员子龙
2023/11/26
1.4K0
rabbitmq整个消息投递的路径
rabbitmq整个消息投递的路径是producer—>rabbitmq broker—>exchange—>queue—>consumer。 生产者将消息投递到Broker时产生confirm状态,会出现二种情况,ack:表示已经被Broker签收。nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常等。生产者将消息投递到Broker,被Broker签收,但是没有对应的队列进行投递,将消息回退给生产者会产生return状态。这二种状态是rabbitmq提供的消息可靠投递机制,生产者开启确认模式和退回模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失。消费者如果在消费端没有出现异常,则调用channel.basicAck方法确认签收消息。消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。通过一系列的操作,可以保证消息的可靠投递以及防止消息丢失的情况。
用户11220026
2024/07/26
1630
Dledger是如何实现主从自动切换的
hello小伙伴们,今天王子又来继续和大家聊RocketMQ了,之前的文章我们一直说Broker的主从切换是可以基于Dledger实现自动切换的,那么小伙伴们是不是很好奇它究竟是如何实现的呢?今天我们就来聊一聊这个话题。
HUC思梦
2020/10/10
1.5K0
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。
用户2031163
2023/10/20
4030
RocketMQ高级原理
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
Java廖志伟
2022/03/07
4880
RocketMQ高级原理
万字聊一聊RocketMQ一条消息短暂而又精彩的一生
我们都知道,消息是由业务系统在运行过程产生的,当我们的业务系统产生了消息,我们就可以调用RocketMQ提供的API向RocketMQ发送消息,就像下面这样
三友的java日记
2024/06/07
1650
万字聊一聊RocketMQ一条消息短暂而又精彩的一生
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
上篇文章主要介绍消息中间件并以RocketMQ架构展开描述其核心组件以及MQ运行流程
菜菜的后端私房菜
2024/09/13
3350
【MQ我可以讲一个小时】
引入消息中间件也会带来很多问题,先说说消息丢失,生产者往消息队列发送消息,消息队列往消费者发送消息,会有丢消息的可能,消息队列也有可能丢消息,通常MQ存盘时都会先写入操作系统的缓存页中,然后再由操作系统异步的将消息写入硬盘,这个中间有个时间差,就可能会造成消息丢失,如果服务挂了,缓存中还没有来得及写入硬盘的消息就会发生消息丢失。不同的消息中间件对于消息丢失也有不同的解决方案,先说说最容易丢失消息的kafka吧。生产者发消息给Kafka Broker:消息写入Leader后,Follower是主动与Leader进行同步,然后发ack告诉生产者收到消息了,这个过程kafka提供了一个参数,request.required.acks属性来确认消息的生产,0表示不进行消息接收是否成功的确认,发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了。1表示当Leader接收成功时确认,只要Leader存活就可以保证不丢失,保证了吞吐量,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了。-1或者all表示Leader和Follower都接收成功时确认,可以最大限度保证消息不丢失,但是吞吐量低,降低了kafka的性能。一般在不涉及金额的情况下,均衡考虑可以使用1,保证消息的发送和性能的一个平衡。Kafka Broker 消息同步和持久化:Kafka通过多分区多副本机制,可以最大限度保证数据不会丢失,如果数据已经写入系统缓存中,但是还没来得及刷入磁盘,这个时候机器宕机,或者没电了,那就丢消息了,当然这种情况很极端。Kafka Broker 将消息传递给消费者:如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时消费者直接宕机了,未处理完的数据丢失了,下次也消费不到了。所以为了避免这种情况,需要将配置改为,先消费处理数据,然后手动提交,这样消息处理失败,也不会提交成功,没有丢消息。
Java廖志伟
2022/03/07
4900
【MQ我可以讲一个小时】
Kafka和RocketMQ实现原理对比
既然有了Kafka为什么还会出现RocketMQ?这就不得不提到RocketMQ的诞生动机了,在RocketMQ的官网上面可以找到这个问题答案,原文可以点击此处阅读。实际原因当然是kafka存在一些问
jaydenwen123
2021/12/27
1.6K0
Kafka和RocketMQ实现原理对比
RocketMQ(三):面对高并发请求,如何高效持久化消息?
上篇文章我们分析完RocketMQ发送消息的原理,得到结果客户端会通过RPC组件向Broker进行通信
菜菜的后端私房菜
2024/09/18
1.2K0
RocketMQ原理—5.高可用+高并发+高性能架构
Push同步模式:Producer往Broker主节点写入数据后,Broker主节点会主动把数据推送Push到Broker从节点里。
东阳马生架构
2025/04/06
2270
RocketMQ DLedger初识
众所周知,作为一个出色的分布式消息中间件,RocketMQ 在全球范围内获得了广泛的应用,那么作为一个分布式消息中间件,最重要的是什么?
政采云前端团队
2023/09/01
5850
RocketMQ DLedger初识
RocketMQ原理—3.源码设计简单分析三
NameServer启动后,会有一个NamesrvController组件管理控制NameServer的所有行为,包括内部会启动一个Netty服务器去监听一个9876端口号,然后接收处理Broker和客户端发送过来的请求。
东阳马生架构
2025/04/03
590
RocketMQ原理—2.源码设计简单分析二
现已知Broker作为一个JVM进程启动后,会由BrokerStartup这个启动组件先初始化4个配置组件,然后再通过这4个配置组件创建BrokerController这个管理控制组件,在BrokerController管控组件中会包含大量的核心功能组件和后台线程池。如下图示:
东阳马生架构
2025/04/02
630
推荐阅读
相关推荐
分布式消息队列 RocketMQ 源码分析 —— RPC 通信(一)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档