本文尝试从Apache RocketMQ的简介、主要组件及其作用、3种部署模式、Controller集群模式工作流程、最佳实践等方面对其进行详细分析。希望对您有所帮助!
Apache RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴集团开发并贡献给 Apache 软件基金会。它旨在提供高吞吐量、低延迟和高可靠性的消息传递和流处理服务。广泛应用于金融、互联网、物联网等领域,支持多种应用场景。
Apache RocketMQ 以其高性能、高可靠性和灵活性,成为众多企业实现分布式消息传递和流处理的首选解决方案。
Apache RocketMQ 主要组件及其作用如下:
作用:提供轻量级的路由服务,存储生产者和消费者与 Broker 之间的路由信息。
作用:负责存储和转发消息,是 RocketMQ 的核心组件。
作用:负责发送消息到 RocketMQ 集群。
作用:负责从 RocketMQ 集群接收和处理消息。
作用:管理主从 Broker 的自动故障转移和高可用性。
作用:提供可视化的运维管理工具。
作用:实现消息的持久化存储。
作用:包含 Producer 和 Consumer 的客户端 SDK。
最新版本的 Apache RocketMQ 通过这些组件的协同工作,提供了高性能、高可靠性和高可扩展性的分布式消息传递和流处理服务。各个组件分工明确,确保系统能够高效、稳定地运行,并满足各种复杂业务场景的需求。
Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。
下文分别介绍三种部署方式:
由于 Local 模式下 Proxy 和 Broker 是同进程部署,Proxy本身无状态,因此主要的集群配置仍然以 Broker 为基础进行即可。
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
警告
这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
启动 Broker+Proxy
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证Broker 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log
The broker[xxx, 192.169.1.2:10911] boot success...
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
启动Broker+Proxy集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &
...
备注
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
启动Broker+Proxy集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties --enable-proxy &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties --enable-proxy &
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
启动 Broker+Proxy 集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties --enable-proxy &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties --enable-proxy &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties --enable-proxy &
提示
以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。
提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见
在 Cluster 模式下,Broker 与 Proxy分别部署,我可以在 NameServer和 Broker都启动完成之后再部署 Proxy。
在 Cluster模式下,一个 Proxy集群和 Broker集群为一一对应的关系,可以在 Proxy的配置文件 rmq-proxy.json
中使用 rocketMQClusterName
进行配置
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
警告
这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 &
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
提示
以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。
提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见
可以在多台机器启动多个Proxy
### 在机器A,启动第一个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
### 在机器B,启动第二个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
### 在机器C,启动第三个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
若需要指定配置文件,可以使用 -pc
或者 --proxyConfigPath
进行指定
### 自定义配置文件
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 -pc /path/to/proxyConfig.json &
如何部署支持自动主从切换的 RocketMQ 集群? 其架构如上图所示,主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。
Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。
注意
Controller 若只部署单副本也能完成 Broker Failover,但若该单点 Controller 故障,会影响切换能力,但不会影响存量集群的正常收发。
Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开(可以选择性打开,并不强制要求每一台 NameServer 都打开),在该模式下,NameServer 本身能力仍然是无状态的,也就是内嵌模式下若 NameServer 挂掉多数派,只影响切换能力,不影响原来路由获取等功能。另一种是独立部署,需要单独部署 Controller 组件。
嵌入 NameServer 部署时只需要在 NameServer 的配置文件中设置 enableControllerInNamesrv=true,并填上 Controller 的配置即可。
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n2-127.0.0.1:9879
controllerDLegerSelfId = n0
controllerStorePath = /home/admin/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true
参数解释:
参数设置完成后,指定配置文件启动 Nameserver 即可。
$ nohup sh bin/mqnamesrv -c namesrv.conf &
独立部署执行以下脚本即可
$ nohup sh bin/mqcontroller -c controller.conf &
mqcontroller 脚本在源码包 distribution/bin/mqcontroller,配置参数与内嵌模式相同。
注意
独立部署Controller后,仍然需要单独部署NameServer提供路由发现能力
Broker 启动方法与之前相同,增加以下参数
controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
在Controller模式下,Broker配置必须设置 enableControllerMode=true,并填写 controllerAddr,并以下面命令启动:
$ nohup sh bin/mqbroker -c broker.conf &
注意
自动主备切换模式下Broker无需指定brokerId和brokerRole,其由Controller组件进行分配
该模式未对任何客户端层面 API 进行新增或修改,不存在客户端的兼容性问题。
Nameserver 本身能力未做任何修改,Nameserver 不存在兼容性问题。如开启 enableControllerInNamesrv 且 controller 参数配置正确,则开启 controller 功能。
Broker若设置 enableControllerMode=false,则仍然以之前方式运行。若设置 enableControllerMode=true,则需要部署 controller 且参数配置正确才能正常运行。
具体行为如下表所示:
旧版 Nameserver | 旧版 Nameserver+独立部署 Controller | 新版 Nameserver 开启 controller功能 | 新版 Nameserver 关闭 controller 功能 | |
---|---|---|---|---|
旧版 Broker | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 |
新版 Broker 开启 Controller 模式 | 无法正常上线 | 正常运行,可以切换 | 正常运行,可以切换 | 无法正常上线 |
新版 Broker 不开启 Controller 模式 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 |
从上述兼容性表述可以看出,NameServer 正常升级即可,无兼容性问题。在不想升级 Nameserver 情况,可以独立部署 Controller 组件来获得切换能力。
针对 Broker 升级,分为两种情况:
(1)Master-Slave 部署升级成 Controller 切换架构
可以带数据进行原地升级,对于每组 Broker,停机主、备 Broker,保证主、备的 CommitLog 对齐(可以在升级前禁写该组 Broker 一段时间,或则通过拷贝方式保证一致),升级包后重新启动即可。
注意
若主备 CommitLog 不对齐,需要保证主上线以后再上线备,否则可能会因为数据截断而丢失消息。
(2)原 DLedger 模式升级到 Controller 切换架构
由于原 DLedger 模式消息数据格式与 Master-Slave 下数据格式存在区别,不提供带数据原地升级的路径。在部署多组 Broker 的情况下,可以禁写某一组 Broker 一段时间(只要确认存量消息被全部消费即可,比如根据消息的保存时间来决定),然后清空 store 目录下除 config/topics.json、subscriptionGroup.json 下(保留 topic 和订阅关系的元数据)的其他文件后,进行空盘升级。
在 RocketMQ 的集群模式下,Controller 负责管理多个 Broker 的元数据和协调它们的操作。下面是集群模式下的工作流程和 Mermaind 图示例。
以下是表示 RocketMQ 集群模式下 Controller 工作流程的 Mermaind 图示例:
这张图展示了 Controller 和 Broker 之间的交互流程,Controller 负责管理元数据、协调 Broker 操作,并在 Broker 之间分配任务和同步数据。
flushDiskType=SYNC_FLUSH
来开启同步刷盘。sendBatchMessage
方法发送批量消息,提高发送效率。consumeThreadMin
和 consumeThreadMax
参数来调整消费线程数。sendAsync
方法发送消息,避免同步等待,提高发送性能。maxReconsumeTimes
参数设置最大重试次数。logLevel
参数设置日志级别。通过遵循以上详细的最佳实践,可以有效提升 RocketMQ 的稳定性、性能和安全性,确保消息系统能够高效、可靠地运行。结合具体的业务需求和系统环境,合理配置和优化 RocketMQ,可以实现最佳的使用效果。
在 Go 语言中使用 RocketMQ,你需要使用 RocketMQ 的 Go 客户端库来进行消息的生产和消费。下面是一个简单的实践示例,包括如何配置 RocketMQ Go 客户端、发送消息和消费消息的步骤。
首先,你需要安装 RocketMQ 的 Go 客户端库。可以通过 Go 的包管理工具 go get
来安装:
go get github.com/apache/rocketmq-client-go/v2
以下是一个简单的 Go 语言程序,用于向 RocketMQ 发送消息:
package main
import (
"fmt"
"log"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个 RocketMQ 生产者实例
p, err := producer.NewProducer(producer.WithNameServer([]string{"localhost:9876"}))
if err != nil {
log.Fatalf("create producer failed: %s", err.Error())
}
// 启动生产者
err = p.Start()
if err != nil {
log.Fatalf("start producer failed: %s", err.Error())
}
defer p.Shutdown()
// 创建消息
msg := &primitive.Message{
Topic: "TestTopic",
Body: []byte("Hello RocketMQ!"),
}
// 发送消息
res, err := p.SendSync(context.Background(), msg)
if err != nil {
log.Fatalf("send message failed: %s", err.Error())
}
fmt.Printf("Send message success: %s\n", res.String())
}
以下是一个简单的 Go 语言程序,用于从 RocketMQ 消费消息:
package main
import (
"context"
"fmt"
"log"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个 RocketMQ 消费者实例
c, err := consumer.NewPushConsumer(
consumer.WithNameServer([]string{"localhost:9876"}),
consumer.WithGroupName("TestConsumerGroup"),
)
if err != nil {
log.Fatalf("create consumer failed: %s", err.Error())
}
// 定义消息处理函数
c.RegisterMessageListener(consumer.MessageListenerFunc(func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
fmt.Printf("Received message: %s\n", string(msg.Body))
}
return consumer.ConsumeSuccess, nil
}))
// 订阅主题
err = c.Subscribe("TestTopic", consumer.MessageSelector{}, nil)
if err != nil {
log.Fatalf("subscribe topic failed: %s", err.Error())
}
// 启动消费者
err = c.Start()
if err != nil {
log.Fatalf("start consumer failed: %s", err.Error())
}
defer c.Shutdown()
// 阻塞主线程,保持消费者运行
select {}
}
确保你已经启动了 RocketMQ 服务,并且 localhost:9876
是你的 NameServer 地址。
编译并运行发送消息程序:
go run producer.go
编译并运行接收消息程序:
go run consumer.go
你应该能看到发送的消息被消费者接收到并输出。
以上就是如何在 Go 语言中实践 RocketMQ 的基本步骤。
RocketMQ 是一个高性能、高可用的分布式消息中间件,最初由阿里巴巴开源,并在社区中逐渐发展和演进。以下是 RocketMQ 的主要历史演进过程:
RocketMQ 从一个内部使用的消息中间件发展成为一个成熟的开源项目,经过不断的改进和演进,逐步成为 Apache 顶级项目,并支持现代的云原生和微服务架构。通过持续的开发和社区贡献,RocketMQ 不断适应新的技术需求和业务场景。
完。