专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

RocketMQ消息的存储

  from:cnblogs。comshanmlp16428961。htmBroker对消息的处理
  BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:publicclassBrokerController{初始化publicbooleaninitialize()throwsCloneNotSupportedException{。。。注册处理器this。registerProcessor();。。。}注册处理器publicvoidregisterProcessor(){发送消息处理器SendMessageProcessorsendProcessornewSendMessageProcessor(this);。。。注册消息发送处理器this。remotingServer。registerProcessor(RequestCode。SENDMESSAGE,sendProcessor,this。sendMessageExecutor);this。remotingServer。registerProcessor(RequestCode。SENDMESSAGEV2,sendProcessor,this。sendMessageExecutor);省略其他注册。。。}}
  在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessor的processRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:publicclassSendMessageProcessorextendsAbstractSendMessageProcessorimplementsNettyRequestProcessor{处理请求OverridepublicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{RemotingCommandresponsenull;try{处理请求responseasyncProcessRequest(ctx,request)。get();}catch(InterruptedExceptionExecutionExceptione){log。error(processSendMessageerror,request:request。toString(),e);}returnresponse;}异步处理请求publicCompletableFutureRemotingCommandasyncProcessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalSendMessageContextmqtraceContext;switch(request。getCode()){caseRequestCode。CONSUMERSENDMSGBACK:returnthis。asyncConsumerSendMsgBack(ctx,request);default:解析请求头SendMessageRequestHeaderrequestHeaderparseRequestHeader(request);。。。if(requestHeader。isBatch()){批量消息发送处理returnthis。asyncSendBatchMessage(ctx,request,mqtraceContext,requestHeader);}else{单个消息发送处理returnthis。asyncSendMessage(ctx,request,mqtraceContext,requestHeader);}}}单个消息发送处理privateCompletableFutureRemotingCommandasyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){。。。CompletableFuturePutMessageResultputMessageResultnull;StringtransFlagorigProps。get(MessageConst。PROPERTYTRANSACTIONPREPARED);是否使用事务if(transFlag!nullBoolean。parseBoolean(transFlag)){if(this。brokerController。getBrokerConfig()。isRejectTransactionMessage()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thebroker〔this。brokerController。getBrokerConfig()。getBrokerIP1()〕sendingtransactionmessageisforbidden);returnCompletableFuture。completedFuture(response);}事务处理putMessageResultthis。brokerController。getTransactionalMessageService()。asyncPrepareMessage(msgInner);}else{消息持久化putMessageResultthis。brokerController。getMessageStore()。asyncPutMessage(msgInner);}returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}}
  以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:创建MessageExtBrokerInner对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中判断是否使用了事务,如果未使用事务调用brokerController的getMessageStore方法获取MessageStore对象,然后调用asyncPutMessage方法对消息进行持久化存储返回消息的存储结果publicclassSendMessageProcessorextendsAbstractSendMessageProcessorimplementsNettyRequestProcessor{单个消息发送处理privateCompletableFutureRemotingCommandasyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){。。。创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息MessageExtBrokerInnermsgInnernewMessageExtBrokerInner();设置主题msgInner。setTopic(requestHeader。getTopic());设置消息所在的队列IDmsgInner。setQueueId(queueIdInt);if(!handleRetryAndDLQ(requestHeader,response,request,msgInner,topicConfig)){returnCompletableFuture。completedFuture(response);}设置消息内容msgInner。setBody(body);msgInner。setFlag(requestHeader。getFlag());设置属性MapString,StringorigPropsMessageDecoder。string2messageProperties(requestHeader。getProperties());MessageAccessor。setProperties(msgInner,origProps);设置发送消息时间msgInner。setBornTimestamp(requestHeader。getBornTimestamp());设置发送消息的主机地址msgInner。setBornHost(ctx。channel()。remoteAddress());设置存储消息的主机地址msgInner。setStoreHost(this。getStoreHost());msgInner。setReconsumeTimes(requestHeader。getReconsumeTimes()null?0:requestHeader。getReconsumeTimes());StringclusterNamethis。brokerController。getBrokerConfig()。getBrokerClusterName();属性中添加集群名称MessageAccessor。putProperty(msgInner,MessageConst。PROPERTYCLUSTER,clusterName);如果属性中包含PROPERTYWAITSTOREMSGOKif(origProps。containsKey(MessageConst。PROPERTYWAITSTOREMSGOK)){StringwaitStoreMsgOKValueorigProps。remove(MessageConst。PROPERTYWAITSTOREMSGOK);设置消息属性msgInner。setPropertiesString(MessageDecoder。messageProperties2String(msgInner。getProperties()));origProps。put(MessageConst。PROPERTYWAITSTOREMSGOK,waitStoreMsgOKValue);}else{msgInner。setPropertiesString(MessageDecoder。messageProperties2String(msgInner。getProperties()));}CompletableFuturePutMessageResultputMessageResultnull;StringtransFlagorigProps。get(MessageConst。PROPERTYTRANSACTIONPREPARED);是否使用事务if(transFlag!nullBoolean。parseBoolean(transFlag)){if(this。brokerController。getBrokerConfig()。isRejectTransactionMessage()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thebroker〔this。brokerController。getBrokerConfig()。getBrokerIP1()〕sendingtransactionmessageisforbidden);returnCompletableFuture。completedFuture(response);}事务处理putMessageResultthis。brokerController。getTransactionalMessageService()。asyncPrepareMessage(msgInner);}else{消息写入putMessageResultthis。brokerController。getMessageStore()。asyncPutMessage(msgInner);}返回消息持久化结果returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}}
  MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:publicclassBrokerController{privateMessageStoremessageStore;publicbooleaninitialize()throwsCloneNotSupportedException{booleanresultthis。topicConfigManager。load();。。。if(result){try{创建DefaultMessageStorethis。messageStorenewDefaultMessageStore(this。messageStoreConfig,this。brokerStatsManager,this。messageArrivingListener,this。brokerConfig);。。。}catch(IOExceptione){resultfalse;log。error(Failedtoinitialize,e);}}获取MessageStorepublicMessageStoregetMessageStore(){returnmessageStore;}}消息存储
  DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog,DLedgerCommitLog是CommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。
  在DefaultMessageStore的asyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog的asyncPutMessage进行消息写入:publicclassDefaultMessageStoreimplementsMessageStore{privatefinalCommitLogcommitLog;CommitLogpublicDefaultMessageStore(finalMessageStoreConfigmessageStoreConfig,finalBrokerStatsManagerbrokerStatsManager,finalMessageArrivingListenermessageArrivingListener,finalBrokerConfigbrokerConfig)throwsIOException{。。。如果启用了Dlegerif(messageStoreConfig。isEnableDLegerCommitLog()){使用DLedgerCommitLogthis。commitLognewDLedgerCommitLog(this);}else{否则使用CommitLogthis。commitLognewCommitLog(this);}。。。}OverridepublicCompletableFuturePutMessageResultasyncPutMessage(MessageExtBrokerInnermsg){校验存储状态PutMessageStatuscheckStoreStatusthis。checkStoreStatus();if(checkStoreStatus!PutMessageStatus。PUTOK){returnCompletableFuture。completedFuture(newPutMessageResult(checkStoreStatus,null));}校验消息合法性PutMessageStatusmsgCheckStatusthis。checkMessage(msg);if(msgCheckStatusPutMessageStatus。MESSAGEILLEGAL){returnCompletableFuture。completedFuture(newPutMessageResult(msgCheckStatus,null));}进行一系列校验PutMessageStatuslmqMsgCheckStatusthis。checkLmqMessage(msg);if(msgCheckStatusPutMessageStatus。LMQCONSUMEQUEUENUMEXCEEDED){returnCompletableFuture。completedFuture(newPutMessageResult(lmqMsgCheckStatus,null));}longbeginTimethis。getSystemClock()。now();调用CommitLog的asyncPutMessage方法写入消息CompletableFuturePutMessageResultputResultFuturethis。commitLog。asyncPutMessage(msg);putResultFuture。thenAccept((result){longelapsedTimethis。getSystemClock()。now()beginTime;if(elapsedTime500){log。warn(putMessagenotinlockelapsedtime(ms){},bodyLength{},elapsedTime,msg。getBody()。length);}this。storeStatsService。setPutMessageEntireTimeMax(elapsedTime);if(nullresult!result。isOk()){this。storeStatsService。getPutMessageFailedTimes()。add(1);}});returnputResultFuture;}}合法性校验Broker存储检查
  checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:MessageStore是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储Broker是否是从节点,从节点只能读不能写Broker是否有写权限,如果没有写入权限,不能进行写入操作操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作privatePutMessageStatuscheckStoreStatus(){是否处于停止状态if(this。shutdown){log。warn(messagestorehasshutdown,soputMessageisforbidden);returnPutMessageStatus。SERVICENOTAVAILABLE;}是否SLAVE角色if(BrokerRole。SLAVEthis。messageStoreConfig。getBrokerRole()){longvaluethis。printTimes。getAndIncrement();if((value50000)0){log。warn(brokeroleisslave,soputMessageisforbidden);}returnPutMessageStatus。SERVICENOTAVAILABLE;}是否可写if(!this。runningFlags。isWriteable()){longvaluethis。printTimes。getAndIncrement();if((value50000)0){log。warn(themessagestoreisnotwritable。Itmaybecausedbyoneofthefollowingreasons:thebrokersdiskisfull,writetologicqueueerror,writetoindexfileerror,etc);}returnPutMessageStatus。SERVICENOTAVAILABLE;}else{this。printTimes。set(0);}操作系统是否处于PAGECACHE繁忙状态if(this。isOSPageCacheBusy()){returnPutMessageStatus。OSPAGECACHEBUSY;}returnPutMessageStatus。PUTOK;}消息长度检查
  checkMessage方法主要是对主题的长度校验和消息属性的长度校验:privatePutMessageStatuscheckMessage(MessageExtBrokerInnermsg){如果主题的长度大于最大值if(msg。getTopic()。length()Byte。MAXVALUE){log。warn(putMessagemessagetopiclengthtoolongmsg。getTopic()。length());returnPutMessageStatus。MESSAGEILLEGAL;}如果消息属性长度大于最大值if(msg。getPropertiesString()!nullmsg。getPropertiesString()。length()Short。MAXVALUE){log。warn(putMessagemessagepropertieslengthtoolongmsg。getPropertiesString()。length());returnPutMessageStatus。MESSAGEILLEGAL;}returnPutMessageStatus。PUTOK;}checkLmqMessage
  checkLmqMessage主要判断在开启LMQ(LightMessageQueue)时是否超过了最大消费数量:privatePutMessageStatuscheckLmqMessage(MessageExtBrokerInnermsg){如果消息属性不为空、存在PROPERTYINNERMULTIDISPATCH属性、并且超过了最大消费数量if(msg。getProperties()!nullStringUtils。isNotBlank(msg。getProperty(MessageConst。PROPERTYINNERMULTIDISPATCH))this。isLmqConsumeQueueNumExceeded()){returnPutMessageStatus。LMQCONSUMEQUEUENUMEXCEEDED;}returnPutMessageStatus。PUTOK;}privatebooleanisLmqConsumeQueueNumExceeded(){开启了LMQ开启了多个队列分发消费数量大于了限定值if(this。getMessageStoreConfig()。isEnableLmq()this。getMessageStoreConfig()。isEnableMultiDispatch()this。lmqConsumeQueueNum。get()this。messageStoreConfig。getMaxLmqConsumeQueueNum()){returntrue;}returnfalse;}消息写入
  对消息进行校验完毕之后,调用了CommitLog的asyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:首先对消息的相关属性进行了设置,主要包括以下内容存储时间消息内容的CRC校验和如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识获取当前线程绑定的PutMessageThreadLocal对象,里面有一个MessageExtEncoder类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner的setEncodedBuff方法将buffer设置到encodedBuff中加锁,从mappedFileQueue中获取上一次使用的映射文件mappedFile,并更新消息的存储时间,如果mappedFile为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1GmappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录调用mappedFile的appendMessage方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中调用submitFlushRequest方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘publicclassCommitLog{所有mappedFile集合protectedfinalMappedFileQueuemappedFileQueue;ThreadLocalprivatefinalThreadLocalPutMessageThreadLocalputMessageThreadLocal;写入消息的回调函数privatefinalAppendMessageCallbackappendMessageCallback;publicCommitLog(finalDefaultMessageStoredefaultMessageStore){构造函数。。。创建回调函数this。appendMessageCallbacknewDefaultAppendMessageCallback(defaultMessageStore。getMessageStoreConfig()。getMaxMessageSize());。。。}publicCompletableFuturePutMessageResultasyncPutMessage(finalMessageExtBrokerInnermsg){设置存储时间msg。setStoreTimestamp(System。currentTimeMillis());设置消息的CRC值msg。setBodyCRC(UtilAll。crc32(msg。getBody()));写入结果AppendMessageResultresultnull;获取存储统计服务StoreStatsServicestoreStatsServicethis。defaultMessageStore。getStoreStatsService();获取主题Stringtopicmsg。getTopic();获取事务类型finalinttranTypeMessageSysFlag。getTransactionValue(msg。getSysFlag());if(tranTypeMessageSysFlag。TRANSACTIONNOTTYPEtranTypeMessageSysFlag。TRANSACTIONCOMMITTYPE){省略事务相关处理}获取发送消息的主机地址InetSocketAddressbornSocketAddress(InetSocketAddress)msg。getBornHost();if(bornSocketAddress。getAddress()instanceofInet6Address){如果是IPV6msg。setBornHostV6Flag();设置IPV6标识}获取存储消息的主机地址InetSocketAddressstoreSocketAddress(InetSocketAddress)msg。getStoreHost();if(storeSocketAddress。getAddress()instanceofInet6Address){msg。setStoreHostAddressV6Flag();设置IPV6标识}获取当前线程绑定的PutMessageThreadLocal对象PutMessageThreadLocalputMessageThreadLocalthis。putMessageThreadLocal。get();调用encode方法对消息进行编码,并写入bufferPutMessageResultencodeResultputMessageThreadLocal。getEncoder()。encode(msg);if(encodeResult!null){returnCompletableFuture。completedFuture(encodeResult);}将存储编码消息的buffer设置到msg中msg。setEncodedBuff(putMessageThreadLocal。getEncoder()。encoderBuffer);创建PutMessageContextPutMessageContextputMessageContextnewPutMessageContext(generateKey(putMessageThreadLocal。getKeyBuilder(),msg));longelapsedTimeInLock0;MappedFileunlockMappedFilenull;加锁putMessageLock。lock();try{获取上一次写入的文件MappedFilemappedFilethis。mappedFileQueue。getLastMappedFile();获取系统时间戳longbeginLockTimestampthis。defaultMessageStore。getSystemClock()。now();this。beginTimeInLockbeginLockTimestamp;再次更新存储时间戳,保证全局顺序msg。setStoreTimestamp(beginLockTimestamp);如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件if(nullmappedFilemappedFile。isFull()){使用偏移量0创建一个新的文件mappedFilethis。mappedFileQueue。getLastMappedFile(0);}如果依旧为空if(nullmappedFile){提示错误log。error(createmappedfile1error,topic:msg。getTopic()clientAddr:msg。getBornHostString());returnCompletableFuture。completedFuture(newPutMessageResult(PutMessageStatus。CREATEMAPEDFILEFAILED,null));}写入消息resultmappedFile。appendMessage(msg,this。appendMessageCallback,putMessageContext);。。。elapsedTimeInLockthis。defaultMessageStore。getSystemClock()。now()beginLockTimestamp;}finally{beginTimeInLock0;putMessageLock。unlock();}。。。PutMessageResultputMessageResultnewPutMessageResult(PutMessageStatus。PUTOK,result);统计相关storeStatsService。getSinglePutMessageTopicTimesTotal(msg。getTopic())。add(1);storeStatsService。getSinglePutMessageTopicSizeTotal(topic)。add(result。getWroteBytes());执行刷盘CompletableFuturePutMessageStatusflushResultFuturesubmitFlushRequest(result,msg);CompletableFuturePutMessageStatusreplicaResultFuturesubmitReplicaRequest(result,msg);returnflushResultFuture。thenCombine(replicaResultFuture,(flushStatus,replicaStatus){if(flushStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(flushStatus);}if(replicaStatus!PutMessageStatus。PUTOK){putMessageResult。setPutMessageStatus(replicaStatus);}返回结果returnputMessageResult;});}}写入内存Buffer编码消息
  MessageExtEncoder是CommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:publicclassCommitLog{ThreadLocalprivatefinalThreadLocalPutMessageThreadLocalputMessageThreadLocal;添加消息的ThreadLocal对象staticclassPutMessageThreadLocal{privateMessageExtEncoderencoder;引用MessageExtEncoderprivateStringBuilderkeyBuilder;PutMessageThreadLocal(intsize){创建MessageExtEncoder,size用来指定分配内存的大小encodernewMessageExtEncoder(size);keyBuildernewStringBuilder();}。。。}}
  MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区:对消息属性数据的长度进行校验判断是否超过限定值对总消息内容长度进行校验,判断是否超过最大的长度限制根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域将消息相关信息写入buffer:写入消息长度写入魔数写入消息体CRC校验和写入队列ID写入标识队列的偏移量,需要注意这里还没达到偏移量的值,先占位稍后写入文件的物理偏移量,先占位稍后写入写入系统标识写入发送消息的时间戳写入发送消息的主机地址写入存储时间戳写入存储消息的主机地址RECONSUMETIMESPreparedTransactionOffset写入消息体长度和消息内容写入主题长度写入主题写入属性长度和属性内容publicclassCommitLog{MessageExtEncoderpublicstaticclassMessageExtEncoder{字节缓冲区,存储消息内容的bufferprivatefinalByteBufferencoderBuffer;MessageExtEncoder(finalintsize){分配内存this。encoderBufferByteBuffer。allocateDirect(size);this。maxMessageSizesize;}对消息进行编码并写入bufferprotectedPutMessageResultencode(MessageExtBrokerInnermsgInner){消息属性数据finalbyte〔〕propertiesDatamsgInner。getPropertiesString()null?null:msgInner。getPropertiesString()。getBytes(MessageDecoder。CHARSETUTF8);属性数据长度finalintpropertiesLengthpropertiesDatanull?0:propertiesData。length;校验长度是否超过最大值if(propertiesLengthShort。MAXVALUE){log。warn(putMessagemessagepropertieslengthtoolong。length{},propertiesData。length);returnnewPutMessageResult(PutMessageStatus。PROPERTIESSIZEEXCEEDED,null);}获取主题数据finalbyte〔〕topicDatamsgInner。getTopic()。getBytes(MessageDecoder。CHARSETUTF8);finalinttopicLengthtopicData。length;主题数据长度获取消息体内容长度finalintbodyLengthmsgInner。getBody()null?0:msgInner。getBody()。length;总消息内容长度finalintmsgLencalMsgLength(msgInner。getSysFlag(),bodyLength,topicLength,propertiesLength);是否超过最大长度限制if(msgLenthis。maxMessageSize){CommitLog。log。warn(messagesizeexceeded,msgtotalsize:msgLen,msgbodysize:bodyLength,maxMessageSize:this。maxMessageSize);returnnewPutMessageResult(PutMessageStatus。MESSAGEILLEGAL,null);}初始化this。resetByteBuffer(encoderBuffer,msgLen);1写入消息长度this。encoderBuffer。putInt(msgLen);2写入魔数this。encoderBuffer。putInt(CommitLog。MESSAGEMAGICCODE);3写入消息体CRC校验和this。encoderBuffer。putInt(msgInner。getBodyCRC());4写入队列IDthis。encoderBuffer。putInt(msgInner。getQueueId());5写入标识this。encoderBuffer。putInt(msgInner。getFlag());6队列的偏移量,稍后写入this。encoderBuffer。putLong(0);7文件的物理偏移量,稍后写入this。encoderBuffer。putLong(0);8写入系统标识this。encoderBuffer。putInt(msgInner。getSysFlag());9写入发送消息的时间戳this。encoderBuffer。putLong(msgInner。getBornTimestamp());10写入发送消息的主机地址socketAddress2ByteBuffer(msgInner。getBornHost(),this。encoderBuffer);11写入存储时间戳this。encoderBuffer。putLong(msgInner。getStoreTimestamp());12写入存储消息的主机地址socketAddress2ByteBuffer(msgInner。getStoreHost(),this。encoderBuffer);13RECONSUMETIMESthis。encoderBuffer。putInt(msgInner。getReconsumeTimes());14PreparedTransactionOffsetthis。encoderBuffer。putLong(msgInner。getPreparedTransactionOffset());15写入消息体长度this。encoderBuffer。putInt(bodyLength);if(bodyLength0)this。encoderBuffer。put(msgInner。getBody());写入消息内容16写入主题长度this。encoderBuffer。put((byte)topicLength);写入主题this。encoderBuffer。put(topicData);17写入属性长度this。encoderBuffer。putShort((short)propertiesLength);if(propertiesLength0)this。encoderBuffer。put(propertiesData);写入属性数据encoderBuffer。flip();returnnull;}}}写入内存映射文件
  前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:
  ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。
  MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。
  MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:
  第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。
  第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。
  综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。publicclassMappedFileextendsReferenceResource{记录文件的写入位置protectedfinalAtomicIntegerwrotePositionnewAtomicInteger(0);文件大小protectedintfileSize;字节bufferprotectedByteBufferwriteBuffernull;文件映射privateMappedByteBuffermappedByteBuffer;暂存池,类似线程池,只不过池中存放的是申请的内存protectedTransientStorePooltransientStorePoolnull;初始化publicvoidinit(finalStringfileName,finalintfileSize,finalTransientStorePooltransientStorePool)throwsIOException{init(fileName,fileSize);从暂存池中获取一块内存this。writeBuffertransientStorePool。borrowBuffer();this。transientStorePooltransientStorePool;}初始化privatevoidinit(finalStringfileName,finalintfileSize)throwsIOException{。。。try{获取文件this。fileChannelnewRandomAccessFile(this。file,rw)。getChannel();进行文件映射this。mappedByteBufferthis。fileChannel。map(MapMode。READWRITE,0,fileSize);TOTALMAPPEDVIRTUALMEMORY。addAndGet(fileSize);TOTALMAPPEDFILES。incrementAndGet();oktrue;}catch(FileNotFoundExceptione){。。。}catch(IOExceptione){。。。}finally{if(!okthis。fileChannel!null){this。fileChannel。close();}}}}暂存池publicclassTransientStorePool{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);privatefinalintpoolSize;暂存池大小privatefinalintfileSize;申请的每一块内存大小privatefinalDequeByteBufferavailableBuffers;双端队列,存放申请的内存privatefinalMessageStoreConfigstoreConfig;存储配置publicTransientStorePool(finalMessageStoreConfigstoreConfig){this。storeConfigstoreConfig;this。poolSizestoreConfig。getTransientStorePoolSize();this。fileSizestoreConfig。getMappedFileSizeCommitLog();this。availableBuffersnewConcurrentLinkedDeque();}初始化publicvoidinit(){根据暂存池大小申请内存for(inti0;ipoolSize;i){申请直接内存ByteBufferbyteBufferByteBuffer。allocateDirect(fileSize);finallongaddress((DirectBuffer)byteBuffer)。address();PointerpointernewPointer(address);LibC。INSTANCE。mlock(pointer,newNativeLong(fileSize));放入到暂存池中availableBuffers。offer(byteBuffer);}}}
  经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:MappedFile中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的slice方法创建一个与MappedFile共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中调用回调函数的doAppend方法进行写入,前面可知回调函数是DefaultAppendMessageCallback类型的更新MappedFile写入位置,返回写入结果publicclassMappedFileextendsReferenceResource{记录文件的写入位置protectedfinalAtomicIntegerwrotePositionnewAtomicInteger(0);文件大小protectedintfileSize;字节bufferprotectedByteBufferwriteBuffernull;文件映射privateMappedByteBuffermappedByteBuffer;写入消息publicAppendMessageResultappendMessage(finalMessageExtBrokerInnermsg,finalAppendMessageCallbackcb,PutMessageContextputMessageContext){调用appendMessagesInnerreturnappendMessagesInner(msg,cb,putMessageContext);}publicAppendMessageResultappendMessagesInner(finalMessageExtmessageExt,finalAppendMessageCallbackcb,PutMessageContextputMessageContext){assertmessageExt!null;assertcb!null;获取写入位置intcurrentPosthis。wrotePosition。get();如果写指针小于文件大小if(currentPosthis。fileSize){如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBufferByteBufferbyteBufferwriteBuffer!null?writeBuffer。slice():this。mappedByteBuffer。slice();设置共享内存区的写入位置byteBuffer。position(currentPos);AppendMessageResultresult;if(messageExtinstanceofMessageExtBrokerInner){单个消息处理通过共享内存区byteBuffer写入数据resultcb。doAppend(this。getFileFromOffset(),byteBuffer,this。fileSizecurrentPos,(MessageExtBrokerInner)messageExt,putMessageContext);}elseif(messageExtinstanceofMessageExtBatch){批量消息通过共享内存区byteBuffer写入数据resultcb。doAppend(this。getFileFromOffset(),byteBuffer,this。fileSizecurrentPos,(MessageExtBatch)messageExt,putMessageContext);}else{returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}更新MappedFile的写入位置this。wrotePosition。addAndGet(result。getWroteBytes());this。storeTimestampresult。getStoreTimestamp();returnresult;}log。error(MappedFile。appendMessagereturnnull,wrotePosition:{}fileSize:{},currentPos,this。fileSize);returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}}
  进入到DefaultAppendMessageCallback的doAppend方法中,首先来看方法的入参:fileFromOffset:文件的起始位置偏移量byteBuffer:缓冲区,也就是上一步中创建的共享内存区maxBlank:上一步中可知传入的是文件总大小减去当前要写入的位置,也就是文件剩余空间大小msgInner:消息内容的封装体putMessageContext:消息写入上下文
  方法的处理逻辑如下:计算文件要写入位置偏移量:文件起始位置偏移量准备写入位置的偏移量从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中从msgInner中获取之前已经写入到内存的消息数据preEncodeBuffer,并获取消息内容的长度校验是否有足够的空间写入数据,如果消息长度ENDFILEMINBLANKLENGTH(预留空间大小)大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为ENDOFFILE(超过文件大小)。这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。计算队列偏移量在preEncodeBuffer中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。
  消息写入结果PUTOK:写入成功;ENDOFFILE:超过文件大小;MESSAGESIZEEXCEEDED:消息长度超过最大允许长度:PROPERTIESSIZEEXCEEDED:消息、属性超过最大允许长度;UNKNOWNERROR:未知异常;publicclassCommitLog{classDefaultAppendMessageCallbackimplementsAppendMessageCallback{预留空间大小,8个字节privatestaticfinalintENDFILEMINBLANKLENGTH44;publicAppendMessageResultdoAppend(finallongfileFromOffset,finalByteBufferbyteBuffer,finalintmaxBlank,finalMessageExtBrokerInnermsgInner,PutMessageContextputMessageContext){计算写入位置物理偏移量:文件起始位置准备写入位置的偏移量longwroteOffsetfileFromOffsetbyteBuffer。position();SupplierStringmsgIdSupplier(){intsysflagmsgInner。getSysFlag();intmsgIdLen(sysflagMessageSysFlag。STOREHOSTADDRESSV6FLAG)0?448:1648;ByteBuffermsgIdBufferByteBuffer。allocate(msgIdLen);MessageExt。socketAddress2ByteBuffer(msgInner。getStoreHost(),msgIdBuffer);msgIdBuffer。clear();becausesocketAddress2ByteBufferflipthebuffermsgIdBuffer。putLong(msgIdLen8,wroteOffset);returnUtilAll。bytes2string(msgIdBuffer。array());};获取消息队列信息StringkeyputMessageContext。getTopicQueueTableKey();从主题队列路由表中获取队列偏移量LongqueueOffsetCommitLog。this。topicQueueTable。get(key);如果偏移量为空if(nullqueueOffset){queueOffset0L;初始化为0添加到路由表中CommitLog。this。topicQueueTable。put(key,queueOffset);}booleanmultiDispatchWrapResultCommitLog。this。multiDispatch。wrapMultiDispatch(msgInner);if(!multiDispatchWrapResult){returnnewAppendMessageResult(AppendMessageStatus。UNKNOWNERROR);}如果开启事务需要特殊处理finalinttranTypeMessageSysFlag。getTransactionValue(msgInner。getSysFlag());。。。获取之前已经写入到buffer的消息数据ByteBufferpreEncodeBuffermsgInner。getEncodedBuff();获取数据长度finalintmsgLenpreEncodeBuffer。getInt(0);校验是否有足够的空间写入数据,如果消息长度预留空间大小大于最大值if((msgLenENDFILEMINBLANKLENGTH)maxBlank){this。msgStoreItemMemory。clear();1设置文件大小this。msgStoreItemMemory。putInt(maxBlank);2写入魔数this。msgStoreItemMemory。putInt(CommitLog。BLANKMAGICCODE);开始时间finallongbeginTimeMillsCommitLog。this。defaultMessageStore。now();将文件大小和魔数写入bufferbyteBuffer。put(this。msgStoreItemMemory。array(),0,8);返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为ENDOFFILEreturnnewAppendMessageResult(AppendMessageStatus。ENDOFFILE,wroteOffset,maxBlank,onlywrote8bytes,butdeclarewrotemaxBlankforcomputewritepositionmsgIdSupplier,msgInner。getStoreTimestamp(),queueOffset,CommitLog。this。defaultMessageStore。now()beginTimeMills);}计算队列偏移量的位置intpos44444;6写入队列偏移量preEncodeBuffer。putLong(pos,queueOffset);pos8;7写入物理偏移量preEncodeBuffer。putLong(pos,fileFromOffsetbyteBuffer。position());intipLen(msgInner。getSysFlag()MessageSysFlag。BORNHOSTV6FLAG)0?44:164;8系统标识,9BORNTIMESTAMP,10BORNHOST,11STORETIMESTAMPpos848ipLen;计算存储时间戳的写入位置更新新存储时间戳preEncodeBuffer。putLong(pos,msgInner。getStoreTimestamp());finallongbeginTimeMillsCommitLog。this。defaultMessageStore。now();将preEncodeBuffer的数据写入byteBufferbyteBuffer。put(preEncodeBuffer);清空buffermsgInner。setEncodedBuff(null);设置返回结果AppendMessageResultresultnewAppendMessageResult(AppendMessageStatus。PUTOK,wroteOffset,msgLen,msgIdSupplier,msgInner。getStoreTimestamp(),queueOffset,CommitLog。this。defaultMessageStore。now()beginTimeMills);switch(tranType){caseMessageSysFlag。TRANSACTIONPREPAREDTYPE:caseMessageSysFlag。TRANSACTIONROLLBACKTYPE:break;caseMessageSysFlag。TRANSACTIONNOTTYPE:caseMessageSysFlag。TRANSACTIONCOMMITTYPE:ThenextupdateConsumeQueueinformationCommitLog。this。topicQueueTable。put(key,queueOffset);CommitLog。this。multiDispatch。updateMultiQueueOffset(msgInner);break;default:break;}returnresult;}}}刷盘
  由于篇幅原因,刷盘机制将另写一篇文章。
  总结
  参考
  丁威、周继锋《RocketMQ技术内幕》
  https:github。comapacherocketmqblobdevelopdocscnExampleLMQ。md
  RocketMQ版本:4。9。3

