前面几篇文章中,我们都涉及到 Topic 的创建流程、更新 Topic 权限等等。本章我们将要分析 获取Topic 所在的集群。
继上N篇文章:
RocketMQ源码分析之一概念理解
RocketMQ 源码分析二之更新Topic命令
用法:sh mqadmin topicClusterList -n 192.168.1.100:9876 -t shg
指令:topicClusterList
代码入口:org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand
参数 | 是否必填 | 说明 |
-h | 否 | 打印帮助 |
-n | 是 | nameserve 服务地址列表,格式ip:port;ip:port;… |
-t | 是 | Topic 名字 |
// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh
// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0
// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
// Namesrv 获取注册到Name Server的所有Broker集群信息
public static final int GET_BROKER_CLUSTER_INFO = 106;
// Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
public static final int GET_ROUTEINTO_BY_TOPIC = 105;
// 第一步:从 Name Server 获取集群信息
ClusterInfo clusterInfo = examineBrokerClusterInfo();
// 第二步:从 Name Server 获取 Topic 在哪些 Broker 上面
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
// 第三步:上面两个信息去交集
BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
String brokerName = brokerData.getBrokerName();
// private HashMap > clusterAddrTable;
Iterator>> it = clusterInfo.getClusterAddrTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry> next = it.next();
if (next.getValue().contains(brokerName)) {
clusterSet.add(next.getKey());
}
}
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topic & queue 的信息
private final HashMap> topicQueueTable;
//说明 master 与 slave 是通过 brokerName 进行配对
private final HashMap brokerAddrTable;
// 将 broker 按照 clusterName 分组
private final HashMap > clusterAddrTable;
// 代表一个活的 broker 链接由最后更新时间,一个链接 channel,数据版本和 Ha 地址组成
// Broker 定时向 namesrv 注册并更新 BrokerLiveInfo 的时间戳
private final HashMap brokerLiveTable;
private final HashMap/* Filter Server */> filterServerTable;
// 获取所有的 cluster Info
public byte[] getAllClusterInfo() {
ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
return clusterInfoSerializeWrapper.encode();
}
}
public class ClusterInfo extends RemotingSerializable {
private HashMap brokerAddrTable;
private HashMap > clusterAddrTable;
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
| 留言与评论(共有 0 条评论) “” |