范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQ消息的存储

  from:cnblogs.com/shanml/p/16428961.htmBroker对消息的处理
  BrokerController  初始化的过程中,调用registerProcessor  方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor  ,然后将其注册到远程服务中:public class BrokerController {     // 初始化     public boolean initialize() throws CloneNotSupportedException {         // ...         // 注册处理器         this.registerProcessor();         // ...     }        // 注册处理器     public void registerProcessor() {         /**          * 发送消息处理器          */         SendMessageProcessor sendProcessor = new SendMessageProcessor(this);         // ...         // 注册消息发送处理器         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);         // 省略其他注册...     } }
  在Broker收到生产者的发送消息请求时,会进入到 SendMessageProcessor  的processRequest  方法中处理请求,然后又会调用asyncProcessRequest  异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage  方法处理,否则调用asyncSendMessage  方法处理单个消息:public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {      // 处理请求     @Override     public RemotingCommand processRequest(ChannelHandlerContext ctx,                                           RemotingCommand request) throws RemotingCommandException {         RemotingCommand response = null;         try {             // 处理请求             response = asyncProcessRequest(ctx, request).get();         } catch (InterruptedException | ExecutionException e) {             log.error("process SendMessage error, request : " + request.toString(), e);         }         return response;     }        // 异步处理请求     public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx,                                                                   RemotingCommand request) throws RemotingCommandException {         final SendMessageContext mqtraceContext;         switch (request.getCode()) {             case RequestCode.CONSUMER_SEND_MSG_BACK:                 return this.asyncConsumerSendMsgBack(ctx, request);             default:                 // 解析请求头                 SendMessageRequestHeader requestHeader = parseRequestHeader(request);                 // ...                 if (requestHeader.isBatch()) {                     // 批量消息发送处理                     return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);                 } else {                     // 单个消息发送处理                     return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);                 }         }     }        // 单个消息发送处理     private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,                                                                 SendMessageContext mqtraceContext,                                                                 SendMessageRequestHeader requestHeader) {         // ...         CompletableFuture putMessageResult = null;         String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);         // 是否使用事务         if (transFlag != null && Boolean.parseBoolean(transFlag)) {             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                 response.setCode(ResponseCode.NO_PERMISSION);                 response.setRemark(                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                 + "] sending transaction message is forbidden");                 return CompletableFuture.completedFuture(response);             }             // 事务处理             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);         } else {             // 消息持久化             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);         }         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);     }  }
  以单个消息的发送处理方法 asyncSendMessage  为例看一下消息的接收过程:创建 MessageExtBrokerInner  对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中判断是否使用了事务,如果未使用事务调用 brokerController  的getMessageStore  方法获取MessageStore  对象,然后调用asyncPutMessage  方法对消息进行持久化存储返回消息的存储结果 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {      // 单个消息发送处理     private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,                                                                 SendMessageContext mqtraceContext,                                                                 SendMessageRequestHeader requestHeader) {         // ...         // 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();         // 设置主题         msgInner.setTopic(requestHeader.getTopic());         // 设置消息所在的队列ID         msgInner.setQueueId(queueIdInt);         if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {             return CompletableFuture.completedFuture(response);         }         // 设置消息内容         msgInner.setBody(body);         msgInner.setFlag(requestHeader.getFlag());         // 设置属性         Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());         MessageAccessor.setProperties(msgInner, origProps);         // 设置发送消息时间         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());         // 设置发送消息的主机地址         msgInner.setBornHost(ctx.channel().remoteAddress());         // 设置存储消息的主机地址         msgInner.setStoreHost(this.getStoreHost());         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());         String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();         // 属性中添加集群名称         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);         // 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK         if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {             String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);             // 设置消息属性             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));             origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);         } else {             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));         }         CompletableFuture putMessageResult = null;         String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);         // 是否使用事务         if (transFlag != null && Boolean.parseBoolean(transFlag)) {             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                 response.setCode(ResponseCode.NO_PERMISSION);                 response.setRemark(                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                 + "] sending transaction message is forbidden");                 return CompletableFuture.completedFuture(response);             }             // 事务处理             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);         } else {             // 消息写入             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);         }         // 返回消息持久化结果         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);     } }
  MessageStore  是一个接口,在BrokerController  的初始化方法中可以看到,具体使用的是DefaultMessageStore  :public class BrokerController {     private MessageStore messageStore;     public boolean initialize() throws CloneNotSupportedException {         boolean result = this.topicConfigManager.load();         // ...         if (result) {             try {                 // 创建DefaultMessageStore                 this.messageStore =                     new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,                         this.brokerConfig);                 // ...             } catch (IOException e) {                 result = false;                 log.error("Failed to initialize", e);             }     }                // 获取MessageStore     public MessageStore getMessageStore() {         return messageStore;     } } 消息存储
  DefaultMessageStore  中有一个CommitLog  类型的成员变量,在DefaultMessageStore  中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog  ,DLedgerCommitLog  是CommitLog  的子类,如果未启用Dleger,就使用CommitLog  自己(接下来会以CommitLog  为例)。
  在 DefaultMessageStore  的asyncPutMessage  方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog  的asyncPutMessage  进行消息写入:public class DefaultMessageStore implements MessageStore {       private final CommitLog commitLog; // CommitLog       public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {         // ...         // 如果启用了Dleger         if (messageStoreConfig.isEnableDLegerCommitLog()) {             // 使用DLedgerCommitLog             this.commitLog = new DLedgerCommitLog(this);         } else {             // 否则使用CommitLog             this.commitLog = new CommitLog(this);         }         // ...     }          @Override     public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) {         // 校验存储状态         PutMessageStatus checkStoreStatus = this.checkStoreStatus();         if (checkStoreStatus != PutMessageStatus.PUT_OK) {             return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));         }         // 校验消息合法性         PutMessageStatus msgCheckStatus = this.checkMessage(msg);         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {             return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));         }         // 进行一系列校验         PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);         if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {             return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));         }         long beginTime = this.getSystemClock().now();         // 调用CommitLog的asyncPutMessage方法写入消息         CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg);         putResultFuture.thenAccept((result) -> {             long elapsedTime = this.getSystemClock().now() - beginTime;             if (elapsedTime > 500) {                 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);             }             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);              if (null == result || !result.isOk()) {                 this.storeStatsService.getPutMessageFailedTimes().add(1);             }         });          return putResultFuture;     }      } 合法性校验Broker存储检查
  checkStoreStatus  主要对Broker是否可以写入消息进行检查,包含以下几个方面:MessageStore  是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储Broker是否是从节点,从节点只能读不能写 Broker是否有写权限,如果没有写入权限,不能进行写入操作 操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作    private PutMessageStatus checkStoreStatus() {         // 是否处于停止状态         if (this.shutdown) {             log.warn("message store has shutdown, so putMessage is forbidden");             return PutMessageStatus.SERVICE_NOT_AVAILABLE;         }         // 是否SLAVE角色         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {             long value = this.printTimes.getAndIncrement();             if ((value % 50000) == 0) {                 log.warn("broke role is slave, so putMessage is forbidden");             }             return PutMessageStatus.SERVICE_NOT_AVAILABLE;         }         // 是否可写         if (!this.runningFlags.isWriteable()) {             long value = this.printTimes.getAndIncrement();             if ((value % 50000) == 0) {                 log.warn("the message store is not writable. It may be caused by one of the following reasons: " +                     "the broker"s disk is full, write to logic queue error, write to index file error, etc");             }             return PutMessageStatus.SERVICE_NOT_AVAILABLE;         } else {             this.printTimes.set(0);         }         // 操作系统是否处于PAGECACHE繁忙状态         if (this.isOSPageCacheBusy()) {             return PutMessageStatus.OS_PAGECACHE_BUSY;         }         return PutMessageStatus.PUT_OK;     } 消息长度检查
  checkMessage  方法主要是对主题的长度校验和消息属性的长度校验:  private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {         // 如果主题的长度大于最大值         if (msg.getTopic().length() > Byte.MAX_VALUE) {             log.warn("putMessage message topic length too long " + msg.getTopic().length());             return PutMessageStatus.MESSAGE_ILLEGAL;         }         // 如果消息属性长度大于最大值         if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {             log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());             return PutMessageStatus.MESSAGE_ILLEGAL;         }         return PutMessageStatus.PUT_OK;     } checkLmqMessage
  checkLmqMessage  主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量:  private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {         // 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量         if (msg.getProperties() != null             && StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))             && this.isLmqConsumeQueueNumExceeded()) {             return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;         }         return PutMessageStatus.PUT_OK;    }     private boolean isLmqConsumeQueueNumExceeded() {         // 开启了LMQ && 开启了多个队列分发 && 消费数量大于了限定值         if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch()             && this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {             return true;         }         return false;     } 消息写入
  对消息进行校验完毕之后,调用了 CommitLog  的asyncPutMessage  进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:首先对消息的相关属性进行了设置,主要包括以下内容 存储时间 消息内容的CRC校验和 如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识 获取当前线程绑定的PutMessageThreadLocal对象,里面有一个 MessageExtEncoder  类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner  的setEncodedBuff  方法将buffer设置到encodedBuff  中加锁,从 mappedFileQueue  中获取上一次使用的映射文件mappedFile  ,并更新消息的存储时间, 如果mappedFile  为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1G mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录 调用mappedFile  的appendMessage  方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback  类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中调用submitFlushRequest  方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘public class CommitLog {     // 所有mappedFile集合     protected final MappedFileQueue mappedFileQueue;          // ThreadLocal     private final ThreadLocal putMessageThreadLocal;     // 写入消息的回调函数     private final AppendMessageCallback appendMessageCallback;     public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数         //...         // 创建回调函数         this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());         //...     }        public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {         // 设置存储时间         msg.setStoreTimestamp(System.currentTimeMillis());         // 设置消息的CRC值         msg.setBodyCRC(UtilAll.crc32(msg.getBody()));         // 写入结果         AppendMessageResult result = null;         // 获取存储统计服务         StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();         // 获取主题         String topic = msg.getTopic();         // 获取事务类型         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {             // 省略事务相关处理         }         // 获取发送消息的主机地址         InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();         if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6             msg.setBornHostV6Flag(); // 设置IPV6标识         }         // 获取存储消息的主机地址         InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();         if (storeSocketAddress.getAddress() instanceof Inet6Address) {             msg.setStoreHostAddressV6Flag(); // 设置IPV6标识         }         // 获取当前线程绑定的PutMessageThreadLocal对象         PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();         // 调用encode方法对消息进行编码,并写入buffer         PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);         if (encodeResult != null) {             return CompletableFuture.completedFuture(encodeResult);         }         // 将存储编码消息的buffer设置到msg中         msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);         // 创建PutMessageContext         PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));         long elapsedTimeInLock = 0;         MappedFile unlockMappedFile = null;         // 加锁         putMessageLock.lock();          try {             // 获取上一次写入的文件             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();             // 获取系统时间戳             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();             this.beginTimeInLock = beginLockTimestamp;             // 再次更新存储时间戳,保证全局顺序             msg.setStoreTimestamp(beginLockTimestamp);             // 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件             if (null == mappedFile || mappedFile.isFull()) {                 // 使用偏移量0创建一个新的文件                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);             }             // 如果依旧为空             if (null == mappedFile) {                 // 提示错误                 log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));             }             // 写入消息             result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);             // ...              elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;         } finally {             beginTimeInLock = 0;             putMessageLock.unlock();         }         // ...         PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);          // 统计相关         storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);         storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());         // 执行刷盘         CompletableFuture flushResultFuture = submitFlushRequest(result, msg);         CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg);         return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {             if (flushStatus != PutMessageStatus.PUT_OK) {                 putMessageResult.setPutMessageStatus(flushStatus);             }             if (replicaStatus != PutMessageStatus.PUT_OK) {                 putMessageResult.setPutMessageStatus(replicaStatus);             }             // 返回结果             return putMessageResult;         });     } } 写入内存Buffer编码消息
  MessageExtEncoder  是CommitLog  的一个内部类,它被CommitLog  的另外一个内部类PutMessageThreadLocal  所引用,ThreadLocal  一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:public class CommitLog {          // ThreadLocal     private final ThreadLocal putMessageThreadLocal;          // 添加消息的ThreadLocal对象     static class PutMessageThreadLocal {         private MessageExtEncoder encoder; // 引用MessageExtEncoder         private StringBuilder keyBuilder;         PutMessageThreadLocal(int size) {             // 创建MessageExtEncoder,size用来指定分配内存的大小             encoder = new MessageExtEncoder(size);             keyBuilder = new StringBuilder();         }         // ...     } }
  MessageExtEncoder  中使用了ByteBuffer  作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal  的构造函数中指定的,MessageExtEncoder的  encode方法中对消息进了编码并将数据写入分配的缓冲区:对消息属性数据的长度进行校验判断是否超过限定值 对总消息内容长度进行校验,判断是否超过最大的长度限制 根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域 将消息相关信息写入buffer: 写入消息长度写入魔数写入消息体CRC校验和写入队列ID写入标识队列的偏移量, 需要注意这里还没达到偏移量的值,先占位稍后写入文件的物理偏移量, 先占位稍后写入写入系统标识写入发送消息的时间戳写入发送消息的主机地址写入存储时间戳写入存储消息的主机地址RECONSUMETIMESPrepared Transaction Offset写入消息体长度和消息内容写入主题长度写入主题写入属性长度和属性内容 public class CommitLog {          // MessageExtEncoder     public static class MessageExtEncoder {         // 字节缓冲区,存储消息内容的buffer         private final ByteBuffer encoderBuffer;                  MessageExtEncoder(final int size) {             // 分配内存             this.encoderBuffer = ByteBuffer.allocateDirect(size);             this.maxMessageSize = size;         }         // 对消息进行编码并写入buffer         protected PutMessageResult encode(MessageExtBrokerInner msgInner) {              // 消息属性数据             final byte[] propertiesData =                     msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);             // 属性数据长度             final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;             // 校验长度是否超过最大值             if (propertiesLength > Short.MAX_VALUE) {                 log.warn("putMessage message properties length too long. length={}", propertiesData.length);                 return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);             }             // 获取主题数据             final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);             final int topicLength = topicData.length;// 主题数据长度             // 获取消息体内容长度             final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;             // 总消息内容长度             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);              // 是否超过最大长度限制             if (msgLen > this.maxMessageSize) {                 CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength                         + ", maxMessageSize: " + this.maxMessageSize);                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);             }              // 初始化             this.resetByteBuffer(encoderBuffer, msgLen);             // 1 写入消息长度             this.encoderBuffer.putInt(msgLen);             // 2 写入魔数             this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);             // 3 写入消息体CRC校验和             this.encoderBuffer.putInt(msgInner.getBodyCRC());             // 4 写入队列ID             this.encoderBuffer.putInt(msgInner.getQueueId());             // 5 写入标识             this.encoderBuffer.putInt(msgInner.getFlag());             // 6 队列的偏移量, 稍后写入             this.encoderBuffer.putLong(0);             // 7 文件的物理偏移量, 稍后写入             this.encoderBuffer.putLong(0);             // 8 写入系统标识             this.encoderBuffer.putInt(msgInner.getSysFlag());             // 9 写入发送消息的时间戳             this.encoderBuffer.putLong(msgInner.getBornTimestamp());             // 10 写入发送消息的主机地址             socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);             // 11 写入存储时间戳             this.encoderBuffer.putLong(msgInner.getStoreTimestamp());             // 12 写入存储消息的主机地址             socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);             // 13 RECONSUMETIMES             this.encoderBuffer.putInt(msgInner.getReconsumeTimes());             // 14 Prepared Transaction Offset             this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());             // 15 写入消息体长度             this.encoderBuffer.putInt(bodyLength);             if (bodyLength > 0)                 this.encoderBuffer.put(msgInner.getBody());// 写入消息内容             // 16 写入主题长度             this.encoderBuffer.put((byte) topicLength);             // 写入主题             this.encoderBuffer.put(topicData);             // 17 写入属性长度             this.encoderBuffer.putShort((short) propertiesLength);             if (propertiesLength > 0)                 this.encoderBuffer.put(propertiesData); // 写入属性数据             encoderBuffer.flip();             return null;         }     } } 写入内存映射文件
  前面提到 MappedFile  可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:
  ByteBuffer :字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。
  MappedByteBuffer :是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。
  MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:
  第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。
  第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。
  综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。 public class MappedFile extends ReferenceResource {     // 记录文件的写入位置     protected final AtomicInteger wrotePosition = new AtomicInteger(0);     // 文件大小     protected int fileSize;     // 字节buffer     protected ByteBuffer writeBuffer = null;     // 文件映射     private MappedByteBuffer mappedByteBuffer;     // 暂存池,类似线程池,只不过池中存放的是申请的内存     protected TransientStorePool transientStorePool = null;     // 初始化     public void init(final String fileName, final int fileSize,         final TransientStorePool transientStorePool) throws IOException {         init(fileName, fileSize);         // 从暂存池中获取一块内存         this.writeBuffer = transientStorePool.borrowBuffer();         this.transientStorePool = transientStorePool;     }          // 初始化     private void init(final String fileName, final int fileSize) throws IOException {         // ...         try {             // 获取文件             this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();             // 进行文件映射             this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);             TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);             TOTAL_MAPPED_FILES.incrementAndGet();             ok = true;         } catch (FileNotFoundException e) {             // ...         } catch (IOException e) {             // ...         } finally {             if (!ok && this.fileChannel != null) {                 this.fileChannel.close();             }         }     } }  // 暂存池 public class TransientStorePool {     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);      private final int poolSize; // 暂存池大小     private final int fileSize; // 申请的每一块内存大小     private final Deque availableBuffers; // 双端队列,存放申请的内存     private final MessageStoreConfig storeConfig; // 存储配置      public TransientStorePool(final MessageStoreConfig storeConfig) {         this.storeConfig = storeConfig;         this.poolSize = storeConfig.getTransientStorePoolSize();         this.fileSize = storeConfig.getMappedFileSizeCommitLog();         this.availableBuffers = new ConcurrentLinkedDeque<>();     }      /**      * 初始化      */     public void init() {         // 根据暂存池大小申请内存         for (int i = 0; i < poolSize; i++) {             // 申请直接内存             ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);             final long address = ((DirectBuffer) byteBuffer).address();             Pointer pointer = new Pointer(address);             LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));             // 放入到暂存池中             availableBuffers.offer(byteBuffer);         }     } }
  经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的 appendMessagesInner  方法将内存中的内容写入映射文件,处理逻辑如下:MappedFile  中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的 slice  方法创建一个与MappedFile  共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中调用回调函数的doAppend方法进行写入,前面可知回调函数是 DefaultAppendMessageCallback  类型的更新 MappedFile  写入位置,返回写入结果public class MappedFile extends ReferenceResource {     // 记录文件的写入位置     protected final AtomicInteger wrotePosition = new AtomicInteger(0);     // 文件大小     protected int fileSize;     // 字节buffer     protected ByteBuffer writeBuffer = null;     // 文件映射     private MappedByteBuffer mappedByteBuffer;     // 写入消息     public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,             PutMessageContext putMessageContext) {         // 调用appendMessagesInner         return appendMessagesInner(msg, cb, putMessageContext);     }          public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,             PutMessageContext putMessageContext) {         assert messageExt != null;         assert cb != null;         // 获取写入位置         int currentPos = this.wrotePosition.get();         // 如果写指针小于文件大小         if (currentPos < this.fileSize) {             // 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer             ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();             // 设置共享内存区的写入位置             byteBuffer.position(currentPos);             AppendMessageResult result;             if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理                 // 通过共享内存区byteBuffer写入数据                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,                         (MessageExtBrokerInner) messageExt, putMessageContext);             } else if (messageExt instanceof MessageExtBatch) { // 批量消息                 // 通过共享内存区byteBuffer写入数据                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,                         (MessageExtBatch) messageExt, putMessageContext);             } else {                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);             }             // 更新MappedFile的写入位置             this.wrotePosition.addAndGet(result.getWroteBytes());             this.storeTimestamp = result.getStoreTimestamp();             return result;         }         log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);     } }
  进入到 DefaultAppendMessageCallback  的doAppend  方法中,首先来看方法的入参:fileFromOffset:文件的起始位置偏移量 byteBuffer:缓冲区,也就是上一步中创建的共享内存区 maxBlank:上一步中可知传入的是文件总大小减去当前要写入的位置,也就是文件剩余空间大小 msgInner:消息内容的封装体 putMessageContext:消息写入上下文
  方法的处理逻辑如下: 计算文件要写入位置偏移量:文件起始位置偏移量 + 准备写入位置的偏移量 从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中 从msgInner中获取之前已经写入到内存的消息数据 preEncodeBuffer  ,并获取消息内容的长度校验是否有足够的空间写入数据,如果消息长度 +  END_FILE_MIN_BLANK_LENGTH  (预留空间大小) 大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为END_OF_FILE  (超过文件大小)。这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。 计算队列偏移量在preEncodeBuffer  中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,**此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。 **
  消息写入结果 PUT_OK:写入成功; END_OF_FILE:超过文件大小; MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度: PROPERTIES_SIZE_EXCEEDED:消息、属性超过最大允许长度; UNKNOWN_ERROR:未知异常; public class CommitLog {     class DefaultAppendMessageCallback implements AppendMessageCallback {         // 预留空间大小,8个字节         private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,             final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {             // 计算写入位置物理偏移量:文件起始位置 + 准备写入位置的偏移量             long wroteOffset = fileFromOffset + byteBuffer.position();              Supplier msgIdSupplier = () -> {                 int sysflag = msgInner.getSysFlag();                 int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;                 ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);                 MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);                 msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer                 msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);                 return UtilAll.bytes2string(msgIdBuffer.array());             };              // 获取消息队列信息             String key = putMessageContext.getTopicQueueTableKey();             // 从主题队列路由表中获取队列偏移量             Long queueOffset = CommitLog.this.topicQueueTable.get(key);             // 如果偏移量为空             if (null == queueOffset) {                 queueOffset = 0L; // 初始化为0                 // 添加到路由表中                 CommitLog.this.topicQueueTable.put(key, queueOffset);             }              boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);             if (!multiDispatchWrapResult) {                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);             }              // 如果开启事务需要特殊处理             final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());             // ...             // 获取之前已经写入到buffer的消息数据             ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();             // 获取数据长度             final int msgLen = preEncodeBuffer.getInt(0);              // 校验是否有足够的空间写入数据,如果消息长度 + 预留空间大小 大于最大值             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {                 this.msgStoreItemMemory.clear();                 // 1 设置文件大小                 this.msgStoreItemMemory.putInt(maxBlank);                 // 2 写入魔数                 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);                 // 开始时间                 final long beginTimeMills = CommitLog.this.defaultMessageStore.now();                 // 将文件大小和魔数写入buffer                 byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);                 // 返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为END_OF_FILE                 return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,                         maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */                         msgIdSupplier, msgInner.getStoreTimestamp(),                         queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);             }             // 计算队列偏移量的位置             int pos = 4 + 4 + 4 + 4 + 4;             // 6 写入队列偏移量             preEncodeBuffer.putLong(pos, queueOffset);             pos += 8;             // 7 写入物理偏移量             preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());             int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;             // 8 系统标识, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP             pos += 8 + 4 + 8 + ipLen; // 计算存储时间戳的写入位置             // 更新新存储时间戳             preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());               final long beginTimeMills = CommitLog.this.defaultMessageStore.now();             // 将preEncodeBuffer的数据写入byteBuffer             byteBuffer.put(preEncodeBuffer);             // 清空buffer             msgInner.setEncodedBuff(null);             // 设置返回结果             AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);              switch (tranType) {                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                     break;                 case MessageSysFlag.TRANSACTION_NOT_TYPE:                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:                     // The next update ConsumeQueue information                     CommitLog.this.topicQueueTable.put(key, ++queueOffset);                     CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);                     break;                 default:                     break;             }             return result;         }     } } 刷盘
  由于篇幅原因,刷盘机制将另写一篇文章。
  总结
  参考
  丁威、周继锋《RocketMQ技术内幕》
  https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md
  RocketMQ版本:4.9.3

千年等一回!11月19日,1000年来时间最长月偏食,北美出现速报,喜欢天文观测的小伙伴们注意啦!大家将在11。19号目睹一场罕见的月偏食这次月偏食是2021年的第二次月偏食同时还将是1000年来持续时间最长的月偏食在北美阿拉斯加澳大利亚东部11月19日傍晚月偏食极佳观测时间11月19日月偏食1400半影食始1518初亏1703食甚1847复圆2006半影食终。北京时间11月19日将发生月偏食(此次月偏食的沙罗周期为第126号),包括美洲北欧亚洲东部澳资深班主任提醒以下性格是孩子不成才的预兆,家长及时纠正文好孕姐都说性格决定命运,从成年人的角度来说,这倒是实话。就我的切身体会而言,身边太多的人因为性格问题在事业上停滞不前,有抓不住机遇的,有遇上挑战第一反应退缩的,最后的结局无一不是曾嘉一个站在杨幂身后的厉害角色文晚唐提到杨幂,可以说是无人不知,无人不晓。2015年,与尚世影业签署3。1亿对赌协议。2017年,仅仅3年时间,杨幂团队就完成了税后净利润3。1亿元。同时,团队核心成员与核心艺人没颜值没演技,还能当主角?娱乐圈关系户的下限到底在哪里?上篇众所周知,娱乐圈就是一个资本控制的圈子,在这个圈子里,资本就是后台,资源就是一切,就算是一个颜值演技顶级的明星,没有资本捧着,也火不了太久,所以娱乐圈中关系户,可以说是遍地都是。但刚释放了孟晚舟,加拿大就又开始对华找事!不惜毁掉国家的前程?随着中美博弈日益激烈,由美国主导的情报共享组织五眼联盟也异常活跃。除了分享很多见不得光的情报,白宫方面还不断裹挟其余四国,要求共同排挤打压包括华为在内的中国高科技企业。众所周知,不1488元起!就刚刚,华为一口气发布了六款新品时间来到了11月17日的晚上七点半,而华为全场景智慧生活新品发布会也如期与我们见面,而此次发布会华为为我们带来了多款产品,随我一起去看看吧!第一款产品,华为WatchGT3。此次华一如,曾经你口中那也许的也许我一直觉得我的文字有着无法言说的悲伤,就好像它那伤痕累累的精灵,一下子变得忧伤而美丽起来,惆怅而凄迷。我曾经看过它多么悲伤的样子,也许我的情感对于它来说也起着灭绝性的作用,一如那漆上海光源超导波荡器样机带束测试获得成功版权归原作者所有,如有侵权,请联系我们近日,上海光源自主研发的我国首台超导波荡器样机完成了储存环上的大流强带束测试,这表明我国已掌握超导波荡器研制的关键技术,并取得了重要的实质性进在航天领域中,核聚变担任什么角色?科幻网11月17日讯(朱曦薇)1995年12月7日,NASA的一个航天探测器进入木星的大气层,并立即开始燃烧。现在,在飞行了8000万英里之后,它准备对太阳系最大行星周围的厚厚的氢科学家发现暗物质的存在,实现量子力学的事实?天文学家阿尔伯特爱因斯坦宇宙存在早有发现的暗物质,天外星空,带来了量子力学的引力,科学家已经发现宇宙的暗物质,它们早已存在整个星系范围,探索知道人类会带来以往引力,相信科学家研究发
自己救赎自己的句子1。生活没这么复杂,你敢试,世界敢回答。2。生活就是在无光的日子里使劲熬,那些没有结果的事最终都要学会放弃。3。所有快乐都向你靠拢,所有好运都在路上。4。我们给了生活多少耕耘,生活三伏天将至,要为家人抓住养阳3宝,养好元气不受苦,别大意老话说冷在三九热在三伏,三伏天是一年中气温最高且又潮湿闷热的阶段,持续的时间比较长。入伏后空气湿度大,再加上持续的高温,人体异常的不适。动一动就会汗流浃背的,恨不得整天呆在空调房里哪些症状表明月子没坐好?建议收藏怀孕的时候妈妈们都会认为分娩是最可怕的事情但是宝宝出生之后还有很多难关在等着妈妈包括身体恢复给宝宝哺乳等等以下几种症状就比较常见,如果产后妈妈不幸遇到,一定不要自己傻傻忍着,会对身5百多度选择镜片,依路视A31。60和明月1。71哪个性价比高一些?没有绝对高性价比的镜片,适合自己的就是最好的。就算是超级牛的镜片,价格很高,也可以称之为不好。就算普通镜片,价格很友好,也可以称之为好,因为适合预算低的人群。这两者之间性价比肯定是一梦江湖关山绝美画作流出?神仙太太妙手绘出侠骨柔情自从一梦江湖全服大件事关山怒开启后,关山就凭借着帅气的外表和热血的剧情故事受到了很多少侠的喜欢,人气非常之高。近期,关山开放了转职系统,有不少的少侠已经正式加入到了关山门下,还有才我行天下游关山2019年9月,我曾和老公一起去辉县关山国家地质公园游玩,这次带母亲去关山,一是因为那里的游客少,二是因为景区不大,有山有水,能看到太行山的风景。8月1号上午,我开车进市区,先接上下馆子,少点这5道脏菜,厨师从不做给自己吃,但很多人爱吃下馆子,少点这5道脏菜,厨师从不做给自己吃,但很多人爱吃如今生活水平高了,很多人都喜欢去饭店吃饭,尤其是过年过节,不仅味道好,而且上档次,大家也吃得开心。虽然饭店的饭菜好吃,但并不iOS16更新,日历终于支持显示节假日是否休息了iOS16第七个开发者测试版迎来更新,此时距离上一个测试版发布隔了8天。而目前距离iPhone14系列新机发布已经不到半个月的时间了,预计在这段时间里,可能还会有一次iOS16开发金爵体育NBA资讯湖人交易威少的最后一步!美媒曝光3大潜在下家厄文(KyrieIrving)确定留在篮网,老控卫贝弗利(PatrickBeverley)交易来后,湖人送走威斯布鲁克(RussellWestbrook)的前景。NBA记者Eric2号碎片商城轮换,貂蝉限定返场,司空震哭了,备好5w电竞豆巨赚文丨可儿游戏说原创下个月2号王者荣耀会迎来一次更新,这次更新的同时,皮肤碎片商城也会跟着一起更新,这次皮肤碎片商城更新之后,将会上线戈娅的伴生皮肤,戈娅这个英雄的伴生皮肤确实很快就暗黑破坏神不朽,看看这两天刷的橙装,能有几件能用的两天刷的,纯手动刷的,共36件,666护身符分解眼花缭乱三个剪头的好少,还好,有一个。鉴定完了,就这几件,其他的不是提取就是分解。副本给的项链,终于刷到了。这个是精华给的两件抠鼻有