生产者是怎么发消息的?
首先在理解生产者发消息之前,必须要明白一个概念:MessageQueue
是什么?
其实MessageQueue
是RocketMq的一种数据分片+物理存储机制。
我们一般在创建 Topic 的时候会指定 MessageQueue 的数量。
如上图,一个 Topic 中有4个 MessageQueue,每个 Brokers 上有2个 MessageQueue ,生产者通过算法(默认是均匀分配)来把消息写入不同的 MessageQueue 中。MessageQueue 的数据可以持久化在磁盘上。
这样就把消息分散到了多个 Broker 上,大大提升 Broker 的抗并发能力!
Producer 通过 NameSever 获取指定 Topic 的 Broker 路由信息,并在本地保存一份缓存数据,比如一个Topic有哪些 MessageQueue,MessageQueue 在哪几台 Broker 上,Broker 的ip.port等等。 Producer 发送消息只发到 Master Broker上,Slave 通过 主从同步获取数据。
那么 Produce 是怎么链接 NameSever 的呢?
Producer 作为发送消息的一方,有3中容错机制:
sendLatencyFaultEnable
可以开启,RocketMq内部会维护一个故障Broker的HashMap,把一定延迟级别的Broker放入这个map,下次选择Broker的时候,就会规避不可用的Broker。
生产者通过轮询某个 Topic 下的所有 MessageQueue 的方式来实现发送方的负载均衡,如下图:
通过这种方式,可以将一个 Topic 的消息分散到多个 MessageQueue 上,进而分散到多个 Broker 上。
如果与 Producer 连接的 NameSever 突然宕机,Producer 最长要30秒才能感知到,此时Producer 可以先从本地缓存读取 Topic 的路由信息。直到连接到下一个 NameSever,刷新本地缓存。
如果与 Producer 连接的 Broker 突然宕机,比如 Master Broker 挂了,此时其他 Slave Broker 会选举出一台 Master Broker,但在这个过程中,Producer 发送的消息都会失败。
对于这个问题,在Producer中有一个开关sendLatencyFaultEnable
,这个开启后会有一个容错机制,比如某次访问一个Broker有500ms延迟还无法访问,那么接下来就会回避访问
该Broker一段时间,比如3000ms内不再访问该Broker,避免消息打到故障的Broker上。
此外, Producer 本身可以捕获发送异常,进行重试。