作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)
中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:
Web服务器
代理服务器
ZooKeeper
Kafka
RabbitMQ(本章节)
上个小节我们介绍了RocketMQ和RabbitMQ和Kafka的基本差距,本小节我们来做一个简单的部署和操作。
#官方地址
https://rocketmq.apache.org/download/
#具体以现场选择版本为准
wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
也可以手工安装Oracle JDK,目前测试OpenJDK,是可以启动的,但是查看集群节点信息提示某个加密方式有问题,所以建议还是Oracle JDK。
yum -y install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-develo
#省略上传,该文件需要注册oracle账号才能下载
tar xvf jdk1.8.tar.gz -C /usr/local/
echo 'export JAVA_HOME=/usr/local/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH' | sudo tee -a /etc/profile
source /etc/profile
#如果内存低于8g将无法启动,8G都有可能无法启动
#我这里用的是12G
unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-all-5.1.4-bin-release/bin
sh play.sh
ps -ef |grep -v grep |grep "rocketmq\|namesrv\|mqbroker"
正常情况下应该有6个进程,层级如下。
├─ sh mqnamesrv (PID 1132)
│ └─ sh runserver.sh (PID 1140)
│ └─ java NamesrvStartup (PID 1197)
│
└─ sh mqbroker (PID 1134)
└─ sh runbroker.sh (PID 1142)
└─ java BrokerStartup (PID 1196)
[root@localhost bin]# jps
1397 Jps
1196 BrokerStartup
1197 NamesrvStartup
[root@localhost bin]#
但是使用这个方法启动的Broker节点无法注册到Namesrv里面,就会导致RocketMQ是无法正常使用的,需要稍微修改下启动脚本。
注:该问题只是在该命令执行无法获取正确的IP或者主机名才需要调整。
vi ./bin/play.sh
#修改前
ADDR=`hostname -i`:9876
#修改后
ADDR=`hostname`:9876
也可以手工启动Broker节点和Namesrv节点。
### 启动namesrv
nohup sh bin/mqnamesrv &
### 启动broker
nohup sh bin/mqbroker -n localhost:9876 &
5.检查RocketMQ的服务
下面的输出就代表,我们从namesvc上已经查询到Broker节点,并且可以正常使用。
[root@localhost bin]# sh mqadmin clusterList -n localhost:9876
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster localhost.localdomain 0 192.168.31.154:10911 V5_1_4 0.00(0,0ms) 0.00(0,0ms) 0-0(0.0w, 0.0, 0.0) 0 486493.52 0.1300 true
RocketMQ基本操作
1.创建topic
[root@rocketmq bin]# sh mqadmin updateTopic -n 127.0.0.1:9876 -t YourTopicName -c DefaultCluster
create topic to 192.168.31.154:10911 success.
TopicConfig [topicName=YourTopicName, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
2.查看路由信息
[root@localhost bin]# sh mqadmin topicRoute -n 127.0.0.1:9876 -t YourTopicName
{
"brokerDatas":[
{
"brokerAddrs":{0:"192.168.31.154:10911"
},
"brokerName":"localhost.localdomain",
"cluster":"DefaultCluster",
"enableActingMaster":false
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"localhost.localdomain",
"perm":6,
"readQueueNums":8,
"topicSysFlag":0,
"writeQueueNums":8
}
]
}
[root@localhost bin]#
3.查看数据状态
有点类似kafka的分区。
[root@rocketmq bin]# sh mqadmin topicStatus -n 127.0.0.1:9876 -t YourTopicName
#Broker Name #QID #Min Offset #Max Offset #Last Updated
rocketmq 0 0 0
rocketmq 1 0 0
rocketmq 2 0 0
rocketmq 3 0 0
rocketmq 4 0 0
rocketmq 5 0 0
rocketmq 6 0 0
rocketmq 7 0 0
4.发送消息
#这个操作会发送到默认TOPIC:TopicTest
export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
#省略部分内容
SendResult [sendStatus=SEND_OK, msgId=C0A81F9A0694378FD1AC09AEEC9A03E5, offsetMsgId=C0A81F9A00002A9F000000000003AA27, messageQueue=MessageQueue [topic=TopicTest, brokerName=rocketmq, queueId=0], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A81F9A0694378FD1AC09AEEC9B03E6, offsetMsgId=C0A81F9A00002A9F000000000003AB18, messageQueue=MessageQueue [topic=TopicTest, brokerName=rocketmq, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A81F9A0694378FD1AC09AEEC9C03E7, offsetMsgId=C0A81F9A00002A9F000000000003AC09, messageQueue=MessageQueue [topic=TopicTest, brokerName=rocketmq, queueId=2], queueOffset=249]
5.消费消息
#这个操作会消费默认TOPIC:TopicTest
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_please_rename_unique_group_name_4_15 Receive New Messages: [MessageExt [brokerName=rocketmq, queueId=3, storeSize=240, queueOffset=2005, sysFlag=0, bornTimestamp=1751464373949, bornHost=/192.168.31.154:35294, storeTimestamp=1751464373950, storeHost=/192.168.31.154:10911, msgId=C0A81F9A00002A9F00000000001D7B76, commitLogOffset=1932150, bodyCRC=1470979395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1751465756632, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81F9A0E43378FD1AC09D85ABD0015, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=2500}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50, 49], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_20 Receive New Messages: [MessageExt [brokerName=rocketmq, queueId=3, storeSize=240, queueOffset=2004, sysFlag=0, bornTimestamp=1751464373944, bornHost=/192.168.31.154:35294, storeTimestamp=1751464373944, storeHost=/192.168.31.154:10911, msgId=C0A81F9A00002A9F00000000001D77B6, commitLogOffset=1931190, bodyCRC=367242165, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1751465756632, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81F9A0E43378FD1AC09D85AB80011, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=2500}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 55], transactionId='null'}]]