一、引言 RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式,单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。 RocketMQ主要由Producer、Broker、Consumer、Namesvr等组件组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息,Namesvr负责存储元数据,各组件的主要功能如下:消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。代理服务器(BrokerServer):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。名字服务(NameServer):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的BrokerIP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。生产者组(ProducerGroup):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。消费者组(ConsumerGroup):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。 RocketMQ整体消息处理逻辑上以Topic维度进行生产消费、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。 本篇文章分析的核心在于介绍RocketMQ的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。二、RocketMQ的整体架构 (图片来自于ApacheRocketMQ) RocketMQ架构上主要分为四部分,如上图所示:Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。NameServer:NameServer是一个非常简单的Topic路由注册中心,支持分布式集群方式部署,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。 RocketMQ的Topic的物理分布如上图所示:Topic作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的Broker当中。 Broker中的Queue是Topic对应消息的物理存储单元。 在RocketMQ的整体设计理念当中,消息的生产消费以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创建对应的MessageQueue。 producer生产消息的过程本质上就是选择Topic在Broker的所有的MessageQueue并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。 consumer消费消息的过程本质上就是一个订阅同一个Topic的consumerGroup下的每个consumer按照一定的规则负责Topic下一部分MessageQueue进行消费。 在RocketMQ整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到Broker选择的负载均衡,消息的消费过程主要涉及多consumer和多Broker之间的负责均衡。三、producer消息生产过程 producer消息生产过程:producer首先访问namesvr获取路由信息,namesvr存储Topic维度的所有路由信息(包括每个topic在每个Broker的队列分布情况)。producer解析路由信息生成本地的路由信息,解析Topic在Broker队列信息并转化为本地的消息生产的路由信息。producer根据本地路由信息向Broker发送消息,选择本地路由中具体的Broker进行消息发送。3。1路由同步过程publicclassMQClientInstance{publicbooleanupdateTopicRouteInfoFromNameServer(finalStringtopic){returnupdateTopicRouteInfoFromNameServer(topic,false,null);}publicbooleanupdateTopicRouteInfoFromNameServer(finalStringtopic,booleanisDefault,DefaultMQProducerdefaultMQProducer){try{if(this。lockNamesrv。tryLock(LOCKTIMEOUTMILLIS,TimeUnit。MILLISECONDS)){try{TopicRouteDatatopicRouteData;if(isDefaultdefaultMQProducer!null){省略对应的代码}else{1、负责查询指定的Topic对应的路由信息topicRouteDatathis。mQClientAPIImpl。getTopicRouteInfoFromNameServer(topic,10003);}if(topicRouteData!null){2、比较路由数据topicRouteData是否发生变更TopicRouteDataoldthis。topicRouteTable。get(topic);booleanchangedtopicRouteDataIsChange(old,topicRouteData);if(!changed){changedthis。isNeedUpdateTopicRouteInfo(topic);}3、解析路由信息转化为生产者的路由信息和消费者的路由信息if(changed){TopicRouteDatacloneTopicRouteDatatopicRouteData。cloneTopicRouteData();for(BrokerDatabd:topicRouteData。getBrokerDatas()){this。brokerAddrTable。put(bd。getBrokerName(),bd。getBrokerAddrs());}生成生产者对应的Topic信息{TopicPublishInfopublishInfotopicRouteData2TopicPublishInfo(topic,topicRouteData);publishInfo。setHaveTopicRouterInfo(true);IteratorEntryString,MQProducerInneritthis。producerTable。entrySet()。iterator();while(it。hasNext()){EntryString,MQProducerInnerentryit。next();MQProducerInnerimplentry。getValue();if(impl!null){impl。updateTopicPublishInfo(topic,publishInfo);}}}保存到本地生产者路由表当中this。topicRouteTable。put(topic,cloneTopicRouteData);returntrue;}}}finally{this。lockNamesrv。unlock();}}else{}}catch(InterruptedExceptione){}returnfalse;}} 路由同步过程:路由同步过程是消息生产者发送消息的前置条件,没有路由的同步就无法感知具体发往那个Broker节点。路由同步主要负责查询指定的Topic对应的路由信息,比较路由数据topicRouteData是否发生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。publicclassTopicRouteDataextendsRemotingSerializable{privateStringorderTopicConf;按照broker维度保存的Queue信息privateListQueueDataqueueDatas;按照broker维度保存的broker信息privateListBrokerDatabrokerDatas;privateHashMapStringbrokerAddr,ListStringFilterServerfilterServerTable;}publicclassQueueDataimplementsComparableQueueData{broker的名称privateStringbrokerName;读队列大小privateintreadQueueNums;写队列大小privateintwriteQueueNums;读写权限privateintperm;privateinttopicSynFlag;}publicclassBrokerDataimplementsComparableBrokerData{broker所属集群信息privateStringcluster;broker的名称privateStringbrokerName;broker对应的ip地址信息privateHashMapLongbrokerId,StringbrokeraddressbrokerAddrs;privatefinalRandomrandomnewRandom();}publicclassTopicPublishInfo{privatebooleanorderTopicfalse;privatebooleanhaveTopicRouterInfofalse;最细粒度的队列信息privateListMessageQueuemessageQueueListnewArrayListMessageQueue();privatevolatileThreadLocalIndexsendWhichQueuenewThreadLocalIndex();privateTopicRouteDatatopicRouteData;}publicclassMessageQueueimplementsComparableMessageQueue,Serializable{privatestaticfinallongserialVersionUID6191200464116433425L;Topic信息privateStringtopic;所属的brokerName信息privateStringbrokerName;Topic下的队列信息IdprivateintqueueId;} 路由解析过程:TopicRouteData核心变量QueueData保存每个Broker的队列信息,BrokerData保存Broker的地址信息。TopicPublishInfo核心变量MessageQueue保存最细粒度的队列信息。producer负责将从namesvr获取的TopicRouteData转化为producer本地的TopicPublishInfo。publicclassMQClientInstance{publicstaticTopicPublishInfotopicRouteData2TopicPublishInfo(finalStringtopic,finalTopicRouteDataroute){TopicPublishInfoinfonewTopicPublishInfo();info。setTopicRouteData(route);if(route。getOrderTopicConf()!nullroute。getOrderTopicConf()。length()0){省略相关代码}else{ListQueueDataqdsroute。getQueueDatas();按照brokerName进行排序Collections。sort(qds);遍历所有broker生成队列维度信息for(QueueDataqd:qds){具备写能力的QueueData能够用于队列生成if(PermName。isWriteable(qd。getPerm())){遍历获得指定brokerData进行异常条件过滤BrokerDatabrokerDatanull;for(BrokerDatabd:route。getBrokerDatas()){if(bd。getBrokerName()。equals(qd。getBrokerName())){brokerDatabd;break;}}if(nullbrokerData){continue;}if(!brokerData。getBrokerAddrs()。containsKey(MixAll。MASTERID)){continue;}遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfofor(inti0;iqd。getWriteQueueNums();i){MessageQueuemqnewMessageQueue(topic,qd。getBrokerName(),i);info。getMessageQueueList()。add(mq);}}}info。setOrderTopic(false);}returninfo;}} 路由生成过程:路由生成过程主要是根据QueueData的BrokerName和writeQueueNums来生成MessageQueue对象。MessageQueue是消息发送过程中选择的最细粒度的可发送消息的队列。{TBW102:〔{brokerName:brokera,perm:7,readQueueNums:8,topicSynFlag:0,writeQueueNums:8},{brokerName:brokerb,perm:7,readQueueNums:8,topicSynFlag:0,writeQueueNums:8}〕} 路由解析举例:topic(TBW102)在brokera和brokerb上存在队列信息,其中读写队列个数都为8。先按照brokera、brokerb的名字顺序针对broker信息进行排序。针对brokera会生成8个topic为TBW102的MessageQueue对象,queueId分别是07。针对brokerb会生成8个topic为TBW102的MessageQueue对象,queueId分别是07。topic(名为TBW102)的TopicPublishInfo整体包含16个MessageQueue对象,其中有8个brokera的MessageQueue,有8个brokerb的MessageQueue。消息发送过程中的路由选择就是从这16个MessageQueue对象当中获取一个进行消息发送。3。2负载均衡过程publicclassDefaultMQProducerImplimplementsMQProducerInner{privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCallbacksendCallback,finallongtimeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{1、查询消息发送的TopicPublishInfo信息TopicPublishInfotopicPublishInfothis。tryToFindTopicPublishInfo(msg。getTopic());if(topicPublishInfo!nulltopicPublishInfo。ok()){String〔〕brokersSentnewString〔timesTotal〕;根据重试次数进行消息发送for(;timestimesTotal;times){记录上次发送失败的brokerNameStringlastBrokerNamenullmq?null:mq。getBrokerName();2、从TopicPublishInfo获取发送消息的队列MessageQueuemqSelectedthis。selectOneMessageQueue(topicPublishInfo,lastBrokerName);if(mqSelected!null){mqmqSelected;brokersSent〔times〕mq。getBrokerName();try{3、执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送sendResultthis。sendKernelImpl(msg,mq,communicationMode,sendCallback,topicPublishInfo,timeoutcostTime);switch(communicationMode){caseSYNC:if(sendResult。getSendStatus()!SendStatus。SENDOK){if(this。defaultMQProducer。isRetryAnotherBrokerWhenNotStoreOK()){continue;}}returnsendResult;default:break;}}catch(MQBrokerExceptione){省略相关代码}catch(InterruptedExceptione){省略相关代码}}else{break;}}if(sendResult!null){returnsendResult;}}}} 消息发送过程:查询Topic对应的路由信息对象TopicPublishInfo。从TopicPublishInfo中通过selectOneMessageQueue获取发送消息的队列,该队列代表具体落到具体的Broker的queue队列当中。执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送,重新选择队列会避开上一次发送失败的Broker的队列。publicclassTopicPublishInfo{publicMessageQueueselectOneMessageQueue(finalStringlastBrokerName){if(lastBrokerNamenull){returnselectOneMessageQueue();}else{按照轮询进行选择发送的MessageQueuefor(inti0;ithis。messageQueueList。size();i){intindexthis。sendWhichQueue。getAndIncrement();intposMath。abs(index)this。messageQueueList。size();if(pos0)pos0;MessageQueuemqthis。messageQueueList。get(pos);避开上一次上一次发送失败的MessageQueueif(!mq。getBrokerName()。equals(lastBrokerName)){returnmq;}}returnselectOneMessageQueue();}}} 路由选择过程:MessageQueue的选择按照轮询进行选择,通过全局维护索引进行累加取模选择发送队列。MessageQueue的选择过程中会避开上一次发送失败Broker对应的MessageQueue。 Producer消息发送示意图:某Topic的队列分布为BrokerAQueue1、BrokerAQueue2、BrokerBQueue1、BrokerBQueue2、BrokerCQueue1、BrokerCQueue2,根据轮询策略依次进行选择。发送失败的场景下如BrokerAQueue1发送失败那么就会跳过BrokerA选择BrokerBQueue1进行发送。四、consumer消息消费过程 consumer消息消费过程:consumer访问namesvr同步topic对应的路由信息。consumer在本地解析远程路由信息并保存到本地。consumer在本地进行Reblance负载均衡确定本节点负责消费的MessageQueue。consumer访问Broker消费指定的MessageQueue的消息。4。1路由同步过程publicclassMQClientInstance{1、启动定时任务从namesvr定时同步路由信息privatevoidstartScheduledTask(){this。scheduledExecutorService。scheduleAtFixedRate(newRunnable(){Overridepublicvoidrun(){try{MQClientInstance。this。updateTopicRouteInfoFromNameServer();}catch(Exceptione){log。error(ScheduledTaskupdateTopicRouteInfoFromNameServerexception,e);}}},10,this。clientConfig。getPollNameServerInterval(),TimeUnit。MILLISECONDS);}publicvoidupdateTopicRouteInfoFromNameServer(){SetStringtopicListnewHashSetString();遍历所有的consumer订阅的Topic并从namesvr获取路由信息{IteratorEntryString,MQConsumerInneritthis。consumerTable。entrySet()。iterator();while(it。hasNext()){EntryString,MQConsumerInnerentryit。next();MQConsumerInnerimplentry。getValue();if(impl!null){SetSubscriptionDatasubListimpl。subscriptions();if(subList!null){for(SubscriptionDatasubData:subList){topicList。add(subData。getTopic());}}}}}for(Stringtopic:topicList){this。updateTopicRouteInfoFromNameServer(topic);}}publicbooleanupdateTopicRouteInfoFromNameServer(finalStringtopic,booleanisDefault,DefaultMQProducerdefaultMQProducer){try{if(this。lockNamesrv。tryLock(LOCKTIMEOUTMILLIS,TimeUnit。MILLISECONDS)){try{TopicRouteDatatopicRouteData;if(isDefaultdefaultMQProducer!null){省略代码}else{topicRouteDatathis。mQClientAPIImpl。getTopicRouteInfoFromNameServer(topic,10003);}if(topicRouteData!null){TopicRouteDataoldthis。topicRouteTable。get(topic);booleanchangedtopicRouteDataIsChange(old,topicRouteData);if(!changed){changedthis。isNeedUpdateTopicRouteInfo(topic);}if(changed){TopicRouteDatacloneTopicRouteDatatopicRouteData。cloneTopicRouteData();for(BrokerDatabd:topicRouteData。getBrokerDatas()){this。brokerAddrTable。put(bd。getBrokerName(),bd。getBrokerAddrs());}构建consumer侧的路由信息{SetMessageQueuesubscribeInfotopicRouteData2TopicSubscribeInfo(topic,topicRouteData);IteratorEntryString,MQConsumerInneritthis。consumerTable。entrySet()。iterator();while(it。hasNext()){EntryString,MQConsumerInnerentryit。next();MQConsumerInnerimplentry。getValue();if(impl!null){impl。updateTopicSubscribeInfo(topic,subscribeInfo);}}}this。topicRouteTable。put(topic,cloneTopicRouteData);returntrue;}}}finally{this。lockNamesrv。unlock();}}}catch(InterruptedExceptione){}returnfalse;}} 路由同步过程:路由同步过程是消息消费者消费消息的前置条件,没有路由的同步就无法感知具体待消费的消息的Broker节点。consumer节点通过定时任务定期从namesvr同步该消费节点订阅的topic的路由信息。consumer通过updateTopicSubscribeInfo将同步的路由信息构建成本地的路由信息并用以后续的负责均衡。4。2负载均衡过程publicclassRebalanceServiceextendsServiceThread{privatestaticlongwaitIntervalLong。parseLong(System。getProperty(rocketmq。client。rebalance。waitInterval,20000));privatefinalMQClientInstancemqClientFactory;publicRebalanceService(MQClientInstancemqClientFactory){this。mqClientFactorymqClientFactory;}Overridepublicvoidrun(){while(!this。isStopped()){this。waitForRunning(waitInterval);this。mqClientFactory。doRebalance();}}} 负载均衡过程:consumer通过RebalanceService来定期进行重新负载均衡。RebalanceService的核心在于完成MessageQueue和consumer的分配关系。publicabstractclassRebalanceImpl{privatevoidrebalanceByTopic(finalStringtopic,finalbooleanisOrder){switch(messageModel){caseBROADCASTING:{省略相关代码break;}caseCLUSTERING:{集群模式下的负载均衡1、获取topic下所有的MessageQueueSetMessageQueuemqSetthis。topicSubscribeInfoTable。get(topic);2、获取topic下该consumerGroup下所有的consumer对象ListStringcidAllthis。mQClientFactory。findConsumerIdList(topic,consumerGroup);3、开始重新分配进行rebalanceif(mqSet!nullcidAll!null){ListMessageQueuemqAllnewArrayListMessageQueue();mqAll。addAll(mqSet);Collections。sort(mqAll);Collections。sort(cidAll);AllocateMessageQueueStrategystrategythis。allocateMessageQueueStrategy;ListMessageQueueallocateResultnull;try{4、通过分配策略重新进行分配allocateResultstrategy。allocate(this。consumerGroup,this。mQClientFactory。getClientId(),mqAll,cidAll);}catch(Throwablee){return;}SetMessageQueueallocateResultSetnewHashSetMessageQueue();if(allocateResult!null){allocateResultSet。addAll(allocateResult);}5、根据分配结果执行真正的rebalance动作booleanchangedthis。updateProcessQueueTableInRebalance(topic,allocateResultSet,isOrder);if(changed){this。messageQueueChanged(topic,mqSet,allocateResultSet);}}break;}default:break;}} 重新分配流程:获取topic下所有的MessageQueue。获取topic下该consumerGroup下所有的consumer的cid(如192。168。0。815958)。针对mqAll和cidAll进行排序,mqAll排序顺序按照先BrokerName后BrokerId,cidAll排序按照字符串排序。通过分配策略AllocateMessageQueueStrategy重新分配。根据分配结果执行真正的rebalance动作。publicclassAllocateMessageQueueAveragelyimplementsAllocateMessageQueueStrategy{privatefinalInternalLoggerlogClientLogger。getLog();OverridepublicListMessageQueueallocate(StringconsumerGroup,StringcurrentCID,ListMessageQueuemqAll,ListStringcidAll){ListMessageQueueresultnewArrayListMessageQueue();核心逻辑计算开始计算当前cid的下标intindexcidAll。indexOf(currentCID);计算多余的模值intmodmqAll。size()cidAll。size();计算平均大小intaverageSizemqAll。size()cidAll。size()?1:(mod0indexmod?mqAll。size()cidAll。size()1:mqAll。size()cidAll。size());计算起始下标intstartIndex(mod0indexmod)?indexaverageSize:indexaverageSizemod;计算范围大小intrangeMath。min(averageSize,mqAll。size()startIndex);组装结果for(inti0;irange;i){result。add(mqAll。get((startIndexi)mqAll。size()));}returnresult;}核心逻辑计算结束OverridepublicStringgetName(){returnAVG;}}rocketMq的集群存在3个broker,分别是brokera、brokerb、brokerc。rocketMq上存在名为topicdemo的topic,writeQueue写队列数量为3,分布在3个broker。排序后的mqAll的大小为9,依次为〔brokera0brokera1brokera2brokerb0brokerb1brokerb2brokerc0brokerc1brokerc2〕rocketMq存在包含4个consumer的consumergroup,排序后cidAll依次为〔192。168。0。615956192。168。0。715957192。168。0。815958192。168。0。915959〕192。168。0。615956的分配MessageQueue结算过程index:0mod:941averageSize:9413startIndex:0range:3messageQueue:〔brokera0、brokera1、brokera2〕192。168。0。615957的分配MessageQueue结算过程index:1mod:941averageSize:942startIndex:3range:2messageQueue:〔brokerb0、brokerb1〕192。168。0。615958的分配MessageQueue结算过程index:2mod:941averageSize:942startIndex:5range:2messageQueue:〔brokerb2、brokerc0〕192。168。0。615959的分配MessageQueue结算过程index:3mod:941averageSize:942startIndex:7range:2messageQueue:〔brokerc1、brokerc2〕 分配策略分析:整体分配策略可以参考上图的具体例子,可以更好的理解分配的逻辑。 consumer的分配:同一个consumerGroup下的consumer对象会分配到同一个Topic下不同的MessageQueue。每个MessageQueue最终会分配到具体的consumer当中。五、RocketMQ指定机器消费设计思路 日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由那台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。publicclassAllocateMessageQueueAveragelyimplementsAllocateMessageQueueStrategy{privatefinalInternalLoggerlogClientLogger。getLog();OverridepublicListMessageQueueallocate(StringconsumerGroup,StringcurrentCID,ListMessageQueuemqAll,ListStringcidAll){ListMessageQueueresultnewArrayListMessageQueue();通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费if(!cidAll。contains(currentCID)){returnresult;}intindexcidAll。indexOf(currentCID);intmodmqAll。size()cidAll。size();intaverageSizemqAll。size()cidAll。size()?1:(mod0indexmod?mqAll。size()cidAll。size()1:mqAll。size()cidAll。size());intstartIndex(mod0indexmod)?indexaverageSize:indexaverageSizemod;intrangeMath。min(averageSize,mqAll。size()startIndex);for(inti0;irange;i){result。add(mqAll。get((startIndexi)mqAll。size()));}returnresult;}} consumer负载均衡策略改写:通过改写负载均衡策略AllocateMessageQueueAveragely的allocate机制保证只有指定IP的机器能够进行消费。通过IP进行判断是基于RocketMQ的cid格式是192。168。0。615956,其中前面的IP地址就是对于的消费机器的ip地址,整个方案可行且可以实际落地。六、小结 本文主要介绍了RocketMQ在生产和消费过程中的负载均衡机制,结合源码和实际案例力求给读者一个易于理解的技术普及,希望能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未涉及,也有很多技术细节未详细阐述,如有疑问欢迎继续交流。 作者:vivo互联网技术 链接:https:juejin。cnpost7083676205022445582