Broker 几个关键点: 负载均衡 可用性 1.负载均衡 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。...如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。...topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。...2.可用性 由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。...所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决
Broker 启动的主函数入口: org.apache.rocketmq.broker.BrokerStartup: public static void main(String[] args) {...创建了以下配置类: •nettyServerConfig:封装了作为消息队列服务器的配置信息•nettyClientConfig:封装了作为NameServer客户端配置信息•brokerConfig:封装了 Broker...配置信息•messageStoreConfig:封装了 RocketMQ 存储系统的配置信息 1.Broker 初始化 2.1 配置文件加载 •主题配置加载: result = result && this.consumerOffsetManager.load...6)启动HA主从同步线程 •启动各类定时任务 3.2 启动netty服务: remotingServer启动:启动远程通讯服务 fastRemotingServer启动:启动远程通讯服务 broker...信息 3.7 开启定时向NameServer注册broker信息任务 ?
broker是RocketMQ的核心模块,这篇文章我们先从整体看下代码结构、RocketMQ的领域模型,然后再看下一个broker节点的启动过程以及Controller暴露的接口。 代码结构 ?...broker的代码结构 领域模型 MQ领域语言描述RocketMQ做的事情,producer构建Message,发送给broker的指定topic,broker负责将消息投递到指定topic下的队列,并记录消息队列的...和consumer来说,broker是服务端 NettyClientConfig,对于name server来说,broker是客户端 MessageStoreConfig,消息存储的配置,RocketMQ...RocketMQ中的模板方法设计模式 插件设计模式 插件设计模式和工厂设计模式一起使用,需要包含一个插件上下文、一个抽象插件类(AbstractPluginMessageStore),主要模块入下图所示...参考资料 Apache RocketMQ背后的设计思路与最佳实践 消息队列MQ
1.查看broker状态信息 bin/mqadmin brokerStatus -b 192.168.1.x:10911 -n 192.168.1.x:9876 Java HotSpot(TM) 64-...192.168.1.x deleteCommitLogFilesInterval = 100 adminBrokerThreadPoolNums = 16 storePathCommitLog = /data/rocketmq...192.168.1.x:9876 clientAsyncSemaphoreValue = 65535 maxMsgsNumBatch = 64 storePathConsumeQueue = /data/rocketmq...waitTimeMillsInSendQueue = 200 commercialTransCount = 1 osPageCacheBusyTimeOutMills = 1000 abortFile = /data/rocketmq.../store/index rocketmqHome = /home/baseuser/rocketmq useReentrantLockWhenPutMessage = false haHousekeepingInterval
服务器上部署的RocketMq进程一般称之为Broker,Broker会接收Producer的消息,持久化到本地,然后push给Consumer,通常使用集群部署,主从之间会有数据同步。...信息放入请求头中 Broker是怎么接收生产者的消息并存储的 CommitLog Broker 接收消息到存储 commitLog 的代码入口是: org.apache.rocketmq.remoting.netty.NettyRemotingAbstract...#processRequestCommand -> org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest...其实 Broker 会启动一个后台线程,扫描磁盘文件,超过72小时的就会被删除,也就是说RocketMq默认只会保存3天的数据。...RocketMQ 在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。
引言 继 [【RocketMq】NameServ启动脚本分析(Ver4.9.4)] 之后又来看看Broker的脚本。总体上来看大差不差,以阅读核心的配置部分调优为主。 mqbroker #!..." fi export ROCKETMQ_HOME sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup...我们关注最后一个脚本,这里调用了runbroker.sh的脚本: sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup...,这里只好罗列一些这些参数的作用了: 注意Broker对于小于JDK8的版本和小于JDK9的版本做了两种策略,这里的脚本其实是有点奇怪的,因为RocketMq最低不是只支持JDK8么?...NameServ的启动脚本和Broker的类似,看懂任意一个就可以看懂另一个。
本文将在 RocketMQ 消息发送system busy、broker busy原因分析与解决方案 的基础上,结合生产上的日志尝试再次理解 broker busy 以及探讨解决方案。...PC_SYNCHRONIZED]broker busy [PCBUSY_CLEAN_QUEUE]broker busy [TIMEOUT_CLEAN_QUEUE]broker busy 上述前面4个关键字在上篇文章中已详细介绍...针对前4种 broker busy 出现的问题已经在上篇文章中详细介绍,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker...如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ...同时在做 Broker 服务器巡检的时候,可以通过去通过如下命令去查看 broker 一次消息追加是否会超过 500 ms。 ?
目录 一、问题思考 二、Broker处理消费流程 1.Broker消费处理流程概览 2.查找消息流程 3.消息查询结果处理流程 三、消费进度流转 1.客户端上报消费进度 2.Broker端处理消费进度...3.消费进度流转示意图 一、问题思考 1.Broker是如何处理消费流程的?...说明:本文分析均为PUSH消费模式 二、Broker处理消费流程 本部分将消费的切分成三块梳理:Broker消费处理流程概览、查找消息流程、以及消息查询结果处理流程。...1.Broker消费处理流程概览 ?...小结:在拉取消息时会进行Broker和主题读权限的判断,实战中若有必要可以封锁Broker的拉取权限从而禁止从该broker进行消费;或者封锁某主题的读权限禁止消费组从该主题消费消息。
启动闪退 解决办法 进入用户目录下面store文件(我的目录地址:C:\Users\Administrator\store),删除文件里面所有文件,在RocketMQ的bin目录下,执行如下命令后,MQ...mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true PS:autoCreateTopicEnable=true 表示可以动态创建topic 启动报错 RocketMQ...启动broker提示 错误:找不到或无法加载主类。...解决办法 编辑 runbroker.cmd 文件,新增一行 set “JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%” 新增后文件如下: 保存后可以正常启动RocketMQ
序 本文主要研究一下rocketmq broker的CONSUMER_SEND_MSG_BACK CONSUMER_SEND_MSG_BACK rocketmq/common/src/main/java.../org/apache/rocketmq/common/protocol/RequestCode.java public class RequestCode { //.........RequestCode定义了CONSUMER_SEND_MSG_BACK常量,值为36 processRequest rocketmq/broker/src/main/java/org/apache/rocketmq.../broker/processor/SendMessageProcessor.java public class SendMessageProcessor extends AbstractSendMessageProcessor...))) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker
一、问题描述 在2020-03-16 18:00左右收到告警,业务出现发送RocketMQ失败,在约1分钟左右后自动恢复。RocketMQ运行向来稳定,为何也抖动了?...二、Broker日志分析 1.查看GC日志 通过查看发时间问题时间附近GC日志并无发现异常。...日志 由日志可以看出:slave与问题节点broker同步信息异常。...,造成主从同步问题;并未发现Broker自身出问题了。...但是如果阿里云抖动为何只影响RocketMQ集群的3个节点呢?其他RocketMQ集群没有问题;业务机器也没有发现网络等问题。
序 本文主要研究一下rocketmq broker的CONSUMER_SEND_MSG_BACK th (36).jpeg CONSUMER_SEND_MSG_BACK rocketmq/common.../src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java public class RequestCode { //...RequestCode定义了CONSUMER_SEND_MSG_BACK常量,值为36 processRequest rocketmq/broker/src/main/java/org/apache/rocketmq.../broker/processor/SendMessageProcessor.java public class SendMessageProcessor extends AbstractSendMessageProcessor...))) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker
一、Broker处理消息的入口类SendMessageProcessor processRequest方法主要三件事情: 1.处理consumer发回broker的消息重试 2.处理批量发送 3.处理单条消息发送...可以在指定的时间开始服务通过startAcceptSendRequestTimeStamp设定 2.消息校验 broker没有写入权限并且为顺序topic则拒绝服务 检查Topic不能和系统保留Topic...[TBW102]冲突 若Topic未创建,Broker开启自动创建 queueId校验,不能大于队列最大值 3.判断是否超过消费次数(16次),决定是否写入死信队列 4.消息内容组织 设置Message...扩展字段 设置Message在客户端生成的时间 设置发送Message机器的地址 设置存储Message的Broker地址 设置消费重试消息的次数 5.消息存储(单独梳理) private RemotingCommand...().isTraceOn())); log.debug("receive SendMessage request command, {}", request); //Broker
master重启 [root@master01 rocketmq]# init 6 主机名 状态 broker-a master 发送时重启 broker-a slave 正常运行 broker-b...2.broker-a slave重启 [root@slave01 rocketmq]# init 6 主机名 状态 broker-a master 宕机 broker-a slave 发送时重启 broker-b...[root@master02 rocketmq]# init 0 发送5000条测试消息,发送前broker-b master关机,只保留broker-a master运行 ?...1.broker-a master关机 [root@master01 rocketmq]# init 0 在消息消费时将broker-a master关机 主机名 状态 broker-a master...2.broker-a slave关机 先发送1万条消息,然后消费,消费过程中broker-a slave关机 [root@slave01 rocketmq]# init 0 ?
今天来分享一个最近生产环境遇到的一个 RocketMQ 异常: 首先,我们回顾一下 RockemtMQ 的架构: Broker 的主从节点都会注册到 Name Server 集群,Name Server...RocketMQ client 会在本地维护一份 topic 和 Broker 地址的映射关系,放在 MQClientInstance#brokerAddrTable。...发送消息 RocketMQ client 在发送消息时,会根据 topic 首先从本地缓存(brokerAddrTable)获取 Broker,如果获取不到,就会到 Name Server 集群中获取...这些方法的使用都在源码【rocketmq-tools】这个模块中。...客户端代码的调用关系如下: 这个发生在事务消息的场景,RocketMQ client 向 Broker 拉取消息时,如果 Broker 返回 PULL_OFFSET_MOVED,client 就会通过异步线程
Broker作为代理,路由注册是通过Broker与nameServer的心跳功能实现的。除此之外,还联系了生产者和消费者、存储。因此可以知道Broker是非常重要的。...Broker的启动流程如下: ?
今天分享 RocketMQ 的 Broker 挂了,会带来什么影响。 面试官:你好,如果 RocketMQ 集群中的一个 Broker 挂了,会造成什么影响呢?...我:RocketMQ 有延迟隔离策略,如果发送某一个 Broker 失败了,会将其隔离,优先选择正常的 Broker 发送消息。需要注意的是,这个策略默认是不开启的。...如下图: Broker1 挂之前,Order1 的消息发送到了 Broker1,Broker1 挂之后,Order1 的消息被发送到了 Broker2。...在 Broker1 恢复前,消费者只能消费 Broker2 上拉取 Order1 的消息,Broker1 恢复后消费者线程再从 Broker1 拉取,因此 Order1 的消息产生乱序。...对于集群模式,消息偏移量保存在 Broker,路径如下: /${rocketmq.client.localOffsetStoreDir}/.rocketmq_offsets/${clientId}/${
问题 有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?...流程图 源码跟踪(broker启动流程里) 入口方法 DefaultMessageStore###load public boolean load() { boolean result
版本:4.7.0 原因:JAVAHOME环境变量路径中包含空格 解决方案:修改runbroker.cmd 改为 set "JAVA_OPT=%JAVA_OP...
2.1 RocketMQ 网络处理机制概述 RocketMQ的网络设计非常值得我们学习与借鉴,首先在客户端端将不同的请求定义不同的请求命令CODE,服务端会将客户端请求进行分类,每个命令或每类请求命令定义一个处理器...从RocketMQ的设计中来看,同一时间,只会对一个commitlog文件进行顺序写,写完一个后,继续创建一个新的commitlog文件。...但通常情况下,RocketMQ进程退出的可能性不大。...4.2 扩容Broker服务器 方案依据: 当Broker服务器自身比较忙的时候,快速失败,并且在接下来的一段时间内会规避该Broker,这样该Broker恢复提供了时间保证,Broker本身的架构是支持分布式水平扩容的...温馨提示:在Broker扩容时候,可以复制集群中任意一台Broker服务下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服务器指定目录,避免在新Broker
领取专属 10元无门槛券
手把手带您无忧上云