范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQ源码分析之consumer消息拉取PullMessageProcessor

  #头条创作挑战赛#一、前言
  当consumer向broker发起RequestCode.PULL_MESSAGE消息拉取请求时,broker是怎么处理的呢?带着这个疑问,就开始分析消息拉取过程中broker端的处理流程;
  从BrokerController的registerProcessor方法注册的事件处理器以及我们之前分析的BrokerController核心组件来看,consumer消息拉取处理组件为:PullMessageProcessor;
  二、源码导读类继承关系及构造方法;处理请求方法;处理请求方法涉及到的核心值对象;从messageStore中拉取到具体的消息;
  注:处理请求方法中延迟执行拉取消息逻辑是通过拉取消息长轮询挂起服务 PullRequestHoldService进行实现的,这个会放在下一篇进行分析;三、源码分析1、类继承关系及构造方法// 拉取消息处理组件 public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);      private final BrokerController brokerController;     // 消费消息回调钩子     private List consumeMessageHookList;      public PullMessageProcessor(final BrokerController brokerController) {         this.brokerController = brokerController;     } }
  2、处理请求方法// netty网络服务器收到的拉取消息请求的处理组件 @Override public RemotingCommand processRequest(         final ChannelHandlerContext ctx,         RemotingCommand request) throws RemotingCommandException {     return this.processRequest(ctx.channel(), request, true); }
  RocketMQ作者将这个方法写的比较长,建议大家写的时候过长的方法还是拆分一下;处理请求方法里面最重要的逻辑就是找消息存储组件查询到这一次要拉取的消息org.apache.rocketmq.store.DefaultMessageStore#getMessage ;这个方法是MessageStore 消息存储组件中的,这个后续会进行详细分析,这一篇就只分析getMessage方法
  我们先以processRequest作为入口进行分析:Channel 结合NIO中的Channel概念,进行理解 RemotingCommand 传入的请求参数,这个在之前的分析中提到过 brokerAllowSuspend 是否允许被挂起,也就是是否允许在未找到消息的时候,暂时挂起处理线程,第一次传入的参数默认为true。 // 处理拉取消息请求 private RemotingCommand processRequest(         final Channel channel,         RemotingCommand request,         boolean brokerAllowSuspend) throws RemotingCommandException {     final long beginTimeMills = this.brokerController.getMessageStore().now();     RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);      final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();     final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(             PullMessageRequestHeader.class);      response.setOpaque(request.getOpaque());      log.debug("receive PullMessage request command, {}", request);      // 校验当前 broker 是否可读     if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {         response.setCode(ResponseCode.NO_PERMISSION);         response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));         return response;     }      // 查询或者自动创建 consumerGroup 对应的 subscriptionGroupConfig     // 保存subscriptionGroupConfig信息到UsersxxxstoreconfigsubscriptionGroup.json中     SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager()             .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());     if (null == subscriptionGroupConfig) {         response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);         response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));         return response;     }      if (!subscriptionGroupConfig.isConsumeEnable()) {         response.setCode(ResponseCode.NO_PERMISSION);         response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());         return response;     }      // 如果没有消息时,是否在 broker 端挂起等待,默认true     final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());     // 是否消息拉取时就提交 offset     final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());     // 请求头是否包含订阅信息     final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());     // 获取挂起超时     final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;      // 当前这个请求是要拉取哪个topic的数据     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(             requestHeader.getTopic());     if (null == topicConfig) {         log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));         response.setCode(ResponseCode.TOPIC_NOT_EXIST);         response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));         return response;     }      // 判断读的权限     if (!PermName.isReadable(topicConfig.getPerm())) {         response.setCode(ResponseCode.NO_PERMISSION);         response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");         return response;     }      // 判断 queueId 是否超过读取队列数     if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {         String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",             requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());         log.warn(errorInfo);         response.setCode(ResponseCode.SYSTEM_ERROR);         response.setRemark(errorInfo);         return response;     }      // 第一个是拿消费组对这个topic的订阅数据     SubscriptionData subscriptionData = null;     // 消费者过滤数据     ConsumerFilterData consumerFilterData = null;      // 如果说有一个订阅标识     if (hasSubscriptionFlag) {         try {             // 针对某个topic,去进行一定的订阅,支持某一种表达式类型             subscriptionData = FilterAPI.build(                     requestHeader.getTopic(),                     requestHeader.getSubscription(),                     requestHeader.getExpressionType()             );              // 如果说过滤表达式的类型不是tag标签             if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {                 // 构建针对topic,消费组做一个订阅,我的表达式类型,子版本号                 consumerFilterData = ConsumerFilterManager.build(                         requestHeader.getTopic(),                         requestHeader.getConsumerGroup(),                         requestHeader.getSubscription(),                         requestHeader.getExpressionType(),                         requestHeader.getSubVersion()                 );                 assert consumerFilterData != null;             }         } catch (Exception e) {             log.warn("Parse the consumer"s subscription[{}] failed, group: {}", requestHeader.getSubscription(),                 requestHeader.getConsumerGroup());             response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);             response.setRemark("parse the consumer"s subscription failed");             return response;         }     }     // 如果说没有订阅标识     else {         // 一开始你需要跟我建立一个连接,连接建立了以后消费组和消费者都会归consumer manager来管理         // 先获取到你的消费组的信息         ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(                 requestHeader.getConsumerGroup());         if (null == consumerGroupInfo) {             log.warn("the consumer"s group info not exist, group: {}", requestHeader.getConsumerGroup());             response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);             response.setRemark("the consumer"s group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));             return response;         }          if (!subscriptionGroupConfig.isConsumeBroadcastEnable()             && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {             response.setCode(ResponseCode.NO_PERMISSION);             response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");             return response;         }          // 从我的消费组订阅的topic里的订阅数据获取出来         subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());         if (null == subscriptionData) {             log.warn("the consumer"s subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());             response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);             response.setRemark("the consumer"s subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));             return response;         }          if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {             log.warn("The broker"s subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),                 subscriptionData.getSubString());             response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);             response.setRemark("the consumer"s subscription not latest");             return response;         }         if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {             consumerFilterData = this.brokerController.getConsumerFilterManager().get(                     requestHeader.getTopic(),                     requestHeader.getConsumerGroup()             );             if (consumerFilterData == null) {                 response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);                 response.setRemark("The broker"s consumer filter data is not exist!Your expression may be wrong!");                 return response;             }             if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {                 log.warn("The broker"s consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",                     requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());                 response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);                 response.setRemark("the consumer"s consumer filter data not latest");                 return response;             }         }     }     // 校验表达式类型     if (!ExpressionType.isTagType(subscriptionData.getExpressionType())         && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {         response.setCode(ResponseCode.SYSTEM_ERROR);         response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());         return response;     }      // 构建出来一个消息过滤器     MessageFilter messageFilter;     // 重试消息是否启用过滤     if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {         messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,             this.brokerController.getConsumerFilterManager());     } else {         messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,             this.brokerController.getConsumerFilterManager());     }      // 找消息存储组件查询到这一次我要拉取的消息     final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(             requestHeader.getConsumerGroup(),   // 消费组,谁             requestHeader.getTopic(),           // topic,对哪个topic             requestHeader.getQueueId(),         // queueId,对topic里的哪个queueId             requestHeader.getQueueOffset(),     // queue偏移量,从哪个queue的offset偏移量开始             requestHeader.getMaxMsgNums(),      // 最大的消息数量,拉取多少条数据             messageFilter                       // 消息过滤器,如何过滤数据     );      // 如果说拉取到了数据以后     if (getMessageResult != null) {         response.setRemark(getMessageResult.getStatus().name());         responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());         responseHeader.setMinOffset(getMessageResult.getMinOffset());         responseHeader.setMaxOffset(getMessageResult.getMaxOffset());          // 拉取消息是有一个读写分离的概念,正常情况下,我写入和拉取消息都是针对master节点来的         // 有可能master节点负载和压力很大,我就会建议你从slave节点来拉取         if (getMessageResult.isSuggestPullingFromSlave()) {             // 设置SuggestWhichBrokerId为1             responseHeader.setSuggestWhichBrokerId(                     subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()             );         } else {             responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);         }          switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {             case ASYNC_MASTER:             case SYNC_MASTER:                 break;             // 如果本机是slave角色             case SLAVE:                 // 并且不允许从slave获取数据                 if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {                     response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);                     responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);                 }                 break;         }         // 设置建议读取消息的节点         if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {             // consume too slow ,redirect to another machine             // 消费太慢,指向从节点             if (getMessageResult.isSuggestPullingFromSlave()) {                 responseHeader.setSuggestWhichBrokerId(                         subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()                 );             }             // consume ok             else {                 responseHeader.setSuggestWhichBrokerId(                         subscriptionGroupConfig.getBrokerId()                 );             }         } else {             responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);         }          switch (getMessageResult.getStatus()) {             case FOUND:                 response.setCode(ResponseCode.SUCCESS);                 break;             case MESSAGE_WAS_REMOVING:                 response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);                 break;             case NO_MATCHED_LOGIC_QUEUE:             case NO_MESSAGE_IN_QUEUE:                 if (0 != requestHeader.getQueueOffset()) {                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);                      // XXX: warn and notify me                     log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",                         requestHeader.getQueueOffset(),                         getMessageResult.getNextBeginOffset(),                         requestHeader.getTopic(),                         requestHeader.getQueueId(),                         requestHeader.getConsumerGroup()                     );                 } else {                     response.setCode(ResponseCode.PULL_NOT_FOUND);                 }                 break;             case NO_MATCHED_MESSAGE:                 response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);                 break;             case OFFSET_FOUND_NULL:                 response.setCode(ResponseCode.PULL_NOT_FOUND);                 break;             case OFFSET_OVERFLOW_BADLY:                 response.setCode(ResponseCode.PULL_OFFSET_MOVED);                 // XXX: warn and notify me                 log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",                     requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());                 break;             case OFFSET_OVERFLOW_ONE:                 response.setCode(ResponseCode.PULL_NOT_FOUND);                 break;             case OFFSET_TOO_SMALL:                 response.setCode(ResponseCode.PULL_OFFSET_MOVED);                 log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",                     requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),                     getMessageResult.getMinOffset(), channel.remoteAddress());                 break;             default:                 assert false;                 break;         }         // 执行消费消息的hook函数         if (this.hasConsumeMessageHook()) {             ConsumeMessageContext context = new ConsumeMessageContext();             context.setConsumerGroup(requestHeader.getConsumerGroup());             context.setTopic(requestHeader.getTopic());             context.setQueueId(requestHeader.getQueueId());              String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);              switch (response.getCode()) {                 case ResponseCode.SUCCESS:                     int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();                     int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;                      context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);                     context.setCommercialRcvTimes(incValue);                     context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());                     context.setCommercialOwner(owner);                      break;                 case ResponseCode.PULL_NOT_FOUND:                     if (!brokerAllowSuspend) {                          context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);                         context.setCommercialRcvTimes(1);                         context.setCommercialOwner(owner);                      }                     break;                 case ResponseCode.PULL_RETRY_IMMEDIATELY:                 case ResponseCode.PULL_OFFSET_MOVED:                     context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);                     context.setCommercialRcvTimes(1);                     context.setCommercialOwner(owner);                     break;                 default:                     assert false;                     break;             } 						           	// 执行消费消息回调钩子             this.executeConsumeMessageHookBefore(context);         }          switch (response.getCode()) {             // 请求成功             case ResponseCode.SUCCESS:                 // 数据统计                 this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),                     getMessageResult.getMessageCount());                  this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),                     getMessageResult.getBufferTotalSize());                  this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());                 // 从内存发送数据                 if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {                     final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());                     this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),                         requestHeader.getTopic(), requestHeader.getQueueId(),                         (int) (this.brokerController.getMessageStore().now() - beginTimeMills));                     response.setBody(r);                 }                 // 从磁盘发送数据                 else {                     try {                         FileRegion fileRegion =                             new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);                         channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {                             @Override                             public void operationComplete(ChannelFuture future) throws Exception {                                 getMessageResult.release();                                 if (!future.isSuccess()) {                                     log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());                                 }                             }                         });                     } catch (Throwable e) {                         log.error("transfer many message by pagecache exception", e);                         getMessageResult.release();                     }                      response = null;                 }                 break;             case ResponseCode.PULL_NOT_FOUND:                 // hasSuspendFlag, 构建消息拉取时的拉取标记,默认为true                 if (brokerAllowSuspend && hasSuspendFlag) {                     // 取自 DefaultMQPullConsumer 的 brokerSuspendMaxTimeMillis属性                     long pollingTimeMills = suspendTimeoutMillisLong;                     // 如果不支持长轮询,则忽略 brokerSuspendMaxTimeMillis 属性,                     // 使用 shortPollingTimeMills,默认为1000ms作为下一次拉取消息的等待时间                     if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {                         pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();                     }                      String topic = requestHeader.getTopic();                     long offset = requestHeader.getQueueOffset();                     int queueId = requestHeader.getQueueId();                     // 创建 PullRequest, 然后提交给 PullRequestHoldService 线程去调度,触发消息拉取                     PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,                         this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);                     // pullRequestHoldService 定时线程,最大延迟5秒判断是否有消息到达,然后执行消息的拉取                     this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);                     // 设置response=null,则此时此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态                     response = null;                     break;                 }              case ResponseCode.PULL_RETRY_IMMEDIATELY:                 break;             case ResponseCode.PULL_OFFSET_MOVED:                 if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE                     || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {                     MessageQueue mq = new MessageQueue();                     mq.setTopic(requestHeader.getTopic());                     mq.setQueueId(requestHeader.getQueueId());                     mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());                      OffsetMovedEvent event = new OffsetMovedEvent();                     event.setConsumerGroup(requestHeader.getConsumerGroup());                     event.setMessageQueue(mq);                     event.setOffsetRequest(requestHeader.getQueueOffset());                     event.setOffsetNew(getMessageResult.getNextBeginOffset());                     this.generateOffsetMovedEvent(event);                     log.warn(                         "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",                         requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),                         responseHeader.getSuggestWhichBrokerId());                 } else {                     responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());                     response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);                     log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",                         requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),                         responseHeader.getSuggestWhichBrokerId());                 }                  break;             default:                 assert false;         }     } else {         response.setCode(ResponseCode.SYSTEM_ERROR);         response.setRemark("store getMessage return null");     }      boolean storeOffsetEnable = brokerAllowSuspend;     storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;     storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;      // 基于broker来存储你最新提交的offset偏移量     if (storeOffsetEnable) {         // 提交offset,保存在内存中ConsumerOffsetManager.offsetTable中         this.brokerController.getConsumerOffsetManager().commitOffset(                 RemotingHelper.parseChannelRemoteAddr(channel),                 requestHeader.getConsumerGroup(),                 requestHeader.getTopic(),                 requestHeader.getQueueId(),                 requestHeader.getCommitOffset()         );     }      return response; }3、处理请求方法涉及到的核心值对象public class SubscriptionGroupConfig {      // 消费组名称     private String groupName;     // 是否启用消费     private boolean consumeEnable = true;     // 是否启用从最小偏移量开始消费     private boolean consumeFromMinEnable = true;     // 是否启用消费广播     private boolean consumeBroadcastEnable = true;     // 重试队列数量     private int retryQueueNums = 1;     // 重试最大次数     private int retryMaxTimes = 16;     // masterid     private long brokerId = MixAll.MASTER_ID;     // 慢消费的时候选用哪个broker     private long whichBrokerWhenConsumeSlowly = 1;     // 是否启用通知消费者ids变化     private boolean notifyConsumerIdsChangedEnable = true; }// 核心的topic元数据结构 public class TopicConfig {      private static final String SEPARATOR = " ";     public static int defaultReadQueueNums = 16; // 默认的topic是有16个read queue和write queue     public static int defaultWriteQueueNums = 16;      private String topicName; // 当前你的broker会告诉你的这个nameserver,我这里放了哪些topic的队列     private int readQueueNums = defaultReadQueueNums; // 我的这个broker机器对这个队列,放了多少个read queue和write queue     private int writeQueueNums = defaultWriteQueueNums;     private int perm = PermName.PERM_READ | PermName.PERM_WRITE;     private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG; // 默认的topic过滤类型是基于tag     private int topicSysFlag = 0;     private boolean order = false;      public TopicConfig() {     } }// 消费组信息 public class ConsumerGroupInfo {      private static final InternalLogger log = InternalLoggerFactory.getLogger(             LoggerName.BROKER_LOGGER_NAME);      // 消费组名称     private final String groupName;     // 消费组订阅数据,这个消费组订阅了哪些topic     private final ConcurrentMap subscriptionTable =         new ConcurrentHashMap();     // 消费组跟broker之间的各个网络连接     private final ConcurrentMap channelInfoTable =         new ConcurrentHashMap(16);     // 消费类型,pull模型还是push模型     private volatile ConsumeType consumeType;     // 消息模型,集群模式还是广播模式     private volatile MessageModel messageModel;     // 从哪里开始消费的策略     private volatile ConsumeFromWhere consumeFromWhere;     // 最近一次更新时间戳     private volatile long lastUpdateTimestamp = System.currentTimeMillis();      public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,         ConsumeFromWhere consumeFromWhere) {         this.groupName = groupName;         this.consumeType = consumeType;         this.messageModel = messageModel;         this.consumeFromWhere = consumeFromWhere;     } }public class SubscriptionData implements Comparable {     public final static String SUB_ALL = "*";      // 是否启用类过滤模式     private boolean classFilterMode = false;     // topic主题     private String topic;     // 子字符串     private String subString;     // tags     private Set tagsSet = new HashSet();     // code     private Set codeSet = new HashSet();     private long subVersion = System.currentTimeMillis();     private String expressionType = ExpressionType.TAG;      @JSONField(serialize = false)     private String filterClassSource;      public SubscriptionData() {      } }// 消费者客户端网络连接信息 public class ClientChannelInfo {      // 消费者客户端网络连接     private final Channel channel;     // 消费者客户端网络连接id     private final String clientId;     // 编程语言code     private final LanguageCode language;     // 版本号     private final int version;     // 最近一次更新时间戳     private volatile long lastUpdateTimestamp = System.currentTimeMillis();      public ClientChannelInfo(Channel channel) {         this(channel, null, null, 0);     }      public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {         this.channel = channel;         this.clientId = clientId;         this.language = language;         this.version = version;     } }4、从messageStore中拉取到具体的消息
  根据配置实例化messageFilter,并且从messageStore中拉取到具体的消息。可以看到这里就是方法的核心,接下来继续看getMessage是如何拉取到消息的 public GetMessageResult getMessage(         final String group,         final String topic,         final int queueId,         final long offset,         final int maxMsgNums,         final MessageFilter messageFilter) {     // 判断 store 是否关闭     if (this.shutdown) {         log.warn("message store has shutdown, so getMessage is forbidden");         return null;     }     // 判断当前运行状态是否可读     if (!this.runningFlags.isReadable()) {         log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());         return null;     }      if (MixAll.isLmq(topic) && this.isLmqConsumeQueueNumExceeded()) {         log.warn("message store is not available, broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num");         return null;     }      long beginTime = this.getSystemClock().now();      GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;     // 待查找队列的偏移量     long nextBeginOffset = offset;     // 当前队列的最小偏移量     long minOffset = 0;     // 当前队列的最大偏移量     long maxOffset = 0;      // lazy init when find msg.     GetMessageResult getResult = null;     // 当前commitLog的最大偏移量     final long maxOffsetPy = this.commitLog.getMaxOffset();      // 先获取到这个queueId对应的一个consumequeue     ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);     if (consumeQueue != null) {         // 获取 ConsumeQueue 的最小逻辑 offset         minOffset = consumeQueue.getMinOffsetInQueue();         // 最大逻辑offset         maxOffset = consumeQueue.getMaxOffsetInQueue();         // 消息队列无数据         if (maxOffset == 0) {             status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;             nextBeginOffset = nextOffsetCorrection(offset, 0);         }         // 查询的队列offset太小         else if (offset < minOffset) {             status = GetMessageStatus.OFFSET_TOO_SMALL;             nextBeginOffset = nextOffsetCorrection(offset, minOffset);         }         // 查询的offset溢出一个         else if (offset == maxOffset) {             status = GetMessageStatus.OFFSET_OVERFLOW_ONE;             nextBeginOffset = nextOffsetCorrection(offset, offset);         }         // 查询的队列offset过大         else if (offset > maxOffset) {             status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;             if (0 == minOffset) {                 nextBeginOffset = nextOffsetCorrection(offset, minOffset);             } else {                 nextBeginOffset = nextOffsetCorrection(offset, maxOffset);             }         } else {             // 根据指定的offset可以去查找出一段consumequeue里面的数据             SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);             // 用于检测bufferConsumeQueue中每个offset对应的物理偏移量的commitLog数据是否存在             if (bufferConsumeQueue != null) {                 try {                     // 预先设置状态未NO_MATCHED_MESSAGE                     status = GetMessageStatus.NO_MATCHED_MESSAGE;                      long nextPhyFileStartOffset = Long.MIN_VALUE;                     long maxPhyOffsetPulling = 0;                      int i = 0;                     // 最多需要校验的消息条数                     final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);                     // 是否记录消费落后磁盘量                     final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();                      getResult = new GetMessageResult(maxMsgNums);                      ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();                     for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {                         // 对这段consumequeue里面的数据,每一条数据都是一个消息offset+sizes+tags                         // 读取第1-8个字节为物理偏移量offsetPy                         long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();                         // 第9-12个字节为消息大小sizePy                         int sizePy = bufferConsumeQueue.getByteBuffer().getInt();                         // 第13-16个字节为tagsCode                         long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();                          maxPhyOffsetPulling = offsetPy;                         // 表示上一轮的消息,未在commitLog获取到                         if (nextPhyFileStartOffset != Long.MIN_VALUE) {                             // 如果下一个消息起始offset大于当前的要获取的offsetPy                             if (offsetPy < nextPhyFileStartOffset)                                 continue;                         }                         // 通过最大偏移量-当前数据偏移量和内存大小作比较                         // 判断数据是否可以从内存获取                         boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);                         // 检查拉取的消息总大小是否到达上限,如果达到则中止这次消息拉取                         if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {                             break;                         }                          boolean extRet = false, isTagsCodeLegal = true;                         // 判断tagsCode是否小于等于0                         if (consumeQueue.isExtAddr(tagsCode)) {                             // 读取下一条消息,保存到cqExtUnit中                             extRet = consumeQueue.getExt(tagsCode, cqExtUnit);                             if (extRet) {                                 tagsCode = cqExtUnit.getTagsCode();                             } else {                                 // can"t find ext content.Client will filter messages by tag also.                                 log.error("[BUG] can"t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",                                     tagsCode, offsetPy, sizePy, topic, group);                                 isTagsCodeLegal = false;                             }                         }                          // 消息过滤,如果匹配不成功并且消息为空,暂时设置状态为NO_MATCHED_MESSAGE                         // 匹配成功有以下几种情况                         // SubscriptionData对象为空;                         // SubscriptionData.classFilterMode变量为true;                         // SubscriptionData对象的subString变量等于*;                         // SubscriptionData对象的codeSet集合包含tagsCode值;                         if (messageFilter != null                             && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {                             if (getResult.getBufferTotalSize() == 0) {                                 status = GetMessageStatus.NO_MATCHED_MESSAGE;                             }                              continue;                         }                          // 根据consumequeue里指定的消息物理便偏移量,去commitlog查询一条消息                         SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);                         if (null == selectResult) {                             if (getResult.getBufferTotalSize() == 0) {                                 status = GetMessageStatus.MESSAGE_WAS_REMOVING;                             }                             // 未读取到消息,表示对应的mappedFile已经删除,从下一个文件的起始位置开始读取消息                             nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);                             continue;                         }                         // 过滤真实的消息                         if (messageFilter != null                             && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {                             if (getResult.getBufferTotalSize() == 0) {                                 status = GetMessageStatus.NO_MATCHED_MESSAGE;                             }                             // release...                             selectResult.release();                             continue;                         }                         // 获取消息计数                         this.storeStatsService.getGetMessageTransferedMsgCount().add(1);                         // 他会把这条消息数据添加到结果里面去                         getResult.addMessage(selectResult);                         status = GetMessageStatus.FOUND;                         nextPhyFileStartOffset = Long.MIN_VALUE;                     }                      if (diskFallRecorded) {                         // 消费落后的数据量 = 最大offset - 此次消费的最大offset                         long fallBehind = maxOffsetPy - maxPhyOffsetPulling;                         // 记录消费落后的数据量                         brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);                     }                     // 下一个开始的offset                     nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);                     long diff = maxOffsetPy - maxPhyOffsetPulling;                     // 内存大小                     long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE                         * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));                     // 当消费进度落后量大于物理内存时,建议调换到从库去处理读                     getResult.setSuggestPullingFromSlave(diff > memory);                 } finally {                      bufferConsumeQueue.release();                 }             } else {                 // 没找到消息                 status = GetMessageStatus.OFFSET_FOUND_NULL;                 // 设置从下一个mappedFile相同位置开始读取                 nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));                 log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "                     + maxOffset + ", but access logic queue failed.");             }         }     } else {         status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;         nextBeginOffset = nextOffsetCorrection(offset, 0);     }      // 统计拉取消息和未拉取到消息的次数     if (GetMessageStatus.FOUND == status) {         this.storeStatsService.getGetMessageTimesTotalFound().add(1);     } else {         this.storeStatsService.getGetMessageTimesTotalMiss().add(1);     }     long elapsedTime = this.getSystemClock().now() - beginTime;     this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);      // lazy init no data found.     if (getResult == null) {         getResult = new GetMessageResult(0);     }      getResult.setStatus(status);     getResult.setNextBeginOffset(nextBeginOffset);     getResult.setMaxOffset(maxOffset);     getResult.setMinOffset(minOffset);     return getResult; }四、总结前置判断;调用MessageStore的getMessage方法拉取消息;执行消费消息回调钩子;基于broker来存储你最新提交的offset偏移量;
  消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。
  注:下文会分析长轮询和短轮询;

约古宗列的两眼泉黄河笔记之一张中海一掬银盘似的泉水。大小好似月轮,深浅又像面盆,如果不仔细观察,几乎看不出它在涌动,只从泉眼周围草丛间溢出的水渍,才感觉到它的涌淌。确切地说,它的泉水不是涌出,而是溢出。这里是东西问短评中国为何设立文化和自然遗产日?(东西问)短评中国为何设立文化和自然遗产日?中新社北京6月11日电题中国为何设立文化和自然遗产日?作者徐雪莹6月11日是中国第六个文化和自然遗产日,今年主题为文物保护时代共进人民共这个国家首都,垃圾成山怎么回事?加德满都是尼泊尔的首都和最大城市,也是世界闻名的旅游胜地。不过如今,这座城市却面临垃圾堆积成山的困扰,居民的日常生活和身体健康受到严重影响。杜巴广场位于加德满都老城区,属于世界文化文化和自然遗产日青海大地历史遗珍星辉闪耀央广网西宁6月11日消息(记者汪晓青通讯员时奎元)在我国西北,青藏高原,在地域广袤的青海,这里有悠久的历史,文物类型多样,文化内涵丰富,资源得天独厚。这里坐拥不可移动文物6411处中国国家级非遗代表性项目达3600余,多如繁星的非遗项目你知道多少文旅融合之下,人们出游途中对于文化遗产的关注度越来越高。在6月11日中国迎来第17个文化和自然遗产日之际,多个旅游平台推出相关主题活动助力文化遗产推广。其中,同程旅行宣布推出文化遗河北省有多强?面积比山东河南大,人口7400多万,GDP超过4万亿我们祖国是一个幅员辽阔的国家,正是因为区域广阔,所以难免会出现发展不均衡的情况!例如有些城市的经济总量能抵得上西部一个省份!今天小编英格杰克给大家介绍一个很强的省,该省的土地面积比黄龙景区(九)山上的水泊,天上的镜子,一闪一烁,都来自于遥远的关怀。如果要问,遥远的是来自于天,那它就有云彩的影子如果又要问,遥远是来自于山峰,同样,它就有山顶上的风采。我终于可以临近到水池了,鹧鸪天。登铁寨鹧鸪天。登铁寨胜日寻芳金顶登,崎岖幽径走停停。古松虬曲入云际,怪石嶙峋作障屏。盘千载,裂几层。许之昔错补天庭。缓移脚步沿阶上,立在峰巅可摘星。鹧鸪天。铁寨林立奇形仙石家,千年阅尽世官宣!十大晋菜十大山西面食50种山西特色小吃你吃过几个?人世间,唯有爱与美食不可辜负!因为一道菜,爱上一座城。6月12日,由山西省商务厅联合山西省委宣传部山西省财政厅山西省文旅厅山西省农业农村厅山西省市场监管局主办的品鉴山西美食晋享山西走进户外!潭柘寺赵家台穿越清晨我们踏上征程,在我的好哥们零号的介绍指引下,我荣幸的空降到户外穿越的活动中这是一个团结温馨欢快和谐的团队,在领队一样哥的带领下有老孟零号风妹还有几个不知道姓名的朋友共同的鼓励和三华李上市啦!酸甜脆爽,一口一个够滋味时至仲夏经历了阳光的催长龙舟水的洗礼高明三华李上市啦街坊们可自驾采摘快跟随明仔去一探究竟志忠生态农庄在高明区更合镇蛟塘村志忠生态农庄三华李已经成熟可摘产量约1万斤放眼望去一颗颗饱满
利用人工智能改善心理健康未来治疗的新选择!心理健康问题越来越受到社会的关注,而人工智能技术的快速发展也为解决这个问题提供了新的途径。尽管聊天机器人如ChatGPT等人工智能平台可以提供一些心理健康支持,但它们绝非合格的医疗沙依巴克区将打造一处新能源汽车文旅园3月13日,在乌鲁木齐市2023年度招商引资大会上,沙依巴克区人民政府与乌鲁木齐众意盛达实业发展有限责任公司签约老满城新能源汽车文旅园项目,计划在老满城打造以新能源汽车交付中心为主数字经济板块技术分析特发信息,立昂技术,新国都中望软件一,特发信息概念算力信创公司立足于光通信主导产业,顺应行业趋势,逐渐发展形成线缆制造,光电制造,科技融合和智慧服务四个业务板块,紧扣产品服务的十四五规划新定位,围绕5G,大数据,云绝了!卡戴珊姐妹先后取出填充物,丰腴身姿不夸张更有韵味卡戴珊姐妹,算是把流量玩明白了。科勒卡戴珊与金卡戴珊,先后取出了她们的体内填充物,实现了身材大变样。众所周知,科勒卡戴珊与金卡戴珊的身材是5姐妹中最夸张的2个,两人都是沙漏型身材,到处都狂飙,一线大咖吴刚你可把俺带偏了!到处都狂飙,好评也如潮。俺的好奇心一下子被吊起来了!难道自己弃剧弃错了?一口气追到第七集,才知道自己真是有眼无珠啊!可也不能不怪罪大咖吴刚,是你可把俺带偏了!狂飙刚刚开播第一集的时吉娘娘心酸史18岁穿露胸装出场,哭着完成走秀,成就超级模特面对失业,一步45万,以巴西国宝的身份亮相里约奥运会。面对爱情,怒甩风流公子小李子,让其对她恋恋不忘。这就是来自于时尚界的传奇吉娘娘吉赛尔邦辰的压迫感,这位世界模特圈的顶级人物,巴陈国军婚内出轨刘晓庆,前妻和儿子陈赫的做法让他汗颜无地2015年,陈国军和前妻刘晓庆在节目明星同乐会上再次相遇。这是两人离婚25年后首次同框,也是他们撕破脸皮的第25年。曾记否,陈国军婚内出轨刘晓庆,抛下妻儿也要和刘晓庆在一起谁料刘晓被曝离婚的欧阳夏丹,初九与同学欢乐联谊,班里最大牌柴璐却缺席1月30号正月初九的傍晚,就在多数人穿梭在熙来攘往的车流人流中,奔向自己的幸福暖窝之际,欧阳夏丹也通过社交媒体,分享了她与传媒大学(她就读时该校叫北广)同学温暖而其乐融融的联谊之旅这个拥有高尚品格的女孩,最近红遍了朋友圈,人人都想认识她从今天起记录我的2023昨晚偶然间,听到了一段关于冯小刚与冯巩的故事。冯小刚在当导演之前,是一名美工师。在一次酒会上认识了冯巩,两人一见如故。冯小刚1990年生下了女儿,可女儿不幸日荷加入对华芯片限制短期形成冲击,中长期国产替代必将受益据彭博社报道,日本和荷兰将加入美国行列,建立共同联盟限制中国获得先进半导体设备。美日荷最快将于美国时间周五就新供应限制结束谈判,三国不打算公开管制内容,而是直接执行。去年10月美就ChatGPT未必能改写剽窃的定义,但肯定能推动高校考核方式变革刚刚读到新闻学生用ChatGPT拿下全班最高分,教授惊呆!全美高校打响AI反击战,这是我第二次读到关于ChatGPT的文章,之前只是一笑而过,这次认真了解了一下,一方面对这个新技术