国家发改委建议屠宰企业适当增加库存促进猪价尽快回升至合理区间据国家发展改革委微信公众号消息,针对近期生猪价格低位运行的情况,国家发展改革委价格司组织部分大型生猪屠宰企业召开会议,深入分析生猪市场供需和价格形势,建议屠宰企业适当增加商业库存提关于元宇宙的另一种猜想元宇宙已成当下最热门的词汇,这是关于人类未来生活场景的一种描述,主要是和虚拟现实有关,涉及到网络,区块链,虚拟现实,显示系统等,可以让人实现沉浸式虚拟场景,人与人的沟通在跨空间跨时2023年最大的人工智能(AI)趋势整个技术行业的主导部分主要由人工智能和机器学习组成,它们正日益成为人类日常生活的一部分。发展使公司能够以更少的时间和金钱实现预期目标,快速做出重要决策并创造新的创新产品和服务。到2程序员是面向搜索引擎编程?分享Google的11个查资料技巧!对程序员来说,搜索引擎肯定是日常工作中是不可或缺,很多人戏谑自己是面向搜索引擎编程遇到困难,去网上搜答案这个行为没错,我相信我肯定不是第一个遇到这个难题的,网上找找前辈是怎么解决但AI算法线性回归原理与实现1算法思想回归分析是一种预测性建模技术,它是研究因变量与自变量之间的关系,这种技术通常用于预测分析,时间序列模型以及发现变量之间的因果关系,通常使用直线或曲线进行拟合,目标是使曲线谁是药王?2022年度十大创新药企榜单揭晓华夏时报(www。chinatimes。net。cn)记者孙梦圆于娜北京报道2023年1月12日,在华夏时报社主办的第二届华夏大健康产业高峰论坛上,2022年度金手杖奖获奖名单揭晓韩国出口下降,经济短期复苏无望韩国关税厅对外公布出口数据,其出口持续下降,尤其芯片产业深受打击。刚刚,韩国关税厅对外公布了近期韩国出口的相关数据,和此前外界预期一致,韩国的出口持续下滑,尤其是芯片以及精密机械,武汉理工AFM调整界面电荷转移优化超晶格薄膜的热电性能!热电材料在微芯片和5G光通信模块的主动冷却以及可穿戴电子设备和物联网传感器的小温度梯度发电方面大有可为。热电材料的热电转换效率由无量纲功绩值(ZT)来评估,提高功率因数和减少L是提首列塔吉克斯坦甘肃国际货运班列抵达兰州今天(14日)上午,一列满载着50个集装箱1300余吨货物的国际货运班列由塔吉克斯坦运抵兰州新区铁路口岸。首次打通了塔吉克斯坦至甘肃的国际物流通道,也是西北第一个国家级新区兰州新区土库曼斯坦第七届议会选举将于3月举行土库曼斯坦电视台13日报道称,2023年3月26日,土库曼斯坦将举行第七届议会下院(马吉利斯)议员选举。土库曼斯坦议会下院已通过相关决议。第五届州市人民委员会成员选举与第九届国会成墨菲对手根本不想对位瓦兰,他为了球队的成功做出了牺牲鹈鹕以116110击败活塞。赛后,鹈鹕球员特雷墨菲接受媒体采访,谈到了队友约纳斯瓦兰丘纳斯。对手根本不想对位他。墨菲在采访中表示。我们就是不断地把球喂给他,然后围绕着他来打,他真的
你把鳄雀鳝当宠物,它却把本地物种当猎物头条创作挑战赛在当下社会养养猫养养狗养养鱼等都是很普遍的事情,我们的确也非常喜欢小动物,在小动物的陪伴下我们能收获很多乐趣。随着互联网人工智能的崛起,我们和世界的距离更近了。通过互娇姐侃彩排列五,9月1日,胆码猜想式差分式取巧式,来了大家好!今天分享两个方案,一个是三步式胆码,另一个是两步式投机取巧,供大家参考。对与错,请勿怪。一,胆码可能是有0,已走七步,防对数5入奖。二,位置码D,可能是9,或者是对数4。小戈尔巴乔夫,一个改变了世界的人戈尔巴乔夫,一个改变了世界的人!文叶雨秋俄罗斯总统事务局中央临床医院当地时间2022年8月30日夜间发布消息称,苏联最后一任领导人戈尔巴乔夫当天晚上因长期重病医治无效去世,终年91第二十五届成都国际汽车展览会盛大开幕128个汽车品牌1600余台汽车亮相李嫒嫒中国证券报中证网中证网讯(记者李嫒嫒)8月26日,第二十五届成都国际汽车展览会(简称成都国际车展)在中国西部国际博览城开幕。本届成都国际车展以享蓉城,促产业,稳经济,驭未来为建国最大医疗丑闻吃人不吐骨头的刘翔峰,让人心惊胆战湘雅刘翔峰让人心惊胆战,把病人当提款机,黑心医生累累恶行中央纪委监察部网站近日对长沙湘雅二院黑医刘翔峰作出专门回应,并发表评论坚决维护医疗领域的廉洁健康。湘雅医学院如果这件事只是医湖南禅僧,建十方道场,悲智行愿!众生受世间尊崇的真性活佛十方道场,养世间净土。众生称他为真正的活佛,弘法利生,一生悲智行愿,道行卓越。更以土木工程之法造福万千群众。他的殊荣名扬海外,一系列弘法十方之重任诉说着不凡。然而,纵使一身袈裟照见机关事业单位人员,在2024年10月以前退休是否更划算?差距大吗?机关事业单位人员,在2024年10月份前退休是否更划算?跟在此之后退休,养老金待遇水平相差大吗?对于机关事业单位人员来说,如果是在2024年10月份以前退休,那么他们是属于10年过74年华国锋衣着朴素参加家长会,工人家长问其贵姓,答中华的华引言1974年3月21日傍晚,一个身材魁梧神采奕奕的男人带着一个高中生模样的女生,着急忙慌的穿过鳞次街道进入了北京一六六中学。彼时是6点半,半个小时后,一六六中学将要举行家长会,男唐山打人陈某志落实罪名,妻子疯狂甩锅求脱身,背后保护伞被捉等待两个月,唐山烧烤店打人案终于有结果了!陈某志等犯罪嫌疑人被依法提起公诉,其背后的保护伞也被查处!唐山4名被打女子中,2人被鉴定为轻伤,2人轻微伤。一直有一个未解之谜,困惑着人们东八区的先生们马上开播,半个娱乐圈演员强势加盟,有爆款潜质最近新上映的影视剧比比皆是,都快追不过来了!今天备受瞩目的东八区的先生们也要开播了,在开拍的时候就受到了很多人的关注。东八区的先生们由张翰担任制作人,秦丽担任总制片人,夏睿执导,张财政部发布2022年上半年中国财政政策执行情况报告综述2022年是我国进入全面建设社会主义现代化国家向第二个百年奋斗目标进军新征程的重要一年。上半年,面对复杂严峻的国际形势和艰巨繁重的国内改革发展稳定任务,在以习近平同志为核心的党
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网