头条创作挑战赛一、前言 前文RocketMQ源码分析之核心磁盘数据结构CommitLog让我们知道这个CommitLog是干什么用的,就是broker会将消息写入本地磁盘的CommitLog文件中。 但是CommitLog采用的MasterSlave部署模式,提供了一定的高可用性。但这样的部署模式,有一定缺陷。比如故障转移方面,如果主节点挂了,还需要人为手动进行重启或者切换,无法自动将一个从节点转换为主节点。 所以RocketMQ通过DLedgerCommitLog来实现基于raft协议的commitlog存储库,也是RocketMQ实现新的高可用多副本架构的关键。二、源码分析DLedgerCommitLog初始化时机;DLedgerCommitLog成员变量;DLedgerCommitLog构造函数;加载所有磁盘文件mappedfile的数据;数据恢复;追加消息;消息查找;1、DLedgerCommitLog初始化时机 DefaultMessageStore构造时进行根据是否开启高可用来初始化DLedgerCommitLog还是commitLog; 上面这行代码也就是说如果开启了高可用的话默认初始化一个DLedgerCommitLog否则就初始化原始的commitLog,我们到这里就可以想到了,这个DLedgerCommitLog和原始的CommitLog相比肯定是多了往子节点同步的部分。2、DLedgerCommitLog成员变量Storeallmetadatadowntimeforrecovery,dataprotectionreliability他是commitlog的子类,他可以去继承我么的commitlog把数据写入到本地磁盘文件里去,以及flush这样的功能对于我们的数据恢复、以及数据保护可以去做一个多副本策略,高可用架构publicclassDLedgerCommitLogextendsCommitLog{开源dledger框架的高可用同步服务器组件privatefinalDLedgerServerdLedgerServer;开源dledger框架的配置组件privatefinalDLedgerConfigdLedgerConfig;开源dledger框架的mmap内存映射文件存储组件privatefinalDLedgerMmapFileStoredLedgerFileStore;开源dledger框架的mmap内存映射文件listprivatefinalMmapFileListdLedgerFileList;Theididentifiesthebrokerrole,0meansmaster,othersmeansslaveprivatefinalintid;消息序列器privatefinalMessageSerializermessageSerializer;用于记录消息追加的时耗(日志追加所持有锁时间)privatevolatilelongbeginTimeInDledgerLock0;Thisoffsetseparatetheoldcommitlogfromdledgercommitlog记录的旧Commitlog文件中的最大偏移量,如果访问的偏移量大于它,则访问Dledger管理的文件privatelongpidedCommitlogOffset1;是否正在恢复旧的Commitlog文件privatebooleanisInrecoveringOldCommitlogfalse;privatefinalStringBuildermsgIdBuildernewStringBuilder();} 我们可以看到DLedgerCommitLog实际上是继承了CommitLog的,那么DLedgerCommitLog的存储结构又是怎么样的呢,如何兼容CommitLog呢,其实我们根据上面的知识可以想到其实我们的主从高可用只是比普通模式的Log需要多记录一些term,channel等这些元数据信息: 看到这里我们能想到,我们只要把commitLog的原本信息放到body里不就可以兼容commitLog了,而且改动也不大,对于历史数据也能很好的兼容,rocketmq确实是这么做的。3、DLedgerCommitLog构造函数publicDLedgerCommitLog(finalDefaultMessageStoredefaultMessageStore){调用父类的构造函数也就是说开启了主从架构也会兼容历史的消息super(defaultMessageStore);dLedgerConfignewDLedgerConfig();是否强制删除文件,取自Broker配置属性cleanFileForciblyEnable,默认为truedLedgerConfig。setEnableDiskForceClean(defaultMessageStore。getMessageStoreConfig()。isCleanFileForciblyEnable());DLedger存储类型,固定为基于文件的存储模式dLedgerConfig。setStoreType(DLedgerConfig。FILE);Leader节点的id名称,示例配置:n0,其配置要求第二个字符后必须是数字。dLedgerConfig。setSelfId(defaultMessageStore。getMessageStoreConfig()。getdLegerSelfId());DLegergroup的名称,建议与broker配置属性brokerName保持一致dLedgerConfig。setGroup(defaultMessageStore。getMessageStoreConfig()。getdLegerGroup());DLegerGroup中所有的节点信息,其配置示例n0127。0。0。1:40911;n1127。0。0。1:40912;n2127。0。0。1:40913。多个节点使用分号隔开。dLedgerConfig。setPeers(defaultMessageStore。getMessageStoreConfig()。getdLegerPeers());设置DLedger的日志文件的根目录,取自borker配件文件中的storePathRootDir,即RocketMQ的数据存储根路径。dLedgerConfig。setStoreBaseDir(defaultMessageStore。getMessageStoreConfig()。getStorePathRootDir());设置DLedger的单个日志文件的大小,取自Broker配置文件中的mapedFileSizeCommitLog,即与Commitlog文件的单个文件大小一致dLedgerConfig。setMappedFileSizeForEntryData(defaultMessageStore。getMessageStoreConfig()。getMappedFileSizeCommitLog());DLedger日志文件的删除时间,取自Broker配置文件中的deleteWhen,默认为凌晨4点dLedgerConfig。setDeleteWhen(defaultMessageStore。getMessageStoreConfig()。getDeleteWhen());DLedger日志文件保留时长,取自Broker配置文件中的fileReservedHours,默认为72hdLedgerConfig。setFileReservedHours(defaultMessageStore。getMessageStoreConfig()。getFileReservedTime()1);dLedgerConfig。setPreferredLeaderId(defaultMessageStore。getMessageStoreConfig()。getPreferredLeaderId());dLedgerConfig。setEnableBatchPush(defaultMessageStore。getMessageStoreConfig()。isEnableBatchPush());idInteger。parseInt(dLedgerConfig。getSelfId()。substring(1))1;初始化DledgerServer主要是进行主从复制以及选举使用dLedgerServernewDLedgerServer(dLedgerConfig);dLedgerFileStore(DLedgerMmapFileStore)dLedgerServer。getdLedgerStore();在dledger框架的存储层里加一个append钩子,追加数据之前需要什么定位到这条数据的一个位置然后加入进去这条数据在commitlog里面的全局物理offsetDLedgerMmapFileStore。AppendHookappendHook(entry,buffer,bodyOffset){我们上面说过其实当我们开启了主从同步之后我们追加消息的时候其实只有body是存储的原始的commitLog结构其他对于客户端都是无用的信息所以这里设置的追加消息的钩子函数就是为了返回body的offsetassertbodyOffsetDLedgerEntry。BODYOFFSET;buffer。position(buffer。position()bodyOffsetMessageDecoder。PHYPOSPOSITION);buffer。putLong(entry。getPos()bodyOffset);};dLedgerFileStore。addAppendHook(appendHook);dLedgerFileListdLedgerFileStore。getDataFileList();this。messageSerializernewMessageSerializer(defaultMessageStore。getMessageStoreConfig()。getMaxMessageSize());} 主要流程节点:调用父类的构造函数也就是说开启了主从架构也会兼容历史的消息构建配置文件类根据DledgerConfig构建DledgerServer主要负责主从日志同步以及选举设置追加消息的钩子函数4、加载所有磁盘文件mappedfile的数据 这里其实就是去加载commitLog中的信息为了进行历史消息的兼容publicbooleanload(){returnsuper。load();} 最后还是调用到了父类CommitLog中的load方法,其中mappedFileQueue的load方法,前文RocketMQ源码分析之映射文件队列MappedFileQueue有进行讲解;CommitLog里面数据都是在多个磁盘文件里的,每个磁盘文件都是一个MappedFile他应该是属于把所有的磁盘文件mappedfile的数据,从磁盘里load加载到映射内存区域里来publicbooleanload(){booleanresultthis。mappedFileQueue。load();log。info(loadcommitlog(result?OK:Failed));returnresult;}5、数据恢复加载commitLog以及index文件的wrotePosition,flushedPosition,committedPosition重要的指针;如果存在dLedgerFile则恢复返回;调用commitLog的recoverNormall()进行commitLog文件的恢复;如果不存在旧的commitLog直接结束文件日志的恢复流程;如果存在则尝试找到最后一个commitLog文件,如果没找到就停止;从最后一个文件的最后写入点尝试查找写入的魔数,如果存在魔数并且等CommitLog。BLANKMAGICCODE则无需写入魔数;初始化pidedCommitlogOffset,等于最后一个文件的起始偏移量加上文件的大小,即该指针指向最后一个文件的结束位置;将最后一个文件全部写满,其方法为设置消息体的大小以及魔数;设置最后一个文件的WrotePosition,CommittedPosition,FlushedPosition表示文件已经被写满;privatevoidrecover(longmaxPhyOffsetOfConsumeQueue){主要是加载commitLog以及index文件的wrotePosition,flushedPosition,committedPosition重要的指针dLedgerFileStore。load();if(dLedgerFileList。getMappedFiles()。size()0){如果存在dLedgerFile只需要恢复dLedgerFile即可存在dLedgerFile恢复dLedgerFiledLedgerFileStore。recover();设置pidedCommitlogOffset为dLedger文件的最小offset作为和老的commitLog的分割,小于这个offset需要访问老的commitLogpidedCommitlogOffsetdLedgerFileList。getFirstMappedFile()。getFileFromOffset();MappedFilemappedFilethis。mappedFileQueue。getLastMappedFile();if(mappedFile!null){如果存在旧的commitLog则禁止删除Dledger防止出现日志断层影响查询disableDeleteDledger();}最大物理offsetlongmaxPhyOffsetdLedgerFileList。getMaxWrotePosition();ClearConsumeQueueredundantdataif(maxPhyOffsetOfConsumeQueuemaxPhyOffset){log。warn(〔TruncateCQ〕maxPhyOffsetOfConsumeQueue({})processOffset({}),truncatedirtylogicfiles,maxPhyOffsetOfConsumeQueue,maxPhyOffset);this。defaultMessageStore。truncateDirtyLogicFiles(maxPhyOffset);}return;}Indicatethat,itisthefirsttimetoloadmixedcommitlog,needtorecovertheoldcommitlogisInrecoveringOldCommitlogtrue;Noneedtheabnormalrecover调用commitLog的recoverNormall()进行commitLog文件的恢复super。recoverNormally(maxPhyOffsetOfConsumeQueue);isInrecoveringOldCommitlogfalse;如果不存在旧的commitLog直接结束文件日志的恢复流程MappedFilemappedFilethis。mappedFileQueue。getLastMappedFile();不存在旧的commitLog直接返回if(mappedFilenull){return;}ByteBufferbyteBuffermappedFile。sliceByteBuffer();byteBuffer。position(mappedFile。getWrotePosition());booleanneedWriteMagicCodetrue;1TOTALSIZEbyteBuffer。getInt();sizeintmagicCodebyteBuffer。getInt();if(magicCodeCommitLog。BLANKMAGICCODE){needWriteMagicCodefalse;}else{log。info(Recoveroldcommitlogfoundaillegalmagiccode{},magicCode);}dLedgerConfig。setEnableDiskForceClean(false);pidedCommitlogOffsetmappedFile。getFileFromOffset()mappedFile。getFileSize();log。info(RecoveroldcommitlogneedWriteMagicCode{}pos{}file{}pidedCommitlogOffset{},needWriteMagicCode,mappedFile。getFileFromOffset()mappedFile。getWrotePosition(),mappedFile。getFileName(),pidedCommitlogOffset);if(needWriteMagicCode){byteBuffer。position(mappedFile。getWrotePosition());byteBuffer。putInt(mappedFile。getFileSize()mappedFile。getWrotePosition());byteBuffer。putInt(BLANKMAGICCODE);mappedFile。flush(0);}设置最后一个文件的WrotePosition,CommittedPosition,FlushedPosition表示文件已经被写满mappedFile。setWrotePosition(mappedFile。getFileSize());mappedFile。setCommittedPosition(mappedFile。getFileSize());mappedFile。setFlushedPosition(mappedFile。getFileSize());dLedgerFileList。getLastMappedFile(pidedCommitlogOffset);log。info(Willsettheinitialcommitlogoffset{}fordledger,pidedCommitlogOffset);}6、追加消息publicCompletableFuturePutMessageResultasyncPutMessage(MessageExtBrokerInnermsg){StoreStatsServicestoreStatsServicethis。defaultMessageStore。getStoreStatsService();finalinttranTypeMessageSysFlag。getTransactionValue(msg。getSysFlag());setMessageInfo(msg,tranType);finalStringfinalTopicmsg。getTopic();BacktoResultsAppendMessageResultappendResult;AppendFuturedledgerFuture;EncodeResultencodeResult;encodeResultthis。messageSerializer。serialize(msg);if(encodeResult。status!AppendMessageStatus。PUTOK){returnCompletableFuture。completedFuture(newPutMessageResult(PutMessageStatus。MESSAGEILLEGAL,newAppendMessageResult(encodeResult。status)));}putMessageLock。lock();spinorReentrantLock,dependingonstoreconfiglongelapsedTimeInLock;longqueueOffset;try{beginTimeInDledgerLockthis。defaultMessageStore。getSystemClock()。now();queueOffsetgetQueueOffsetByKey(encodeResult。queueOffsetKey,tranType);encodeResult。setQueueOffsetKey(queueOffset,false);追加消息的时候不再写入之前的commitLog而是调用dlegerserver的handleAppend进行日志的写入子节点日志的复制(后面会详细讲解)只有超过半数以上的节点复制成功才会返回成功如果追加成功则会返回追加成功的起始偏移量即pos属性类似于commitLog中的物理偏移量AppendEntryRequestrequestnewAppendEntryRequest();request。setGroup(dLedgerConfig。getGroup());request。setRemoteId(dLedgerServer。getMemberState()。getSelfId());request。setBody(encodeResult。getData());dledgerFuture(AppendFuture)dLedgerServer。handleAppend(request);if(dledgerFuture。getPos()1){returnCompletableFuture。completedFuture(newPutMessageResult(PutMessageStatus。OSPAGECACHEBUSY,newAppendMessageResult(AppendMessageStatus。UNKNOWNERROR)));}根据dledger的起始偏移量计算真正的消息的存储offset。longwroteOffsetdledgerFuture。getPos()DLedgerEntry。BODYOFFSET;intmsgIdLength(msg。getSysFlag()MessageSysFlag。STOREHOSTADDRESSV6FLAG)0?448:1648;ByteBufferbufferByteBuffer。allocate(msgIdLength);StringmsgIdMessageDecoder。createMessageId(buffer,msg。getStoreHostBytes(),wroteOffset);elapsedTimeInLockthis。defaultMessageStore。getSystemClock()。now()beginTimeInDledgerLock;appendResultnewAppendMessageResult(AppendMessageStatus。PUTOK,wroteOffset,encodeResult。getData()。length,msgId,System。currentTimeMillis(),queueOffset,elapsedTimeInLock);switch(tranType){caseMessageSysFlag。TRANSACTIONPREPAREDTYPE:caseMessageSysFlag。TRANSACTIONROLLBACKTYPE:break;caseMessageSysFlag。TRANSACTIONNOTTYPE:caseMessageSysFlag。TRANSACTIONCOMMITTYPE:ThenextupdateConsumeQueueinformationDLedgerCommitLog。this。topicQueueTable。put(encodeResult。queueOffsetKey,queueOffset1);break;default:break;}}catch(Exceptione){log。error(Putmessageerror,e);returnCompletableFuture。completedFuture(newPutMessageResult(PutMessageStatus。UNKNOWNERROR,newAppendMessageResult(AppendMessageStatus。UNKNOWNERROR)));}finally{beginTimeInDledgerLock0;putMessageLock。unlock();}if(elapsedTimeInLock500){log。warn(〔NOTIFYME〕putMessageinlockcosttime(ms){},bodyLength{}AppendMessageResult{},elapsedTimeInLock,msg。getBody()。length,appendResult);}returndledgerFuture。thenApply(appendEntryResponse{PutMessageStatusputMessageStatusPutMessageStatus。UNKNOWNERROR;switch(DLedgerResponseCode。valueOf(appendEntryResponse。getCode())){caseSUCCESS:putMessageStatusPutMessageStatus。PUTOK;break;caseINCONSISTENTLEADER:caseNOTLEADER:caseLEADERNOTREADY:caseDISKFULL:putMessageStatusPutMessageStatus。SERVICENOTAVAILABLE;break;caseWAITQUORUMACKTIMEOUT:Donotreturnflushslavetimeouttotheclient,fortheonsclientwillignoreit。putMessageStatusPutMessageStatus。OSPAGECACHEBUSY;break;caseLEADERPENDINGFULL:putMessageStatusPutMessageStatus。OSPAGECACHEBUSY;break;}PutMessageResultputMessageResultnewPutMessageResult(putMessageStatus,appendResult);if(putMessageStatusPutMessageStatus。PUTOK){StatisticsstoreStatsService。getSinglePutMessageTopicTimesTotal(finalTopic)。add(1);storeStatsService。getSinglePutMessageTopicSizeTotal(msg。getTopic())。add(appendResult。getWroteBytes());}returnputMessageResult;});}7、消息查找 消息的查找起始和原来还是没有什么区别的,还是使用二分查找法通过offset获取mappedFile文件,只是多了一个pidedCommitlogOffset的判断是否是老数据,如果是老数据直接走commitLog,新数据就走Dledger维护的文件列表;publicSelectMappedBufferResultgetMessage(finallongoffset,finalintsize){如果是小于pidedCommitlogOffset证明是旧数据从commitLog获取if(offsetpidedCommitlogOffset){returnsuper。getMessage(offset,size);}从dledger获取intmappedFileSizethis。dLedgerServer。getdLedgerConfig()。getMappedFileSizeForEntryData();MmapFilemappedFilethis。dLedgerFileList。findMappedFileByOffset(offset,offset0);if(mappedFile!null){intpos(int)(offsetmappedFileSize);获取文件并转换为DLedgerSelectMappedBufferResult类型returnconvertSbr(mappedFile。selectMappedBuffer(pos,size));}returnnull;}三、总结DLedger在整合时,使用DLedger条目包裹RocketMQ中的CommitLog条目,即在DLedger条目的body字段来存储整条CommitLog条目;引入pidedCommitlogOffset变量,表示物理偏移量小于该值的消息存在于旧的CommitLog文件中,实现升级DLedger集群后能访问到旧的数据;新DLedger集群启动后,会将最后一个CommitLog填充,即新的数据不会再写入到原先的CommitLog文件;消息追加到DLedger数据日志文件中,返回的偏移量不是DLedger条目的起始偏移量,而是DLedger条目中body字段的起始偏移量,即真实消息的起始偏移量,保证消息物理偏移量的语义与RocketMQCommitlog一样;