from:cnblogs。comshanmlp16428961。htmBroker对消息的处理 BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:publicclassBrokerController{初始化publicbooleaninitialize()throwsCloneNotSupportedException{。。。注册处理器this。registerProcessor();。。。}注册处理器publicvoidregisterProcessor(){发送消息处理器SendMessageProcessorsendProcessornewSendMessageProcessor(this);。。。注册消息发送处理器this。remotingServer。registerProcessor(RequestCode。SENDMESSAGE,sendProcessor,this。sendMessageExecutor);this。remotingServer。registerProcessor(RequestCode。SENDMESSAGEV2,sendProcessor,this。sendMessageExecutor);省略其他注册。。。}} 在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessor的processRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:publicclassSendMessageProcessorextendsAbstractSendMessageProcessorimplementsNettyRequestProcessor{处理请求OverridepublicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{RemotingCommandresponsenull;try{处理请求responseasyncProcessRequest(ctx,request)。get();}catch(InterruptedExceptionExecutionExceptione){log。error(processSendMessageerror,request:request。toString(),e);}returnresponse;}异步处理请求publicCompletableFutureRemotingCommandasyncProcessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalSendMessageContextmqtraceContext;switch(request。getCode()){caseRequestCode。CONSUMERSENDMSGBACK:returnthis。asyncConsumerSendMsgBack(ctx,request);default:解析请求头SendMessageRequestHeaderrequestHeaderparseRequestHeader(request);。。。if(requestHeader。isBatch()){批量消息发送处理returnthis。asyncSendBatchMessage(ctx,request,mqtraceContext,requestHeader);}else{单个消息发送处理returnthis。asyncSendMessage(ctx,request,mqtraceContext,requestHeader);}}}单个消息发送处理privateCompletableFutureRemotingCommandasyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){。。。CompletableFuturePutMessageResultputMessageResultnull;StringtransFlagorigProps。get(MessageConst。PROPERTYTRANSACTIONPREPARED);是否使用事务if(transFlag!nullBoolean。parseBoolean(transFlag)){if(this。brokerController。getBrokerConfig()。isRejectTransactionMessage()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thebroker〔this。brokerController。getBrokerConfig()。getBrokerIP1()〕sendingtransactionmessageisforbidden);returnCompletableFuture。completedFuture(response);}事务处理putMessageResultthis。brokerController。getTransactionalMessageService()。asyncPrepareMessage(msgInner);}else{消息持久化putMessageResultthis。brokerController。getMessageStore()。asyncPutMessage(msgInner);}returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}} 以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:创建MessageExtBrokerInner对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中判断是否使用了事务,如果未使用事务调用brokerController的getMessageStore方法获取MessageStore对象,然后调用asyncPutMessage方法对消息进行持久化存储返回消息的存储结果publicclassSendMessageProcessorextendsAbstractSendMessageProcessorimplementsNettyRequestProcessor{单个消息发送处理privateCompletableFutureRemotingCommandasyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){。。。创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息MessageExtBrokerInnermsgInnernewMessageExtBrokerInner();设置主题msgInner。setTopic(requestHeader。getTopic());设置消息所在的队列IDmsgInner。setQueueId(queueIdInt);if(!handleRetryAndDLQ(requestHeader,response,request,msgInner,topicConfig)){returnCompletableFuture。completedFuture(response);}设置消息内容msgInner。setBody(body);msgInner。setFlag(requestHeader。getFlag());设置属性MapString,StringorigPropsMessageDecoder。string2messageProperties(requestHeader。getProperties());MessageAccessor。setProperties(msgInner,origProps);设置发送消息时间msgInner。setBornTimestamp(requestHeader。getBornTimestamp());设置发送消息的主机地址msgInner。setBornHost(ctx。channel()。remoteAddress());设置存储消息的主机地址msgInner。setStoreHost(this。getStoreHost());msgInner。setReconsumeTimes(requestHeader。getReconsumeTimes()null?0:requestHeader。getReconsumeTimes());StringclusterNamethis。brokerController。getBrokerConfig()。getBrokerClusterName();属性中添加集群名称MessageAccessor。putProperty(msgInner,MessageConst。PROPERTYCLUSTER,clusterName);如果属性中包含PROPERTYWAITSTOREMSGOKif(origProps。containsKey(MessageConst。PROPERTYWAITSTOREMSGOK)){StringwaitStoreMsgOKValueorigProps。remove(MessageConst。PROPERTYWAITSTOREMSGOK);设置消息属性msgInner。setPropertiesString(MessageDecoder。messageProperties2String(msgInner。getProperties()));origProps。put(MessageConst。PROPERTYWAITSTOREMSGOK,waitStoreMsgOKValue);}else{msgInner。setPropertiesString(MessageDecoder。messageProperties2String(msgInner。getProperties()));}CompletableFuturePutMessageResultputMessageResultnull;StringtransFlagorigProps。get(MessageConst。PROPERTYTRANSACTIONPREPARED);是否使用事务if(transFlag!nullBoolean。parseBoolean(transFlag)){if(this。brokerController。getBrokerConfig()。isRejectTransactionMessage()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thebroker〔this。brokerController。getBrokerConfig()。getBrokerIP1()〕sendingtransactionmessageisforbidden);returnCompletableFuture。completedFuture(response);}事务处理putMessageResultthis。brokerController。getTransactionalMessageService()。asyncPrepareMessage(msgInner);}else{消息写入putMessageResultthis。brokerController。getMessageStore()。asyncPutMessage(msgInner);}返回消息持久化结果returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}} MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:publicclassBrokerController{privateMessageStoremessageStore;publicbooleaninitialize()throwsCloneNotSupportedException{booleanresultthis。topicConfigManager。load();。。。if(result){try{创建DefaultMessageStorethis。messageStorenewDefaultMessageStore(this。messageStoreConfig,this。brokerStatsManager,this。messageArrivingListener,this。brokerConfig);。。。}catch(IOExceptione){resultfalse;log。error(Failedtoinitialize,e);}}获取MessageStorepublicMessageStoregetMessageStore(){returnmessageStore;}}消息存储 DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog,DLedgerCommitLog是CommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。 在DefaultMessageStore的asyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog的asyncPutMessage进行消息写入:publicclassDefaultMessageStoreimplementsMessageStore{privatefinalCommitLogcommitLog;CommitLogpublicDefaultMessageStore(finalMessageStoreConfigmessageStoreConfig,finalBrokerStatsManagerbrokerStatsManager,finalMessageArrivingListenermessageArrivingListener,finalBrokerConfigbrokerConfig)throwsIOException{。。。如果启用了Dlegerif(messageStoreConfig。isEnableDLegerCommitLog()){使用DLedgerCommitLogthis。commitLognewDLedgerCommitLog(this);}else{否则使用CommitLogthis。commitLognewCommitLog(this);}。。。}OverridepublicCompletableFuturePutMessageResultasyncPutMessage(MessageExtBrokerInnermsg){校验存储状态PutMessageStatuscheckStoreStatusthis。checkStoreStatus();if(checkStoreStatus!PutMessageStatus。PUTOK){returnCompletableFuture。completedFuture(newPutMessageResult(checkStoreStatus,null));}校验消息合法性PutMessageStatusmsgCheckStatusthis。checkMessage(msg);if(msgCheckStatusPutMessageStatus。MESSAGEILLEGAL){returnCompletableFuture。completedFuture(newPutMessageResult(msgCheckStatus,null));}进行一系列校验PutMessageStatuslmqMsgCheckStatusthis。checkLmqMessage(msg);if(msgCheckStatusPutMessageStatus。LMQCONSUMEQUEUENUMEXCEEDED){returnCompletableFuture。completedFuture(newPutMessageResult(lmqMsgCheckStatus,null));}longbeginTimethis。getSystemClock()。now();调用CommitLog的asyncPutMessage方法写入消息CompletableFuturePutMessageResultputResultFuturethis。commitLog。asyncPutMessage(msg);putResultFuture。thenAccept((result){longelapsedTimethis。getSystemClock()。now()beginTime;if(elapsedTime500){log。warn(putMessagenotinlockelapsedtime(ms){},bodyLength{},elapsedTime,msg。getBody()。length);}this。storeStatsService。setPutMessageEntireTimeMax(elapsedTime);if(nullresult!result。isOk()){this。storeStatsService。getPutMessageFailedTimes()。add(1);}});returnputResultFuture;}}合法性校验Broker存储检查 checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:MessageStore是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储Broker是否是从节点,从节点只能读不能写Broker是否有写权限,如果没有写入权限,不能进行写入操作操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作privatePutMessageStatuscheckStoreStatus(){是否处于停止状态if(this。shutdown){log。warn(messagestorehasshutdown,soputMessageisforbidden);returnPutMessageStatus。SERVICENOTAVAILABLE;}是否SLAVE角色if(BrokerRole。SLAVEthis。messageStoreConfig。getBrokerRole()){longvaluethis。printTimes。getAndIncrement();if((value50000)0){log。warn(brokeroleisslave,soputMessageisforbidden);}returnPutMessageStatus。SERVICENOTAVAILABLE;}是否可写if(!this。runningFlags。isWriteable()){longvaluethis。printTimes。getAndIncrement();if((value50000)0){log。warn(themessagestoreisnotwritable。Itmaybecausedbyoneofthefollowingreasons:thebrokersdiskisfull,writetologicqueueerror,writetoindexfileerror,etc);}returnPutMessageStatus。SERVICENOTAVAILABLE;}else{this。printTimes。set(0);}操作系统是否处于PAGECACHE繁忙状态if(this。isOSPageCacheBusy()){returnPutMessageStatus。OSPAGECACHEBUSY;}returnPutMessageStatus。PUTOK;}消息长度检查 checkMessage方法主要是对主题的长度校验和消息属性的长度校验:privatePutMessageStatuscheckMessage(MessageExtBrokerInnermsg){如果主题的长度大于最大值if(msg。getTopic()。length()Byte。MAXVALUE){log。warn(putMessagemessagetopiclengthtoolongmsg。getTopic()。length());returnPutMessageStatus。MESSAGEILLEGAL;}如果消息属性长度大于最大值if(msg。getPropertiesString()!nullmsg。getPropertiesString()。length()Short。MAXVALUE){log。warn(putMessagemessagepropertieslengthtoolongmsg。getPropertiesString()。length());returnPutMessageStatus。MESSAGEILLEGAL;}returnPutMessageStatus。PUTOK;}checkLmqMessage checkLmqMessage主要判断在开启LMQ(LightMessageQueue)时是否超过了最大消费数量:privatePutMessageStatuscheckLmqMessage(MessageExtBrokerInnermsg){如果消息属性不为空、存在PROPERTYINNERMULTIDISPATCH属性、并且超过了最大消费数量if(msg。getProperties()!nullStringUtils。isNotBlank(msg。getProperty(MessageConst。PROPERTYINNERMULTIDISPATCH))this。isLmqConsumeQueueNumExceeded()){returnPutMessageStatus。LMQCONSUMEQUEUENUMEXCEEDED;}returnPutMessageStatus。PUTOK;}privatebooleanisLmqConsumeQueueNumExceeded(){开启了LMQ开启了多个队列分发消费数量大于了限定值if(this。getMessageStoreConfig()。isEnableLmq()this。getMessageStoreConfig()。isEnableMultiDispatch()this。lmqConsumeQueueNum。get()this。messageStoreConfig。getMaxLmqConsumeQueueNum()){returntrue;}returnfalse;}消息写入 对消息进行校验完毕之后,调用了CommitLog的asyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:首先对消息的相关属性进行了设置,主要包括以下内容存储时间消息内容的CRC校验和如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识获取当前线程绑定的PutMessageThreadLocal对象,里面有一个MessageExtEncoder类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner的setEncodedBuff方法将buffer设置到encodedBuff中加锁,从mappedFileQueue中获取上一次使用的映射文件mappedFile,并更新消息的存储时间,如果mappedFile为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1GmappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录调用mappedFile的appendMessage方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中调用submitFlushRequest方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘publicclassCommitLog{所有mappedFile集合protectedfinalMappedFileQueuemappedFileQueue;ThreadLocalprivatefinalThreadLocalPutMessageThreadLocalputMessageThreadLocal;写入消息的回调函数privatefinalAppendMessageCallbackappendMessageCallback;publicCommitLog(finalDefaultMessageStoredefaultMessageStore){构造函数。。。创建回调函数this。appendMessageCallbacknewDefaultAppendMessageCallback(defaultMessageStore。getMessageStoreConfig()。getMaxMessageSize());。。。}publicCompletableFuturePutMessageResultasyncPutMessage(finalMessageExtBrokerInnermsg){设置存储时间msg。setStoreTimestamp(System。currentTimeMillis());设置消息的CRC值msg。setBodyCRC(UtilAll。crc32(msg。getBody()));写入结果AppendMessageResultresultnull;获取存储统计服务StoreStatsServicestoreStatsServicethis。defaultMessageStore。getStoreStatsService();获取主题Stringtopicmsg。getTopic();获取事务类型finalinttranTypeMessageSysFlag。getTransactionValue(msg。getSysFlag());if(tranTypeMessageSysFlag。TRANSACTIONNOTTYPEtranTypeMessageSysFlag。TRANSACTIONCOMMITTYPE){省略事务相关处理}获取发送消息的主机地址InetSocketAddressbornSocketAddress(InetSocketAddress)msg。getBornHost();if(bornSocketAddress。getAddress()instanceofInet6Address){如果是IPV6msg。setBornHostV6Flag();设置IPV6标识}获取存储消息的主机地址InetSocketAddressstoreSocketAddress(InetSocketAddress)msg。getStoreHost();if(storeSocketAddress。getAddress()instanceofInet6Address){msg。setStoreHostAddressV6Flag();设置IPV6标识}获取当前线程绑定的PutMessageThreadLocal对象PutMessageThreadLocalputMessageThreadLocalthis。putMessageThreadLocal。get();调用encode方法对消息进行编码,并写入bufferPutMessageResultencodeResultputMessageThreadLocal。getEncoder()。encode(msg);if(encodeResult!null){returnCompletableFuture。completedFuture(encodeResult);}将存储编码消息的buffer设置到msg中msg。setEncodedBuff(putMessageThreadLocal。getEncoder()。encoderBuffer);创建PutMessageContextPutMessageContextputMessageContextnewPutMessageContext(generateKey(putMessageThreadLocal。getKeyBuilder(),msg));longelapsedTimeInLock0;MappedFileunlockMappedFilenull;加锁putMessageLock。lock();try{获取上一次写入的文件MappedFilemappedFilethis。mappedFileQueue。getLastMappedFile();获取系统时间戳longbeginLockTimestampthis。defaultMessageStore。getSystemClock()。now();this。beginTimeInLockbeginLockTimestamp;再次更新存储时间戳,保证全局顺序msg。setStoreTimestamp(beginLockTimestamp);如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件if(nullmappedFilemappedFile。isFull()){使用偏移量0创建一个新的文件mappedFilethis。mappedFileQueue。getLastMappedFile(0);}如果依旧为空if(nullmappedFile){提示错误log。error(createmappedfile1error,topic:msg。getTopic()clientAddr:msg。getBornHostString());returnCompletableFuture。completedFuture(newPutMessageResult(PutMessageStatus。CREATEMAPEDFILEFAILED,null));}写入消息resultmappedFile。appendMessage(msg,this。appendMessageCallback,putMessageContext);。。。elapsedTimeInLockthis。defaultMessageStore。getSystemClock()。now()beginLockTimestamp;}finally{beginTimeInLock0;putMessageLock。unlock();}。。。PutMessageResultputMessageResultnewPutMessageResult(PutMessageStatus。PUTOK,result);统计相关storeStatsService。getSinglePutMessageTopicTimesTotal(msg。getTopic())。add(1);storeStatsService。getSinglePutMessageTopicSizeTotal(topic)。add(result。getWroteBytes());执行刷盘CompletableFuturePutMessageStatusflushResultFuturesubmitFlushRequest(result,msg);CompletableFuturePutMessageStatusreplicaResultFuturesubmitReplicaRequest(result,msg);returnflushResultFuture。thenCombine(replicaResultFuture,(flushStatus,replicaStatus){if(flushStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(flushStatus);}if(replicaStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(replicaStatus);}返回结果returnputMessageResult;});}}写入内存Buffer编码消息 MessageExtEncoder是CommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:publicclassCommitLog{ThreadLocalprivatefinalThreadLocalPutMessageThreadLocalputMessageThreadLocal;添加消息的ThreadLocal对象staticclassPutMessageThreadLocal{privateMessageExtEncoderencoder;引用MessageExtEncoderprivateStringBuilderkeyBuilder;PutMessageThreadLocal(intsize){创建MessageExtEncoder,size用来指定分配内存的大小encodernewMessageExtEncoder(size);keyBuildernewStringBuilder();}。。。}} MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区:对消息属性数据的长度进行校验判断是否超过限定值对总消息内容长度进行校验,判断是否超过最大的长度限制根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域将消息相关信息写入buffer:写入消息长度写入魔数写入消息体CRC校验和写入队列ID写入标识队列的偏移量,需要注意这里还没达到偏移量的值,先占位稍后写入文件的物理偏移量,先占位稍后写入写入系统标识写入发送消息的时间戳写入发送消息的主机地址写入存储时间戳写入存储消息的主机地址RECONSUMETIMESPreparedTransactionOffset写入消息体长度和消息内容写入主题长度写入主题写入属性长度和属性内容publicclassCommitLog{MessageExtEncoderpublicstaticclassMessageExtEncoder{字节缓冲区,存储消息内容的bufferprivatefinalByteBufferencoderBuffer;MessageExtEncoder(finalintsize){分配内存this。encoderBufferByteBuffer。allocateDirect(size);this。maxMessageSizesize;}对消息进行编码并写入bufferprotectedPutMessageResultencode(MessageExtBrokerInnermsgInner){消息属性数据finalbyte〔〕propertiesDatamsgInner。getPropertiesString()null?null:msgInner。getPropertiesString()。getBytes(MessageDecoder。CHARSETUTF8);属性数据长度finalintpropertiesLengthpropertiesDatanull?0:propertiesData。length;校验长度是否超过最大值if(propertiesLengthShort。MAXVALUE){log。warn(putMessagemessagepropertieslengthtoolong。length{},propertiesData。length);returnnewPutMessageResult(PutMessageStatus。PROPERTIESSIZEEXCEEDED,null);}获取主题数据finalbyte〔〕topicDatamsgInner。getTopic()。getBytes(MessageDecoder。CHARSETUTF8);finalinttopicLengthtopicData。length;主题数据长度获取消息体内容长度finalintbodyLengthmsgInner。getBody()null?0:msgInner。getBody()。length;总消息内容长度finalintmsgLencalMsgLength(msgInner。getSysFlag(),bodyLength,topicLength,propertiesLength);是否超过最大长度限制if(msgLenthis。maxMessageSize){CommitLog。log。warn(messagesizeexceeded,msgtotalsize:msgLen,msgbodysize:bodyLength,maxMessageSize:this。maxMessageSize);returnnewPutMessageResult(PutMessageStatus。MESSAGEILLEGAL,null);}初始化this。resetByteBuffer(encoderBuffer,msgLen);1写入消息长度this。encoderBuffer。putInt(msgLen);2写入魔数this。encoderBuffer。putInt(CommitLog。MESSAGEMAGICCODE);3写入消息体CRC校验和this。encoderBuffer。putInt(msgInner。getBodyCRC());4写入队列IDthis。encoderBuffer。putInt(msgInner。getQueueId());5写入标识this。encoderBuffer。putInt(msgInner。getFlag());6队列的偏移量,稍后写入this。encoderBuffer。putLong(0);7文件的物理偏移量,稍后写入this。encoderBuffer。putLong(0);8写入系统标识this。encoderBuffer。putInt(msgInner。getSysFlag());9写入发送消息的时间戳this。encoderBuffer。putLong(msgInner。getBornTimestamp());10写入发送消息的主机地址socketAddress2ByteBuffer(msgInner。getBornHost(),this。encoderBuffer);11写入存储时间戳this。encoderBuffer。putLong(msgInner。getStoreTimestamp());12写入存储消息的主机地址socketAddress2ByteBuffer(msgInner。getStoreHost(),this。encoderBuffer);13RECONSUMETIMESthis。encoderBuffer。putInt(msgInner。getReconsumeTimes());14PreparedTransactionOffsetthis。encoderBuffer。putLong(msgInner。getPreparedTransactionOffset());15写入消息体长度this。encoderBuffer。putInt(bodyLength);if(bodyLength0)this。encoderBuffer。put(msgInner。getBody());写入消息内容16写入主题长度this。encoderBuffer。put((byte)topicLength);写入主题this。encoderBuffer。put(topicData);17写入属性长度this。encoderBuffer。putShort((short)propertiesLength);if(propertiesLength0)this。encoderBuffer。put(propertiesData);写入属性数据encoderBuffer。flip();returnnull;}}}写入内存映射文件 前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下: ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。 MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。 MappedFile提供了两种方式来进行内容的写入,对应不同的init方法: 第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。 第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。 综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。publicclassMappedFileextendsReferenceResource{记录文件的写入位置protectedfinalAtomicIntegerwrotePositionnewAtomicInteger(0);文件大小protectedintfileSize;字节bufferprotectedByteBufferwriteBuffernull;文件映射privateMappedByteBuffermappedByteBuffer;暂存池,类似线程池,只不过池中存放的是申请的内存protectedTransientStorePooltransientStorePoolnull;初始化publicvoidinit(finalStringfileName,finalintfileSize,finalTransientStorePooltransientStorePool)throwsIOException{init(fileName,fileSize);从暂存池中获取一块内存this。writeBuffertransientStorePool。borrowBuffer();this。transientStorePooltransientStorePool;}初始化privatevoidinit(finalStringfileName,finalintfileSize)throwsIOException{。。。try{获取文件this。fileChannelnewRandomAccessFile(this。file,rw)。getChannel();进行文件映射this。mappedByteBufferthis。fileChannel。map(MapMode。READWRITE,0,fileSize);TOTALMAPPEDVIRTUALMEMORY。addAndGet(fileSize);TOTALMAPPEDFILES。incrementAndGet();oktrue;}catch(FileNotFoundExceptione){。。。}catch(IOExceptione){。。。}finally{if(!okthis。fileChannel!null){this。fileChannel。close();}}}}暂存池publicclassTransientStorePool{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);privatefinalintpoolSize;暂存池大小privatefinalintfileSize;申请的每一块内存大小privatefinalDequeByteBufferavailableBuffers;双端队列,存放申请的内存privatefinalMessageStoreConfigstoreConfig;存储配置publicTransientStorePool(finalMessageStoreConfigstoreConfig){this。storeConfigstoreConfig;this。poolSizestoreConfig。getTransientStorePoolSize();this。fileSizestoreConfig。getMappedFileSizeCommitLog();this。availableBuffersnewConcurrentLinkedDeque();}初始化publicvoidinit(){根据暂存池大小申请内存for(inti0;ipoolSize;i){申请直接内存ByteBufferbyteBufferByteBuffer。allocateDirect(fileSize);finallongaddress((DirectBuffer)byteBuffer)。address();PointerpointernewPointer(address);LibC。INSTANCE。mlock(pointer,newNativeLong(fileSize));放入到暂存池中availableBuffers。offer(byteBuffer);}}} 经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:MappedFile中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的slice方法创建一个与MappedFile共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中调用回调函数的doAppend方法进行写入,前面可知回调函数是DefaultAppendMessageCallback类型的更新MappedFile写入位置,返回写入结果publicclassMappedFileextendsReferenceResource{记录文件的写入位置protectedfinalAtomicIntegerwrotePositionnewAtomicInteger(0);文件大小protectedintfileSize;字节bufferprotectedByteBufferwriteBuffernull;文件映射privateMappedByteBuffermappedByteBuffer;写入消息publicAppendMessageResultappendMessage(finalMessageExtBrokerInnermsg,finalAppendMessageCallbackcb,PutMessageContextputMessageContext){调用appendMessagesInnerreturnappendMessagesInner(msg,cb,putMessageContext);}publicAppendMessageResultappendMessagesInner(finalMessageExtmessageExt,finalAppendMessageCallbackcb,PutMessageContextputMessageContext){assertmessageExt!null;assertcb!null;获取写入位置intcurrentPosthis。wrotePosition。get();如果写指针小于文件大小if(currentPosthis。fileSize){如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBufferByteBufferbyteBufferwriteBuffer!null?writeBuffer。slice():this。mappedByteBuffer。slice();设置共享内存区的写入位置byteBuffer。position(currentPos);AppendMessageResultresult;if(messageExtinstanceofMessageExtBrokerInner){单个消息处理通过共享内存区byteBuffer写入数据resultcb。doAppend(this。getFileFromOffset(),byteBuffer,this。fileSizecurrentPos,(MessageExtBrokerInner)messageExt,putMessageContext);}elseif(messageExtinstanceofMessageExtBatch){批量消息通过共享内存区byteBuffer写入数据resultcb。doAppend(this。getFileFromOffset(),byteBuffer,this。fileSizecurrentPos,(MessageExtBatch)messageExt,putMessageContext);}else{returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}更新MappedFile的写入位置this。wrotePosition。addAndGet(result。getWroteBytes());this。storeTimestampresult。getStoreTimestamp();returnresult;}log。error(MappedFile。appendMessagereturnnull,wrotePosition:{}fileSize:{},currentPos,this。fileSize);returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}} 进入到DefaultAppendMessageCallback的doAppend方法中,首先来看方法的入参:fileFromOffset:文件的起始位置偏移量byteBuffer:缓冲区,也就是上一步中创建的共享内存区maxBlank:上一步中可知传入的是文件总大小减去当前要写入的位置,也就是文件剩余空间大小msgInner:消息内容的封装体putMessageContext:消息写入上下文 方法的处理逻辑如下:计算文件要写入位置偏移量:文件起始位置偏移量准备写入位置的偏移量从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中从msgInner中获取之前已经写入到内存的消息数据preEncodeBuffer,并获取消息内容的长度校验是否有足够的空间写入数据,如果消息长度ENDFILEMINBLANKLENGTH(预留空间大小)大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为ENDOFFILE(超过文件大小)。这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。计算队列偏移量在preEncodeBuffer中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。 消息写入结果PUTOK:写入成功;ENDOFFILE:超过文件大小;MESSAGESIZEEXCEEDED:消息长度超过最大允许长度:PROPERTIESSIZEEXCEEDED:消息、属性超过最大允许长度;UNKNOWNERROR:未知异常;publicclassCommitLog{classDefaultAppendMessageCallbackimplementsAppendMessageCallback{预留空间大小,8个字节privatestaticfinalintENDFILEMINBLANKLENGTH44;publicAppendMessageResultdoAppend(finallongfileFromOffset,finalByteBufferbyteBuffer,finalintmaxBlank,finalMessageExtBrokerInnermsgInner,PutMessageContextputMessageContext){计算写入位置物理偏移量:文件起始位置准备写入位置的偏移量longwroteOffsetfileFromOffsetbyteBuffer。position();SupplierStringmsgIdSupplier(){intsysflagmsgInner。getSysFlag();intmsgIdLen(sysflagMessageSysFlag。STOREHOSTADDRESSV6FLAG)0?448:1648;ByteBuffermsgIdBufferByteBuffer。allocate(msgIdLen);MessageExt。socketAddress2ByteBuffer(msgInner。getStoreHost(),msgIdBuffer);msgIdBuffer。clear();becausesocketAddress2ByteBufferflipthebuffermsgIdBuffer。putLong(msgIdLen8,wroteOffset);returnUtilAll。bytes2string(msgIdBuffer。array());};获取消息队列信息StringkeyputMessageContext。getTopicQueueTableKey();从主题队列路由表中获取队列偏移量LongqueueOffsetCommitLog。this。topicQueueTable。get(key);如果偏移量为空if(nullqueueOffset){queueOffset0L;初始化为0添加到路由表中CommitLog。this。topicQueueTable。put(key,queueOffset);}booleanmultiDispatchWrapResultCommitLog。this。multiDispatch。wrapMultiDispatch(msgInner);if(!multiDispatchWrapResult){returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}如果开启事务需要特殊处理finalinttranTypeMessageSysFlag。getTransactionValue(msgInner。getSysFlag());。。。获取之前已经写入到buffer的消息数据ByteBufferpreEncodeBuffermsgInner。getEncodedBuff();获取数据长度finalintmsgLenpreEncodeBuffer。getInt(0);校验是否有足够的空间写入数据,如果消息长度预留空间大小大于最大值if((msgLenENDFILEMINBLANKLENGTH)maxBlank){this。msgStoreItemMemory。clear();1设置文件大小this。msgStoreItemMemory。putInt(maxBlank);2写入魔数this。msgStoreItemMemory。putInt(CommitLog。BLANKMAGICCODE);开始时间finallongbeginTimeMillsCommitLog。this。defaultMessageStore。now();将文件大小和魔数写入bufferbyteBuffer。put(this。msgStoreItemMemory。array(),0,8);返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为ENDOFFILEreturnnewAppendMessageResult(AppendMessageStatus。ENDOFFILE,wroteOffset,maxBlank,onlywrote8bytes,butdeclarewrotemaxBlankforcomputewritepositionmsgIdSupplier,msgInner。getStoreTimestamp(),queueOffset,CommitLog。this。defaultMessageStore。now()beginTimeMills);}计算队列偏移量的位置intpos44444;6写入队列偏移量preEncodeBuffer。putLong(pos,queueOffset);pos8;7写入物理偏移量preEncodeBuffer。putLong(pos,fileFromOffsetbyteBuffer。position());intipLen(msgInner。getSysFlag()MessageSysFlag。BORNHOSTV6FLAG)0?44:164;8系统标识,9BORNTIMESTAMP,10BORNHOST,11STORETIMESTAMPpos848ipLen;计算存储时间戳的写入位置更新新存储时间戳preEncodeBuffer。putLong(pos,msgInner。getStoreTimestamp());finallongbeginTimeMillsCommitLog。this。defaultMessageStore。now();将preEncodeBuffer的数据写入byteBufferbyteBuffer。put(preEncodeBuffer);清空buffermsgInner。setEncodedBuff(null);设置返回结果AppendMessageResultresultnewAppendMessageResult(AppendMessageStatus。PUTOK,wroteOffset,msgLen,msgIdSupplier,msgInner。getStoreTimestamp(),queueOffset,CommitLog。this。defaultMessageStore。now()beginTimeMills);switch(tranType){caseMessageSysFlag。TRANSACTIONPREPAREDTYPE:caseMessageSysFlag。TRANSACTIONROLLBACKTYPE:break;caseMessageSysFlag。TRANSACTIONNOTTYPE:caseMessageSysFlag。TRANSACTIONCOMMITTYPE:ThenextupdateConsumeQueueinformationCommitLog。this。topicQueueTable。put(key,queueOffset);CommitLog。this。multiDispatch。updateMultiQueueOffset(msgInner);break;default:break;}returnresult;}}}刷盘 由于篇幅原因,刷盘机制将另写一篇文章。 总结 参考 丁威、周继锋《RocketMQ技术内幕》 https:github。comapacherocketmqblobdevelopdocscnExampleLMQ。md RocketMQ版本:4。9。3