在rocketmq中rocketmqTool作为可视化和二次开发使用比较多的类org.apache.rocketmq.tools.admin.MQAdminExt,这个类在admin里面:
在rocketmq中:
而我们知道创建rocketmq创建的过程中,会启动很多东西,这个连接的过程涉及到的内容比较多,可以看到rocketmq的rocketmq-dashboard和mqcloud里面都使用了池化技术-对象池。rocketmq-dashboard池化org.apache.rocketmq.dashboard.admin.MqAdminExtObjectPool:
@Bean
public GenericObjectPool<MQAdminExt> mqAdminExtPool() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestWhileIdle(true);
genericObjectPoolConfig.setMaxWaitMillis(10000);
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000);
MQAdminPooledObjectFactory mqAdminPooledObjectFactory = new MQAdminPooledObjectFactory();
MQAdminFactory mqAdminFactory = new MQAdminFactory(rmqConfigure);
mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory);
GenericObjectPool<MQAdminExt> genericObjectPool = new GenericObjectPool<MQAdminExt>(
mqAdminPooledObjectFactory,
genericObjectPoolConfig);
return genericObjectPool;
}
同时进行借和还:
public static void createMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
try {
// Get the mqAdmin instance from the object pool
MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject();
MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt);
} catch (Exception e) {
LOGGER.error("get mqAdmin from pool error", e);
}
}
public static void returnMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
if (mqAdminExt != null) {
try {
// After execution, return the mqAdmin instance to the object pool
mqAdminExtPool.returnObject(mqAdminExt);
} catch (Exception e) {
LOGGER.error("return mqAdmin to pool error", e);
}
}
MQ_ADMIN_EXT_THREAD_LOCAL.remove();
}
同时mqcloud再次基础上做了自己的封装模板,方便每次调用的时候,进行回调:
@Autowired
private GenericKeyedObjectPool<Cluster, MQAdminExt> mqPool;
/**
* 执行操作
* @param callback
* @return
* @throws Exception
*/
public <T> T execute(MQAdminCallback<T> callback) {
MQAdminExt mqAdmin = null;
try {
// 获取mqAdmin实例
mqAdmin = mqPool.borrowObject(callback.mqCluster());
if(mqAdmin == null) {
logger.warn("cluster:{} cannot get mqadmin!", callback.mqCluster());
return null;
}
// 触发回调
T t = callback.callback(mqAdmin);
return t;
} catch (Exception e) {
try {
// 触发异常情况回调
return callback.exception(e);
} catch (Exception ex) {
logger.warn("cluster:{} exception err:{}", callback.mqCluster(), ex.getMessage());
return null;
}
} finally {
if(mqAdmin != null) {
try {
mqPool.returnObject(callback.mqCluster(), mqAdmin);
} catch (Exception e) {
logger.warn("cluster:{} shutdown err:{}", callback.mqCluster(), e.getMessage());
}
}
}
}
解决了rocketmq实列对象的复用外,还需要知道rocketmq提供了哪些api可以进行扩展。
mqClientInstance 启动和关闭
broker操作
BrokerCluster操作
PlainAccess操作
GlobalWhiteAddr 白名单
topic 主题操作
SubscriptionGroup 订阅组操作
consume 消费操作
producer生产者操作
Offset 偏移量
NameServer操作
message操作
subscription 订阅
task 清理任务
trace 链路操作
stat 统计
DefaultMQAdminExtImpl 默认mqAdmin扩展实现
通过rocketmq-tool提供的api相关功能,我们可以进行rocketmq的mqAdmin扩展的启动、关闭,同时针对broker、producer、consume、offset、nameServer、message、trace相关操作和统计,同时触发task进行相关清理工作。
在rocketmq-dashboard中,可以看到使用了它们的使用:
start 启动
getBrokerConfig 获取broker配置
examineBrokerClusterInfo brokerClusterInfo信息
fetchBrokerRuntimeStats 获取broker运行时统计信息
viewMessage 查看消息
queryMessage 查询消息
messageTrackDetail 链路详情
consumeMessageDirectly 直接消费消息
examineConsumerConnectionInfo 消费者连接信息
examineProducerConnectionInfo 生产者连接信息
examineTopicStats
fetchAllTopicList
fetchBrokerRuntimeStats
examineConsumeStats
examineTopicRouteInfo
getNameServerAddressList
wipeWritePermOfBroker
.....
可以看到dashBoard基于rocketmq-tool模块,做了很多功能,而且这些功能都是可视化的。