Consumption Progress Management in RocketMQ Master-Slave Mode
from: cnblogs.com/shanml/p/16989785.htm
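When a consumer starts, it creates the message pull API object PullAPIWrapper and calls its pullKernelImpl method to send pull requests to a Broker. In master-slave mode, how does the consumer decide which Broker to send a pull request to?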
Inside pullKernelImpl, recalculatePullFromWhichNode is called to choose a Broker:

```java
public class PullAPIWrapper {
    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // recalculatePullFromWhichNode returns a BrokerID; findBrokerAddressInSubscribe
        // then looks up that Broker's information by ID
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        // ...
        if (findBrokerResult != null) {
            // ...
            // Get the Broker address
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            // Send the pull request
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
            return pullResult;
        }
        // ...
    }
}
```
recalculatePullFromWhichNode looks up a suggested BrokerID for the message queue in pullFromWhichNodeTable. If there is none, it returns the Master's BrokerID (in RocketMQ the Master role always has BrokerID 0). Since pullFromWhichNodeTable tells the consumer which Broker to pull from, where does the data in pullFromWhichNodeTable come from?

```java
public class PullAPIWrapper {
    // KEY: message queue, VALUE: suggested BrokerID
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }
        // Look up the suggested brokerId in pullFromWhichNodeTable
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }
        // Fall back to the Master's BrokerID
        return MixAll.MASTER_ID;
    }
}
```
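To make the lookup rule concrete, here is a minimal, self-contained model of the same fallback order. It is a sketch, not RocketMQ code: the class name `PullNodeTable`, the `String` queue key (standing in for `MessageQueue`), the constructor parameters (standing in for `isConnectBrokerByUser()` and `defaultBrokerId`) and the `suggest` helper are all hypothetical; only the Master ID value 0 mirrors `MixAll.MASTER_ID`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of PullAPIWrapper#recalculatePullFromWhichNode (not the real class).
class PullNodeTable {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // Key: queue identifier (a String stands in for MessageQueue); value: suggested brokerId
    private final ConcurrentMap<String, AtomicLong> pullFromWhichNodeTable = new ConcurrentHashMap<>(32);
    private final boolean connectBrokerByUser; // stands in for isConnectBrokerByUser()
    private final long defaultBrokerId;        // stands in for defaultBrokerId

    PullNodeTable(boolean connectBrokerByUser, long defaultBrokerId) {
        this.connectBrokerByUser = connectBrokerByUser;
        this.defaultBrokerId = defaultBrokerId;
    }

    long recalculatePullFromWhichNode(String queue) {
        if (connectBrokerByUser) {
            return defaultBrokerId; // a user-pinned broker always wins
        }
        AtomicLong suggest = pullFromWhichNodeTable.get(queue);
        if (suggest != null) {
            return suggest.get(); // last suggestion recorded for this queue
        }
        return MASTER_ID; // no suggestion yet: pull from the Master
    }

    // Hypothetical helper so the example can seed a suggestion for a queue
    void suggest(String queue, long brokerId) {
        pullFromWhichNodeTable.computeIfAbsent(queue, q -> new AtomicLong()).set(brokerId);
    }
}
```

Suggestions are tracked per queue, so a suggestion recorded for one queue does not change where other queues are pulled from.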
The call hierarchy shows that pullFromWhichNodeTable is updated in updatePullFromWhichNode, which in turn is called from processPullResult. After the consumer sends a pull request, the Broker sets a brokerId in the response while processing the request (covered below), suggesting which Broker to pull from next time.

When the consumer processes the response to a pull request, it calls processPullResult, which takes out the suggested BrokerID and passes it to updatePullFromWhichNode to store it in pullFromWhichNodeTable:

```java
public class PullAPIWrapper {
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;
        // Store the BrokerID suggested by the pull response in pullFromWhichNodeTable
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        // ...
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            // Add an entry to pullFromWhichNodeTable
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}
```
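The two methods above form a feedback loop: each response decides where the next pull goes. The sketch below simulates that loop for a single queue. It is hypothetical: `PullFeedbackLoop` and `run` are invented names, and the `broker` function stands in for a real Broker filling in `suggestWhichBrokerId`; `updatePullFromWhichNode` keeps the same get-then-put shape as the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongUnaryOperator;

// Hypothetical model of the pull / suggest feedback loop (not RocketMQ code).
class PullFeedbackLoop {
    static final long MASTER_ID = 0L;

    private final ConcurrentMap<String, AtomicLong> pullFromWhichNodeTable = new ConcurrentHashMap<>(32);

    long recalculatePullFromWhichNode(String queue) {
        AtomicLong suggest = pullFromWhichNodeTable.get(queue);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    // Same shape as PullAPIWrapper#updatePullFromWhichNode
    void updatePullFromWhichNode(String queue, long brokerId) {
        AtomicLong suggest = pullFromWhichNodeTable.get(queue);
        if (suggest == null) {
            pullFromWhichNodeTable.put(queue, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }

    /**
     * Runs `rounds` pulls for one queue. `broker` maps the brokerId that served a pull
     * to the brokerId it suggests for the next pull. Returns the brokerIds pulled from.
     */
    List<Long> run(String queue, int rounds, LongUnaryOperator broker) {
        List<Long> pulledFrom = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            long target = recalculatePullFromWhichNode(queue); // choose the node
            pulledFrom.add(target);
            long suggested = broker.applyAsLong(target);        // "send" the pull request
            updatePullFromWhichNode(queue, suggested);          // processPullResult step
        }
        return pulledFrom;
    }
}
```

For example, if the Master always suggests slave 1 and the slave always suggests the Master, `run("TopicA-0", 4, id -> id == 0L ? 1L : 0L)` pulls from brokers 0, 1, 0, 1: the first pull goes to the Master because the table is still empty.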
Next, let's look at which conditions decide the Broker that gets chosen.

Returning the Suggested BrokerID
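When the Broker handles a consumer's pull request, it calls the processRequest method of PullMessageProcessor. It first calls MessageStore's getMessage method to fetch the messages; the returned GetMessageResult carries a flag saying whether pulling from a Slave is suggested (how this flag is set is covered later). Based on that flag, the Broker proceeds as follows:

1. If pulling from a slave is suggested, it calls getWhichBrokerWhenConsumeSlowly on the subscription group configuration (subscriptionGroupConfig) to get a slave's ID and sets that ID in the response; otherwise it keeps suggesting the Master for the next pull and sets the MASTER's ID in the response.
2. It checks the role of the current Broker. If it is a slave and reading from slaves is not allowed (slaveReadEnable=false), it still suggests pulling from the Master: it sets the MASTER's ID in the response and sets the response code to PULL_RETRY_IMMEDIATELY.
3. If reading from slaves is enabled (slaveReadEnable=true), there are two cases: if pulling from a slave is suggested, it takes the slave's ID from the subscription group configuration (getWhichBrokerWhenConsumeSlowly) and sets it in the response; otherwise it takes the BrokerId configured in the subscription group configuration (getBrokerId).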
If reading from Slave nodes is not enabled, the Master is always suggested for the next pull.
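Reading the processRequest code in this section closely, the final if/else on slaveReadEnable overwrites whatever suggestion was set earlier, so the suggested ID in the response ends up depending only on slaveReadEnable and isSuggestPullingFromSlave; the Broker role only decides whether the code becomes PULL_RETRY_IMMEDIATELY. The sketch below expresses that as a pure function. It is a hypothetical model: `SuggestBrokerRule`, `Suggestion` and the plain parameters (standing in for `GetMessageResult`, `BrokerConfig` and `SubscriptionGroupConfig`) are invented, and the local `BrokerRole` enum only mimics RocketMQ's enum of the same name.

```java
// Hypothetical stand-in for RocketMQ's BrokerRole enum.
enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

// Hypothetical model of the suggestion rules in PullMessageProcessor#processRequest.
final class SuggestBrokerRule {
    static final long MASTER_ID = 0L;

    // Outcome: the suggested brokerId, and whether the response code would be
    // switched to PULL_RETRY_IMMEDIATELY (a slave with slave reads disabled).
    static final class Suggestion {
        final long brokerId;
        final boolean retryImmediately;

        Suggestion(long brokerId, boolean retryImmediately) {
            this.brokerId = brokerId;
            this.retryImmediately = retryImmediately;
        }
    }

    static Suggestion suggest(boolean suggestPullingFromSlave, BrokerRole role, boolean slaveReadEnable,
                              long whichBrokerWhenConsumeSlowly, long configuredBrokerId) {
        boolean retry = role == BrokerRole.SLAVE && !slaveReadEnable;
        long id;
        if (slaveReadEnable) {
            // Slave reads allowed: lagging consumer -> the "slow consumption" broker,
            // otherwise the brokerId configured for the subscription group
            id = suggestPullingFromSlave ? whichBrokerWhenConsumeSlowly : configuredBrokerId;
        } else {
            // Slave reads not allowed: always steer the consumer back to the Master
            id = MASTER_ID;
        }
        return new Suggestion(id, retry);
    }
}
```

For instance, a lagging consumer on a Master with slaveReadEnable=false is still told to pull from broker 0, whereas with slaveReadEnable=true it is redirected to whichBrokerWhenConsumeSlowly.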
Subscription Group Configuration
The -i option of the mqadmin command specifies which Broker to consume from (the value returned by subscriptionGroupConfig.getBrokerId()), and the -w option specifies which slave to consume from when consuming from a slave is suggested (the value returned by subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()):

```
usage: mqadmin updateSubGroup [-a] [-b] [-c] [-d] -g [-h] [-i] [-m] [-n] [-q] [-r] [-s] [-w]
 -i,--brokerId                       consumer from which broker id
 -w,--whichBrokerWhenConsumeSlowly   which broker id when consume slowly
```

The relevant part of PullMessageProcessor.processRequest:

```java
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        // ...
        // Fetch messages starting at the requested offset
        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
            // Is pulling from a slave suggested?
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // Pick a slave
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
            // Check the Broker's role
            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // If reading from slaves is not allowed, suggest the Master
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }
            // If reading from slaves is enabled
            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // If pulling from a slave is suggested
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    // Use the slave configured for slow consumption
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                } else {
                    // Use the brokerId configured for the subscription group
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                // Use the Master
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }
        // ...
    }
}
```

Setting the Suggest-Pulling-from-Slave Flag
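DefaultMessageStore.getMessage fetches the message content and, based on the consumer's pull progress, decides whether to suggest pulling from a Slave next time. The decision works as follows:

- diff: the current maximum offset of the CommitLog minus the maximum physical offset of the messages pulled this time, i.e., the amount of message data the consumer has not pulled yet;
- memory: the amount of message data expected to stay in the PageCache, computed as total physical memory × accessMessageInMemoryMaxRatio (40 by default) / 100. In other words, RocketMQ relies on part of the messages being cached in the operating system's PageCache to speed up access;
- if diff is greater than memory, too many messages remain unpulled: the backlog exceeds what the PageCache holds and messages must be read from disk, so the Broker suggests pulling from a Slave next time.

```java
public class DefaultMessageStore implements MessageStore {
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums, final MessageFilter messageFilter) {
        // ...
        // Current maximum offset of the CommitLog
        final long maxOffsetPy = this.commitLog.getMaxOffset();
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (maxOffset == 0) {
                // ...
            } else {
                // Get the consume queue index entries starting at the consume offset
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        // ...
                        // CommitLog max offset minus the max physical offset pulled this time
                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        // Size kept in PageCache: total physical memory * in-memory ratio / 100
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        // Suggest pulling from a slave next time?
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {
                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // ...
        return getResult;
    }
}
```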
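The heuristic is plain arithmetic, so it can be isolated and tried with concrete numbers. The helper below is a hypothetical sketch: `SlaveSuggestion` is an invented name and the `totalPhysicalMemory` parameter stands in for `StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE`; the formula itself is the one in getMessage above.

```java
// Hypothetical helper reproducing the arithmetic in DefaultMessageStore#getMessage.
final class SlaveSuggestion {
    /**
     * @param maxOffsetPy                   current max physical offset of the CommitLog
     * @param maxPhyOffsetPulling           max physical offset among the messages returned by this pull
     * @param totalPhysicalMemory           stands in for StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
     * @param accessMessageInMemoryMaxRatio percentage of memory, 40 by default
     */
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemory, int accessMessageInMemoryMaxRatio) {
        // Bytes written to the CommitLog but not yet pulled by this consumer
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        // Bytes the broker expects to remain resident in the PageCache
        long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
        // A backlog larger than that budget means the next reads are likely to hit disk
        return diff > memory;
    }
}
```

With 16 GiB of physical memory and the default ratio of 40, memory is about 6.4 GiB: a consumer that is 8 GiB behind the CommitLog head gets the "pull from slave" suggestion, while one that is 1 GiB behind does not.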
Summary
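After starting, a consumer sends pull requests to a Broker. On receiving a request, the Broker uses the consumer's pull progress to choose a suggested BrokerID and returns it in the response. When processing the response, the consumer stores the suggested BrokerID in pullFromWhichNodeTable; on the next pull it reads the ID back from pullFromWhichNodeTable and sends the pull request to that Broker.

Persisting Consumption Progress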
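The sections above explained how, in master-slave mode, a consumer chooses the Broker to pull messages from. Next we look at how consumption progress is persisted. In broadcasting mode the progress is stored on each consumer, whereas in clustering mode it is stored on the Broker, so the rest of this article uses clustering mode as the example.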
As covered in the article "[RocketMQ] Message Pulling", in clustering mode consumption progress is managed mainly by RemoteBrokerOffsetStore. Its persistence method persistAll calls updateConsumeOffsetToBroker to update the progress stored on the Broker:

```java
public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        // Send a request to the Broker to update the consumption progress
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
        // ...
    }
}
```
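The loop in persistAll splits the locally tracked queues into two groups: queues still assigned to this consumer, whose offsets are pushed to the Broker, and queues no longer assigned, which are collected in unusedMQ. The sketch below models only that split. It is hypothetical: `OffsetPersister` is an invented class, `String` stands in for `MessageQueue`, and the `sender` callback stands in for updateConsumeOffsetToBroker. What the real code does with unusedMQ afterwards is elided in the excerpt, so the sketch simply returns it.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ObjLongConsumer;

// Hypothetical model of the loop in RemoteBrokerOffsetStore#persistAll (not RocketMQ code).
final class OffsetPersister {
    // Key: queue identifier (stand-in for MessageQueue); value: local consume offset
    final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    /**
     * Pushes the offsets of the queues in `assigned` through `sender` and returns
     * the queues present in offsetTable that are no longer assigned.
     */
    Set<String> persistAll(Set<String> assigned, ObjLongConsumer<String> sender) {
        Set<String> unusedMQ = new HashSet<>();
        if (assigned == null || assigned.isEmpty()) {
            return unusedMQ;
        }
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            String mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset == null) {
                continue; // mirrors the original null check; ConcurrentHashMap never stores null values
            }
            if (assigned.contains(mq)) {
                try {
                    sender.accept(mq, offset.get());
                } catch (RuntimeException e) {
                    // The original logs the failure and moves on to the next queue
                    System.err.println("updateConsumeOffsetToBroker exception, " + mq + ": " + e);
                }
            } else {
                unusedMQ.add(mq);
            }
        }
        return unusedMQ;
    }
}
```

Catching the exception per queue means one unreachable Broker does not stop the offsets of the other queues from being persisted in the same round.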
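Because updateConsumeOffsetToBroker first calls findBrokerAddressInSubscribe to get the Broker's information, let's first see how findBrokerAddressInSubscribe chooses a Broker. It takes three parameters: the Broker name, a BrokerID, and whether only the given BrokerID may be returned (onlyThisBroker). Its logic is:

1. Get all Brokers with that name from brokerAddrTable (in master-slave mode they share the same Broker name but have different IDs), as a map whose KEY is the BrokerID and whose VALUE is the Broker address;
2. Look up the Broker address for the given ID in that map;
3. Record in the slave variable whether the given BrokerID belongs to a slave (i.e., is not the Master's ID 0);
4. Record in the found variable whether an address was found;
5. If no address was found and the given BrokerId is a slave ID, try the next ID (brokerId + 1) once and check again whether an address was found;
6. If there is still no address and onlyThisBroker is false (the caller does not insist on the given BrokerID), take the first entry of the map;
7. If an address was obtained, wrap it in a result and return it; otherwise return null.

```java
public class MQClientInstance {
    public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,     // Broker name
        final long brokerId,         // BrokerID
        final boolean onlyThisBroker // whether only the given BrokerID may be returned
    ) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        // All Brokers registered under this name
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            // Is it a slave?
            slave = brokerId != MixAll.MASTER_ID;
            // Was an address found?
            found = brokerAddr != null;
            // No address, and a slave ID was requested
            if (!found && slave) {
                // Try the next Broker ID
                brokerAddr = map.get(brokerId + 1);
                found = brokerAddr != null;
            }
            // Still no address
            if (!found && !onlyThisBroker) {
                // Take the first entry in the map
                Entry<Long, String> entry = map.entrySet().iterator().next();
                // Its address
                brokerAddr = entry.getValue();
                // Is it a slave?
                slave = entry.getKey() != MixAll.MASTER_ID;
                // Mark as found
                found = true;
            }
        }
        if (found) {
            // Return the result
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }
        return null;
    }
}
```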
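The seven steps above can be exercised with a small in-memory model. In this sketch `BrokerLookup` and `Found` are invented names standing in for `MQClientInstance` and `FindBrokerResult`, the broker version lookup is left out, and the addresses used in the example are made up; the branch structure follows the findBrokerAddressInSubscribe code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of MQClientInstance#findBrokerAddressInSubscribe (not RocketMQ code).
final class BrokerLookup {
    static final long MASTER_ID = 0L;

    // Result: broker address and whether it belongs to a slave
    static final class Found {
        final String addr;
        final boolean slave;

        Found(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // brokerName -> (brokerId -> address), like brokerAddrTable
    final Map<String, HashMap<Long, String>> brokerAddrTable = new HashMap<>();

    Found find(String brokerName, long brokerId, boolean onlyThisBroker) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap<Long, String> map = brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = brokerId != MASTER_ID;
            found = brokerAddr != null;
            // A slave ID was requested but is missing: try the next ID once
            if (!found && slave) {
                brokerAddr = map.get(brokerId + 1);
                found = brokerAddr != null;
            }
            // Still nothing and the caller accepts any node: take the first entry
            if (!found && !onlyThisBroker) {
                Map.Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = entry.getKey() != MASTER_ID;
                found = true;
            }
        }
        return found ? new Found(brokerAddr, slave) : null;
    }
}
```

With nodes {0: master, 1: slave}, asking for ID 0 returns the master. After the master entry disappears, `find("broker-a", 0L, true)` returns null, while `find("broker-a", 0L, false)` falls through to the first remaining entry and returns the slave with `slave == true`.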
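Back in updateConsumeOffsetToBroker, the first call to findBrokerAddressInSubscribe passes three arguments: the Broker name, the Master's ID, and true. Following the lookup logic above, if the Master's information is found it is returned normally. If the Master is down and cannot be found, the lookup fails and returns null: the Master's ID was passed, so the brokerId + 1 retry does not apply, and onlyThisBroker is true, so no other node may be substituted.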
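If the first call returns null, the consumer refreshes the topic route from the NameServer and makes a second call. The only difference is that the third argument is false, meaning the specified Broker is no longer mandatory. The Master is still looked up first; if it is not found, because onlyThisBroker is false, the first entry of the map is returned, which may be a slave.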
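The two-step lookup can be sketched on its own by passing the lookup in as a function. Everything here is hypothetical: `OffsetCommitTarget`, the `Lookup` interface (standing in for findBrokerAddressInSubscribe) and the `refreshRoute` callback (standing in for updateTopicRouteInfoFromNameServer) are invented, and an `IllegalStateException` replaces the `MQClientException` thrown by the original.

```java
// Hypothetical model of the two lookups in RemoteBrokerOffsetStore#updateConsumeOffsetToBroker.
final class OffsetCommitTarget {
    static final long MASTER_ID = 0L;

    // Stand-in for findBrokerAddressInSubscribe: returns an address or null
    interface Lookup {
        String find(String brokerName, long brokerId, boolean onlyThisBroker);
    }

    static String resolve(String brokerName, Lookup lookup, Runnable refreshRoute) {
        // First attempt: the Master only
        String addr = lookup.find(brokerName, MASTER_ID, true);
        if (addr == null) {
            // Refresh the route, then accept any node registered under this name
            refreshRoute.run();
            addr = lookup.find(brokerName, MASTER_ID, false);
        }
        if (addr == null) {
            // The original throws MQClientException("The broker[...] not exist")
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }
}
```

When the Master answers the first lookup, the route is never refreshed; only a failed first attempt pays for the NameServer round trip before falling back to whatever node is available.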
In summary: the consumer prefers to send requests that save consumption progress to the Master. If the Master's information cannot be obtained (for example because the Master is down), the first entry of the map is chosen, so the consumer can also save its progress on a slave; once the Master recovers, it is preferred again.

```java
public class RemoteBrokerOffsetStore implements OffsetStore {
    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        // Update the consumption progress (oneway)
        updateConsumeOffsetToBroker(mq, offset, true);
    }

    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        // First lookup; arguments: Broker name, Master ID, true
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        // If nothing was found, look up a second time
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            // Second lookup; arguments: Broker name, Master ID, false
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
        }
        if (findBrokerResult != null) {
            // Build the request header
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);
            // Send the request that saves the consumption progress
            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
}
```

Consumption Progress Synchronization in Master-Slave Mode
BrokerController instantiates SlaveSynchronize in its constructor, and its start method calls handleSlaveSynchronize to handle data synchronization on slave nodes.

If the current Broker is a slave, a scheduled task is registered that periodically calls SlaveSynchronize.syncAll to synchronize data (first after 3 seconds, then every 10 seconds):

```java
public class BrokerController {
    private final SlaveSynchronize slaveSynchronize;

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        // ...
        this.slaveSynchronize = new SlaveSynchronize(this);
        // ...
    }

    public void start() throws Exception {
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            // Handle slave synchronization
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }
    }

    private void handleSlaveSynchronize(BrokerRole role) {
        // If this is a SLAVE node
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            // Schedule a periodic data synchronization task
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Synchronize data
                        BrokerController.this.slaveSynchronize.syncAll();
                    } catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            // handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }
}
```
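The scheduling pattern in handleSlaveSynchronize (cancel any previous task, then schedule a fixed-rate task only for the SLAVE role, with the task body wrapped in try/catch) can be reproduced with the JDK alone. In this sketch `SlaveSyncScheduler` and its members are hypothetical; `syncAll` stands in for `slaveSynchronize.syncAll()`, the delays are constructor parameters instead of the fixed 3 s / 10 s, and resetting masterAddr is left out. The catch block matters because, per the `ScheduledExecutorService.scheduleAtFixedRate` contract, an exception escaping one run suppresses all later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of BrokerController#handleSlaveSynchronize (not RocketMQ code).
final class SlaveSyncScheduler {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable syncAll;     // stands in for slaveSynchronize.syncAll()
    private final long initialDelayMs;  // 1000 * 3 in the original
    private final long periodMs;        // 1000 * 10 in the original
    private ScheduledFuture<?> slaveSyncFuture;

    SlaveSyncScheduler(Runnable syncAll, long initialDelayMs, long periodMs) {
        this.syncAll = syncAll;
        this.initialDelayMs = initialDelayMs;
        this.periodMs = periodMs;
    }

    synchronized void handleSlaveSynchronize(Role role) {
        // Whatever the new role is, stop the previously scheduled task first
        if (slaveSyncFuture != null) {
            slaveSyncFuture.cancel(false);
            slaveSyncFuture = null;
        }
        if (role == Role.SLAVE) {
            slaveSyncFuture = scheduler.scheduleAtFixedRate(() -> {
                try {
                    syncAll.run();
                } catch (Throwable e) {
                    // Swallow and log: an escaping exception would cancel all later runs
                    System.err.println("ScheduledTask SlaveSynchronize syncAll error. " + e);
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized boolean isSyncing() {
        return slaveSyncFuture != null && !slaveSyncFuture.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Cancelling with `cancel(false)`, as the original does, lets a synchronization run that is already in progress finish instead of interrupting it.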
SlaveSynchronize.syncAll in turn calls syncConsumerOffset to synchronize consumption progress:

1. Send a request to the Master to fetch its consumption progress data;
2. Merge the data into the slave's offset table and persist it.

```java
public class SlaveSynchronize {
    public void syncAll() {
        this.syncTopicConfig();
        // Synchronize consumption progress
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }

    private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                // Ask the Master for its consumption progress
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                // Merge it into the local offset table
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                // Persist the fetched consumption progress
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
    }
}
```
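Because syncConsumerOffset uses putAll, the slave's table is merged rather than replaced: entries the Master also has are overwritten, and entries only the slave knows about survive. The sketch below shows that behavior. It is hypothetical: `ConsumerOffsetSync` is an invented class, the `masterOffsets` argument stands in for the result of getAllConsumerOffset, `persist` only counts calls instead of writing to disk, and the "topic@group" key format is an assumption about ConsumerOffsetManager's table layout.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of SlaveSynchronize#syncConsumerOffset (not RocketMQ code).
final class ConsumerOffsetSync {
    // "topic@group" -> (queueId -> offset); the key format is an assumption
    final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
    int persistCount; // number of calls to the stand-in for persist()

    void syncConsumerOffset(String masterAddr, String selfAddr,
                            Map<String, ConcurrentMap<Integer, Long>> masterOffsets) {
        // Skip when no master is known or the master address is this broker itself
        if (masterAddr == null || masterAddr.equals(selfAddr)) {
            return;
        }
        // putAll overwrites keys the master also has and keeps keys only the slave has
        offsetTable.putAll(masterOffsets);
        persist();
    }

    private void persist() {
        persistCount++; // the real ConsumerOffsetManager writes the table to disk here
    }
}
```

Combined with the 10-second schedule, this means a slave's copy of consumption progress can lag the Master's by up to one sync period.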