上一章中,我们分析了 RocketMQ Broker 端会定时上报数据到 Name Server 端。那么,本章节将详细分析如何获取注册到 Name Server 中 RocketMQ Broker 集群的详细信息。
继前几篇文章:
用法:sh mqadmin clusterList -n 192.168.1.100:9876
指令:clusterList
代码入口:org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand
参数 | 是否必填 | 说明 |
-h | 否 | 打印帮助 |
-n | 是 | Name Server 服务地址列表,格式ip:port;ip:port;… |
-m | 否 | 打印Broker 端更多的采集数据 |
-i | 是 | 打印配置间隔。单位 秒 |
// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh
// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0
// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
这里只分析打印更多详细信息的!
只摘抄核心代码片段![微笑]
// 获取注册在 Name Server 上的 集群信息
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
// 根据 Broker 信息,获取 Broker 上面统计的数据,是 MAP 结构
KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
// 获取注册到Name Server的所有Broker集群信息
public static final int GET_BROKER_CLUSTER_INFO = 106;
// Broker 获取Broker运行时信息
public static final int GET_BROKER_RUNTIME_INFO = 28;
this.namesrvController.getRouteInfoManager().getAllClusterInfo();
/**
* 代码入口:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo
* 得到集群的相关信息:broker 与 cluster 地址的信息
* @return
*/
public byte[] getAllClusterInfo() {
ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
return clusterInfoSerializeWrapper.encode();
}
// m没有很复杂的逻辑在里面,就简单的组合对象罢了
public class ClusterInfo extends RemotingSerializable {
private HashMap brokerAddrTable;
private HashMap > clusterAddrTable;
}
代码入口:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker Name Server
核心类:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
下面也是摘抄代码块出来[呲牙]
// Broker 定时往 Name Server 上面注册
// Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置
// public static final int REGISTER_BROKER = 103;
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
代码入口:org.apache.rocketmq.broker.processor.AdminBrokerProcessor#prepareRuntimeInfo
下面只摘抄核心代码片段[比心]
HashMap runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();
runtimeInfo.put("msgPutTotalYesterdayMorning",String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
Broker 运行时数据采集有两个方向:
1、实时获取
2、统计昨天的
// 统计昨天的
BrokerController.this.getBrokerStats().record();
// 这个类是管理 RocketMQ Broker 数据采集的。org.apache.rocketmq.store.stats.BrokerStatsManager
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
| 留言与评论(共有 0 条评论) “” |