RocketMQ源码分析之NameServer核心组件RouteInfoManager源码分析
一、前言
前文我们介绍了NameServer核心组件KVConfigManager,本文介绍NameServer的另一个核心组件——路由数据管理组件RouteInfoManager。该组件存放着整个消息集群的路由元数据,包括Topic队列信息、Broker地址、集群拓扑、Broker存活状态以及FilterServer列表。
二、RouteInfoManager构造方法及字段public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // broker网络长连接过期时间,长连接空闲过期时间是2分钟 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // 读写锁 private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 创建topic以后,每个topic是逻辑上的概念,都是有多个queue,这些queue分散在不同的broker组里 // topic->queues private final HashMap> topicQueueTable; // 一个broker name -> broker data,代表的是一个broker组,一个broker data应该是包含了一组broker数据 private final HashMap brokerAddrTable; // 一个nameserver是可以管理多个broker cluster,通常来说就一个cluster就可以了 // 多业务,对于大型的公司来说,他可能是有多个业务的,每个业务是可以部署独立的broker集群,对应的都是一个nameserver private final HashMap> clusterAddrTable; // 顾名思义,他应该是用于管理跟broker之间的长连接、是否还有心跳、保活 private final HashMap brokerLiveTable; // filter server是什么东西,rocketmq高阶的功能,我们可以基于tag来进行数据筛选,比较简单,没办法支持更加复杂细粒度的数据筛选 // rocketmq是支持一个高阶功能,叫做filter server,在每台broker机器上是可以启动一个filter server // filter server启动之后会跟本地的broker来进行长连接构建,注册,以及心跳和保活 // 我们可以把一个自定义的消息筛选的class,一个类,上传到filter server里去,我们消费数据的时候,让broker // 把数据先传输到本地机器的filter server里去,filter server基于你自定义的class来进行细粒度的数据筛选 // 把精细筛选后的数据再回传给你的消费端 // 每个broker机器上是可以启动一个或者是多个filter server,都会传输给nameserver private final HashMap/* Filter Server */> filterServerTable; public RouteInfoManager() { this.topicQueueTable = new HashMap>(1024); this.brokerAddrTable = new HashMap(128); this.clusterAddrTable = new HashMap>(32); this.brokerLiveTable = new HashMap(256); this.filterServerTable = new HashMap>(256); } // 省略… }public class QueueData implements Comparable { private String brokerName; // 每个queue都属于一个数据分区,一定是在一个broker组里 private int readQueueNums; // 分成write queue和read queue private int writeQueueNums; // write queue是用于写入数据的路由的,read queue是用于消费数据的路由的 // 在这个broker里,我的topic有4个write queue,还有4个read queue // 随机的从4个write queue里获取到一个queue来写入数据,在消费的时候,从4个read queue里随机的挑选一个,来读取数据 // 4个write queue,2个read queue -> 会均匀的写入到4个write queue里去,读数据的时候仅仅会读里面的2个queue的数据 // 4个write 
queue,8个read queue -> 你只会写入4个queue里,但是消费的时候随机从8个queue里消费的 // 区分读写队列作用是帮助我们对topic的queues进行扩容和缩容,8个write queue + 8个read queue // 4个write queue -> 写入数据仅仅会进入这4个write queue里去 // 8个read queue,读取数据,有4个queue持续消费到最新的数据,另外4个queue不会写入新数据,但是会把他 // 也有的数据全部消费完毕,把8个read queue -> 4个read queue private int perm; private int topicSysFlag; }public class BrokerData implements Comparable { // broker集群拓扑架构,一个broker集群 -> 多个broker组(broker name)-> 多个broker机器(主从复制,高可用) // 这一组broker是属于哪个cluster private String cluster; // broker name代表了当前的broker组 private String brokerName; // 当前这一组broker里面包含了具体的几个broker机器, private HashMap brokerAddrs; private final Random random = new Random(); public BrokerData() { } }class BrokerLiveInfo { // broker是可以主动给nameserver上报心跳,每次上报都可以更新这个时间戳 private long lastUpdateTimestamp; // broker数据版本号 private DataVersion dataVersion; // netty channel,网络连接,长连接的概念 private Channel channel; // 跟你当前这个broker机器构成HA高可用的broker地址 private String haServerAddr; public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel, String haServerAddr) { this.lastUpdateTimestamp = lastUpdateTimestamp; this.dataVersion = dataVersion; this.channel = channel; this.haServerAddr = haServerAddr; } 
}topicQueueTable:topic消息队列的路由信息,消息发送的时候会根据路由表进行负载均衡。Key为topic名称,value也是一个Map:以brokerName为key,value是队列数据如上代码所示,包含读/写队列数量、权重等。brokerAddrTable:broker的基础信息,Key为brokerName,value包含brokerName,broker所在的集群信息,主备broker的地址。clusterAddrTable:broker集群信息,Key为集群名称(clusterName),value存储的是集群中所有broker的名称(brokerName)。brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。这也是NameServer每10秒要扫描的信息。filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃。三、路由注册流程分析加写锁,防止并发修改路由表。首先判断Broker所属的集群(clusterName)是否存在,如果不存在则创建集群(clusterAddrTable),然后将Broker的名称添加到集群的Broker集合中。维护BrokerData信息,先从brokerAddrTable中根据Broker的名称来获取BrokerData,如果不存在,则新建一个BrokerData并保存进brokerAddrTable,registerFirst设置为true。如果该Broker已经存在对应的BrokerData,直接替换掉原来的,registerFirst为false。registerFirst为true表示第一次注册。如果接收到的Broker信息为主节点,并且Broker的Topic配置发生了变化或者是第一次注册,则需要创建或更新Topic的路由元数据(QueueData),并且把路由元数据设置/更新到topicQueueTable。其实就是为默认主题自动注册路由信息,其中包含MixAll.DEFAULT_TOPIC的路由信息。当消息生产者发送消息到主题时,如果该主题未创建,并且BrokerConfig的autoCreateTopicEnable为true,则返回MixAll.DEFAULT_TOPIC的路由信息。更新brokerLiveTable,存储能正常使用的Broker信息。BrokerLiveInfo是执行路由删除操作的重要依据。注册Broker的过滤器Server地址列表,一个Broker会关联多个FilterServer消息过滤服务器。如果此Broker是从节点,还需要查找该Broker的主节点信息,并且更新对应的masterAdd属性。最后解锁,返回注册结果。public RegisterBrokerResult registerBroker( final String clusterName, // broker所属的cluster集群 final String brokerAddr, // broker机器地址 final String brokerName, // broker所属的组名称 final long brokerId, // broker机器自己的id final String haServerAddr, // 跟你的这个broker互为HA高可用的一个机器地址 final TopicConfigSerializeWrapper topicConfigWrapper, // 当前的这个broker机器上面包含的topic队列数据 final List filterServerList, // broker机器上面部署的filter server列表 final Channel channel) { // 物理上的netty channel网络长连接 RegisterBrokerResult result = new RegisterBrokerResult(); try { try { // 路由注册需要枷锁,防止并发修改RouteInfoManger中的路由表。 this.lock.writeLock().lockInterruptibly(); // 拿到一个cluster集群对应的broker组,把我们的这个broker组加入到cluster里去 // 为什么要用set数据结构,一个broker组是有多个broker机器,会注册多次,组加入cluster // 必须是set,这样可以对组进行去重 Set brokerNames = this.clusterAddrTable.get(clusterName); 
if (null == brokerNames) { brokerNames = new HashSet(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // broker组是不是第一次来注册 boolean registerFirst = false; // 如果是broker组第一次来注册,给初始化一份broker组数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap()); this.brokerAddrTable.put(brokerName, brokerData); } // 拿到broker组数据里的小map,broker组里的broker机器map Map brokerAddrsMap = brokerData.getBrokerAddrs(); // Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> // The same IP:PORT must only have one record in brokerAddrTable // 这个地方是处理一些异常数据,如果说你注册过来的broker机器地址跟之前注册过的机器地址是一样的 // 但是broker id是不同的,同一台机器,你启动了不同的broker节点(用的是不同的broker.conf),是不对的 Iterator> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } // 把本次要注册broker地址放到了broker组对应的broker机器地址列表里去 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 如果说你是一组broker里的master,而且你上报了你管理的topic数据 // 处理broker组管理的topic的队列数据,会更新到内存的map里去 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 如果broker是主节点并且topic配置信息发生该表(dataVersion不一致)或者是初次注册,需要创建或更新topic路由元数据 // 并填充topicQueueTable,其实就是为默认主题自动注册路由信息,其中包含 MixAll.DEFAULT_TOPIC的路由信息。 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry entry : tcTable.entrySet()) { // 更新或创建新的 QueueData this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 维护跟broker之间的保活数据 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), 
channel, haServerAddr) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } // 维护broker机器上部署的filter server的列表 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } // 如果说注册过来的机器是一组broker里的slave if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { // 他会把你的一组broker里的slave broker来注册的时候 // 给你的注册结果里设置进去你的ha server addr,是你的这一组broker里master他的ha server addr result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); // 他还会把你这一组的broker里的master地址设置进去返回给你 result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }// 维护topic在各个broker里的队列数据 private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { // 创建队列信息 QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); // 如果不存在该队列的信息则新建 queueDataMap 存放到 topicQueueTable List queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList(); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } // 存在,直接更新替换旧的 else { boolean addNewOne = true; Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { 
addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); } } } if (addNewOne) { queueDataList.add(queueData); } } }
NameServer与Broker保持着长连接,Broker的状态信息存储在brokerLiveTable中。NameServer每收到一个心跳包,都会更新brokerLiveTable中该Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。上述路由表都是普通的HashMap,由同一把读写锁(ReentrantReadWriteLock)保护:读锁允许多个消息发送者并发读取路由,保证消息发送时的高并发;注册则需要持有写锁,因此同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。
四、NameServer处理心跳流程分析
NameServer通过默认的请求处理组件DefaultRequestProcessor接收心跳请求,在queryBrokerTopicConfig方法中调用RouteInfoManager的updateBrokerInfoUpdateTimestamp方法,刷新该Broker的最后心跳时间戳,从而完成Broker保活。
注:后续我们会分析默认请求处理组件DefaultRequestProcessor的源码;// 如果说你要是broker可以定期向你的nameserver进行心跳的话,每次心跳 // 都会更新一下broker保活数据的时间戳 public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) { BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr); if (prev != null) { prev.setLastUpdateTimestamp(System.currentTimeMillis()); } }五、路由删除流程分析
NameServer会每隔10s扫描一次brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker的连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有两个触发点会执行路由删除操作:一是NameServer定时扫描brokerLiveTable,比较上次收到心跳包的时间戳与当前系统时间,如果两者相差超过120s,则移除该Broker信息;二是Broker在正常关闭时,会执行unregisterBroker指令移除该Broker信息。
1、NameServer定时扫描brokerLiveTable
每10s执行一次。逻辑也很简单,先遍历brokerLiveInfo路由表(HashMap),检测BrokerLiveInfo的LastUpdateTimestamp上次收到心跳包的时间,如果超过120s,则认为该Broker已不可用,然后将它移除并关闭连接,最后删除与该Broker相关的路由信息。// broker定时保活扫描,如果说你的broker机器跟nameserver之间超过2分钟没有通信 // 等于说关闭掉跟你的物理网络连接,以及清理掉内存数据结构里关于这个broker机器的数据 public void scanNotActiveBroker() { // 扫描的就是这个BrokerLiveTable,路由信息表。还有一个Brokernames Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); // 根据心跳时间判断是否存活的核心逻辑。两分钟未发送心跳注册请求 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }获取读锁,如果Channel不为空,就遍历brokerLiveTable尝试获取使用了该Channel的Broker。最后解锁。获取写锁,根据brokerAddr从brokerLiveTable、filterServerTable中移除Broker相关的信息。维护brokerAddrTable。遍历brokerAddrTable,从BrokerData的brokerAddrs中,找到具体的Broker,从BrokerData中将其移除。如果移除后在BrokerData中不再包含其他Broker,则在brokerAddrTable中移除该brokerName对应的条目。维护clusterAddrTable,也是遍历。找到Broker并将其从集群中基础。如果移除后,集群不包含任何Broker,则将该集群从clusterAddrTable中移除。维护topicQueueTable,遍历所有主题的队列,如果队列中包含要删除的Broker的队列,则移除,如果Topic只包含待移除Broker的队列,则从topicQueueTable删除该Topic释放写锁,完成路由删除操作。public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { try { // 获取读锁 this.lock.readLock().lockInterruptibly(); Iterator> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); // 遍历brokerLiveTable while (itBrokerLiveTable.hasNext()) { Entry entry = itBrokerLiveTable.next(); // 获取使用该channel的brokerAddr if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } // channel为空或者没有使用该channel的Broker if (null == brokerAddrFound) { brokerAddrFound 
= remoteAddr; } else { log.info("the broker"s channel destroyed, {}, clean it"s data structure at once", brokerAddrFound); } if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { try { // 申请写锁 this.lock.writeLock().lockInterruptibly(); // 根据brokerAddr从brokerLiveTable、filterServerTable中移除Broker相关的信息 this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; Iterator> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); // 遍历 brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); // 移除该 brokerAddr的信息 if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } if (brokerNameFound != null && removeBrokerName) { Iterator>> it = this.clusterAddrTable.entrySet().iterator(); // 遍历 clusterAddrTable while (it.hasNext()) { Entry> entry = it.next(); String clusterName = entry.getKey(); Set brokerNames = entry.getValue(); boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName); // 成功移除Broker之后,集群不包含任何Broker,则将该集群从clusterAddrTable中移除 if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", 
clusterName); it.remove(); } break; } } } if (removeBrokerName) { // 遍历 topicQueueTable Iterator>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry> entry = itTopicQueueTable.next(); String topic = entry.getKey(); List queueDataList = entry.getValue(); Iterator itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { QueueData queueData = itQueueData.next(); if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); } } // 如果队列已经为空,移除该Topic if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); } } } } finally { // 释放写锁 this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } }
2、执行unregisterBroker方法下线Broker// broker可以注册,也可以来进行下线 -> 集群+组+机器 public void unregisterBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId) { try { try { this.lock.writeLock().lockInterruptibly(); BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ? "OK" : "Failed", brokerAddr ); this.filterServerTable.remove(brokerAddr); boolean removeBrokerName = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { String addr = brokerData.getBrokerAddrs().remove(brokerId); log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", addr != null ? "OK" : "Failed", brokerAddr ); if (brokerData.getBrokerAddrs().isEmpty()) { this.brokerAddrTable.remove(brokerName); log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", brokerName ); removeBrokerName = true; } } if (removeBrokerName) { Set nameSet = this.clusterAddrTable.get(clusterName); if (nameSet != null) { boolean removed = nameSet.remove(brokerName); log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", removed ? 
"OK" : "Failed", brokerName); if (nameSet.isEmpty()) { this.clusterAddrTable.remove(clusterName); log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName ); } } this.removeTopicByBrokerName(brokerName); } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("unregisterBroker Exception", e); } }// 对一个broker把他管理的topic数据移除掉 private void removeTopicByBrokerName(final String brokerName) { Iterator>> itMap = this.topicQueueTable.entrySet().iterator(); while (itMap.hasNext()) { Entry> entry = itMap.next(); String topic = entry.getKey(); List queueDataList = entry.getValue(); Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { log.info("removeTopicByBrokerName, remove one broker"s topic {} {}", topic, qd); it.remove(); } } if (queueDataList.isEmpty()) { log.info("removeTopicByBrokerName, remove the topic all queue {}", topic); itMap.remove(); } } }