范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

探索一下DefaultMQPushConsumer的实现原理及源码分析

  RocketMQ的前提回顾RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力 为什么使用RocketMQ强调集群无单点,可扩展,任意一点高可用、水平可扩展 海量消息堆积能力,消息堆积后写入低延迟 支持上万个队列 消息失败重试机制 消息可查询 开源社区活跃 成熟度已经经过淘宝双十一的考验 RocketMQ的发展变化
  RocketMQ开源是使用文件作为持久化工具,阿里内部未开源的性能会更高,使用oceanBase作为持久化工具。
  在RocketMQ1.x和2.x使用zookeeper管理集群,3.x开始使用nameserver代替zk,更轻量级,此外RocketMQ的客户端拥有两种的操作方式:DefaultMQPushConsumer和DefaultMQPullConsumer。 DefaultMQPushConsumer的Maven配置
  HTML     org.apache.rocketmq    rocketmq-client    4.3.0  DefaultMQPushConsumer使用示例CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
  以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
  Go public class MQPushConsumer {     public static void main(String[] args) throws MQClientException {         String groupName = "rocketMqGroup1";         // 用于把多个Consumer组织到一起,提高并发处理能力         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);         // 设置nameServer地址,多个以;分隔         consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);         consumer.setMessageModel(MessageModel.BROADCASTING);         // 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息         consumer.subscribe("order-topic", "*");         consumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List mgs,                     ConsumeConcurrentlyContext consumeconcurrentlycontext) {                 System.out.println(Thread.currentThread().getName()+"Receive New Messages:"+mgs);                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });         consumer.start();     } } CLUSTERING:默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 订阅topic整体,从而达到负载均衡的目的 BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
  ConsumeConcurrentlyStatus.RECONSUME_LATER boker会根据设置的messageDelayLevel发起重试,默认16次。
  DefaultMQPushConsumerImpl中各个对象的主要功能如下:
  RebalancePushImpl:主要负责决定,当前的consumer应该从哪些Queue中消费消息; 1)PullAPIWrapper:长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑; 2)ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息; 3)OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式; 4)MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成;
  consumer.registerMessageListener执行过程:
  Go /**      * Register a callback to execute on message arrival for concurrent consuming.      * @param messageListener message handling callback.      */     @Override     public void registerMessageListener(MessageListenerConcurrently messageListener) {         this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);     }
  通过源码可以看出主要实现过程在DefaultMQPushConsumerImpl类中consumer.start后调用DefaultMQPushConsumerImpl的同步start方法
  Go public synchronized void start() throws MQClientException {         switch (this.serviceState) {             case CREATE_JUST:                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());                 this.serviceState = ServiceState.START_FAILED;                 this.checkConfig();                 this.copySubscription();                 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {                     this.defaultMQPushConsumer.changeInstanceNameToPID();                 }                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);                 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());                 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);                 this.pullAPIWrapper = new PullAPIWrapper(                     mQClientFactory,                     this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {                     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();                 } else {                     switch (this.defaultMQPushConsumer.getMessageModel()) {                         case BROADCASTING:                             this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());                             break;                         case CLUSTERING:                             this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());                             break;                         default:                             break;                     }                   this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);                 }                 this.offsetStore.load();                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {                     this.consumeOrderly = true;                     this.consumeMessageService =                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {                     this.consumeOrderly = false;                     this.consumeMessageService =                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());                 }                 this.consumeMessageService.start();                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);                 if (!registerOK) {                     this.serviceState = ServiceState.CREATE_JUST;                     this.consumeMessageService.shutdown();                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                         null);                 }                 mQClientFactory.start();                 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());                 this.serviceState = ServiceState.RUNNING;                 break;             case RUNNING:             case START_FAILED:             case SHUTDOWN_ALREADY:                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "                     + this.serviceState                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                     null);             default:                 break;         }         this.updateTopicSubscribeInfoWhenSubscriptionChanged();         this.mQClientFactory.checkClientInBroker();         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();         this.mQClientFactory.rebalanceImmediately();     }
  通过mQClientFactory.start();发我们发现他调用
  Go public void start() throws MQClientException {         synchronized (this) {             switch (this.serviceState) {                 case CREATE_JUST:                     this.serviceState = ServiceState.START_FAILED;                     // If not specified,looking address from name server                     if (null == this.clientConfig.getNamesrvAddr()) {                         this.mQClientAPIImpl.fetchNameServerAddr();                     }                     // Start request-response channel                     this.mQClientAPIImpl.start();                     // Start various schedule tasks                     this.startScheduledTask();                     // Start pull service                     this.pullMessageService.start();                     // Start rebalance service                     this.rebalanceService.start();                     // Start push service                   this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                     log.info("the client factory [{}] start OK", this.clientId);                     this.serviceState = ServiceState.RUNNING;                     break;                 case RUNNING:                     break;                 case SHUTDOWN_ALREADY:                     break;                 case START_FAILED:                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);                 default:                     break;             }         }     }
  在这个方法中有多个start,我们主要看pullMessageService.start();通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,下面我们来看下pullMessageService处理了哪些逻辑:
  Bash private void pullMessage(final PullRequest pullRequest) {         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());         if (consumer != null) {             DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;             impl.pullMessage(pullRequest);         } else {             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);         }     }
  我们发现其实他还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理. pullRequest拉取方式
  PullRequest这里说明一下,上面我们已经提了一下rocketmq的push模式其实是通过pull模式封装实现的,pullrequest这里是通过长轮询的方式达到push效果。
  长轮询方式既有pull的优点又有push模式的实时性有点。 push方式是server端接收到消息后,主动把消息推送给client端,实时性高。弊端是server端工作量大,影响性能,其次是client端处理能力不同且client端的状态不受server端的控制,如果client端不能及时处理消息容易导致消息堆积已经影响正常业务等。 pull方式是client循环从server端拉取消息,主动权在client端,自己处理完一个消息再去拉取下一个,缺点是循环的时间不好设定,时间太短容易忙等,浪费CPU资源,时间间隔太长client的处理能力会下降,有时候有些消息会处理不及时。 长轮询的方式可以结合两者优点检查PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法; 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳; 检查该Consumer是否运行中,即DefaultMQPushConsumerImpl.serviceState是否为RUNNING;若不是运行状态或者是暂停状态(DefaultMQPushConsumerImpl.pause=true),则调用PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延迟再拉取消息,其中timeDelay=3000;该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法; 进行流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值(DefaultMQPushConsumer.pullThresholdForQueue,默认值为1000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后重新该PullRequest请求放入PullMessageService.pullRequestQueue队列中;并跳出该方法; 若不是顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false),则检查ProcessQueue对象的msgTreeMap:TreeMap变量的第一个key值与最后一个key值之间的差额,该key值表示查询的队列偏移量queueoffset;若差额大于阈值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后重新将该PullRequest请求放入PullMessageService.pullRequestQueue队列中;并跳出该方法; 以PullRequest.messageQueue对象的topic值为参数从RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法,稍后重试;并跳出该方法; 若消息模型为集群模式(RebalanceImpl.messageModel等于CLUSTERING),则以PullRequest对象的MessageQueue变量值、type =READ_FROM_MEMORY(从内存中获取消费进度offset值)为参数调用DefaultMQPushConsumerImpl. offsetStore对象(初始化为RemoteBrokerOffsetStore对象)的readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度offset值。若该offset值大于0 则置临时变量commitOffsetEnable等于true否则为false;该offset值作为pullKernelImpl方法中的commitOffset参数,在Broker端拉取消息之后根据commitOffsetEnable参数值决定是否用该offset更新消息进度。该readOffset方法的逻辑是:以入参MessageQueue对象从RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap 变量中获取消费进度偏移量;若该偏移量不为null则返回该值,否则返回-1; 当每次拉取消息之后需要更新订阅关系(由DefaultMQPushConsumer. postSubscriptionWhenPull参数表示,默认为false)并且以topic值参数从RebalanceImpl.subscriptionInner获取的SubscriptionData对象的classFilterMode等于false(默认为false),则将sysFlag标记的第3个字节置为1,否则该字节置为0; 该sysFlag标记的第1个字节置为commitOffsetEnable的值;第2个字节(suspend标记)置为1;第4个字节置为classFilterMode的值; 初始化匿名内部类PullCallback,实现了onSucess/onException方法; 该方法只有在异步请求的情况下才会回调; 调用底层的拉取消息API接口: PullAPIWrapper.pullKernelImpl
  PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法进行消息拉取操作。
  将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。
  Go public void pullMessage(final PullRequest pullRequest) {         final ProcessQueue processQueue = pullRequest.getProcessQueue();         if (processQueue.isDropped()) {             log.info("the pull request[{}] is dropped.", pullRequest.toString());             return;         }         pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());         try {             this.makeSureStateOK();         } catch (MQClientException e) {             log.warn("pullMessage exception, consumer state not ok", e);             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);             return;         }         if (this.isPause()) {             log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);             return;         }         long cachedMessageCount = processQueue.getMsgCount().get();         long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);         if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);             if ((queueFlowControlTimes++ % 1000) == 0) {                 log.warn(                     "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                     this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);             }             return;         }         if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);             if ((queueFlowControlTimes++ % 1000) == 0) {                 log.warn(                     "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                     this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);             }             return;         }         if (!this.consumeOrderly) {             if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);                 if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {                     log.warn(                         "the queue"s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",                         processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),                         pullRequest, queueMaxSpanFlowControlTimes);                 }                 return;             }         } else {             if (processQueue.isLocked()) {                 if (!pullRequest.isLockedFirst()) {                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());                     boolean brokerBusy = offset < pullRequest.getNextOffset();                     log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",                         pullRequest, offset, brokerBusy);                     if (brokerBusy) {                         log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",                             pullRequest, offset);                     }                     pullRequest.setLockedFirst(true);                     pullRequest.setNextOffset(offset);                 }             } else {                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);                 log.info("pull message later because not locked in broker, {}", pullRequest);                 return;             }         }         final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());         if (null == subscriptionData) {             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);             log.warn("find the consumer"s subscription failed, {}", pullRequest);             return;         }         final long beginTimestamp = System.currentTimeMillis();         PullCallback pullCallback = new PullCallback() {             @Override             public void onSuccess(PullResult pullResult) {                 if (pullResult != null) {                     pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                         subscriptionData);                     switch (pullResult.getPullStatus()) {                         case FOUND:                             long prevRequestOffset = pullRequest.getNextOffset();                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());                             long pullRT = System.currentTimeMillis() - beginTimestamp;                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),                                 pullRequest.getMessageQueue().getTopic(), pullRT);                             long firstMsgOffset = Long.MAX_VALUE;                             if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                             } else {                                 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                                 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),                                     pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());                                 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());                                 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                     pullResult.getMsgFoundList(),                                     processQueue,                                     pullRequest.getMessageQueue(),                                     dispatchToConsume);                                 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                                     DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                                         DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());                                 } else {                                     DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                                 }                             }                             if (pullResult.getNextBeginOffset() < prevRequestOffset                                 || firstMsgOffset < prevRequestOffset) {                                 log.warn(                                     "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",                                     pullResult.getNextBeginOffset(),                                     firstMsgOffset,                                     prevRequestOffset);                             }                             break;                         case NO_NEW_MSG:                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());                             DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                             break;                         case NO_MATCHED_MSG:                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());                             DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                             break;                         case OFFSET_ILLEGAL:                             log.warn("the pull request offset illegal, {} {}",                                 pullRequest.toString(), pullResult.toString());                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());                             pullRequest.getProcessQueue().setDropped(true);                             DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {                                 @Override                                 public void run() {                                     try {                                         DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),                                             pullRequest.getNextOffset(), false);                                         DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                         DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());                                         log.warn("fix the pull request offset, {}", pullRequest);                                     } catch (Throwable e) {                                         log.error("executeTaskLater Exception", e);                                     }                                 }                             }, 10000);                             break;                         default:                             break;                     }                 }             }             @Override             public void onException(Throwable e) {                 if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                     log.warn("execute the pull request exception", e);                 }                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);             }         };         boolean commitOffsetEnable = false;         long commitOffsetValue = 0L;         if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {             commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);             if (commitOffsetValue > 0) {                 commitOffsetEnable = true;             }         }         String subExpression = null;         boolean classFilter = false;         SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());         if (sd != null) {             if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {                 subExpression = sd.getSubString();             }             classFilter = sd.isClassFilterMode();         }         int sysFlag = PullSysFlag.buildSysFlag(             commitOffsetEnable, // commitOffset             true, // suspend             subExpression != null, // subscription             classFilter // class filter         );         try {             // 下面我们看继续跟进这个方法,这个方法已经就是客户端如何拉取消息             this.pullAPIWrapper.pullKernelImpl(                 pullRequest.getMessageQueue(),                 subExpression,                 subscriptionData.getExpressionType(),                 subscriptionData.getSubVersion(),                 pullRequest.getNextOffset(),                 this.defaultMQPushConsumer.getPullBatchSize(),                 sysFlag,                 commitOffsetValue,                 BROKER_SUSPEND_MAX_TIME_MILLIS,                 CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,                 // 消息的通信方式为异步                 CommunicationMode.ASYNC,                 pullCallback             );         } catch (Exception e) {             log.error("pullKernelImpl exception", e);             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);         }     } 发送远程请求拉取消息
  在MQClientAPIImpl.pullMessage方法中,根据入参communicationMode的值分为异步拉取和同步拉取方式两种。
  无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个ResponseFuture对象,以请求消息的序列号为key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理: 发送失败后直接删掉responseTable变量中的相应记录; 收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从responseTable中查找ResponseFuture对象,并设置该对象的responseCommand变量。若是同步发送会唤醒等待响应的ResponseFuture.waitResponse方法;若是异步发送会调用ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理; 在NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔1秒定期扫描responseTable列表,遍历该列表中的ResponseFuture对象,检查等待响应是否超时,若超时,则调用ResponseFuture. executeInvokeCallback()方法,并将该对象从responseTable列表中删除;
  Go public PullResult pullMessage(         final String addr,         final PullMessageRequestHeader requestHeader,         final long timeoutMillis,         final CommunicationMode communicationMode,         final PullCallback pullCallback     ) throws RemotingException, MQBrokerException, InterruptedException {         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);         switch (communicationMode) {             case ONEWAY:                 assert false;                 return null;             case ASYNC:                 this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);                 return null;             case SYNC:                 return this.pullMessageSync(addr, request, timeoutMillis);             default:                 assert false;                 break;         }         return null;     } 同步拉取
  对于同步发送方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法,大致步骤如下: 调用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法: 获取Broker地址的Channel信息。根据broker地址从RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取ChannelWrapper对象并返回该对象的Channel变量;若没有ChannelWrapper对象则与broker地址建立新的连接并将连接信息存入channelTables变量中,便于下次使用; 若NettyRemotingClient.rpcHook:RPCHook变量不为空(该变量在应用层初始化DefaultMQPushConsumer或者DefaultMQPullConsumer对象传入该值),则调用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法; 调用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下: A)使用请求的序列号(opaue)、超时时间初始化ResponseFuture对象;并将该ResponseFuture对象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap变量中; B)调用Channel.writeAndFlush(Object msg)方法将请求对象RemotingCommand发送给Broker;然后调用addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了ChannelFutureListener接口的operationComplete方法,在发送完成之后回调该监听类的operationComplete方法,在该方法中,首先调用ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置ResponseFuture对象的sendRequestOK等于true并退出此回调方法等待响应结果;若不成功则置ResponseFuture对象的sendRequestOK等于false,然后从NettyRemotingAbstract.responseTable中删除此请求序列号(opaue)的记录,置ResponseFuture对象的responseCommand等于null,并唤醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待; C)调用ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息(详见5.10.3小节)或者超时的情况下会唤醒该方法返回ResponseFuture.responseCommand变量值; D)若上一步返回的responseCommand值为null,则抛出异常:若ResponseFuture.sendRequestOK为true,则抛出RemotingTimeoutException异常,否则抛出RemotingSendRequestException异常; E)若上一步返回的responseCommand值不为null,则返回responseCommand变量值; 若NettyRemotingClient.rpcHook: RPCHook变量不为空,则调用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法; 以上一步的返回值RemotingCommand对象为参数调用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成PullResultExt对象然后返回给调用者,响应消息的结果状态转换如下: 若RemotingCommand对象的Code等于SUCCESS,则PullResultExt.pullStatus=FOUND; 若RemotingCommand对象的Code等于PULL_NOT_FOUND,则PullResultExt.pullStatus= NO_NEW_MSG; 若RemotingCommand对象的Code等于PULL_RETRY_IMMEDIATELY,则PullResultExt.pullStatus= NO_MATCHED_MSG; 若RemotingCommand对象的Code等于PULL_OFFSET_MOVED,则PullResultExt.pullStatus= OFFSET_ILLEGAL;
  Go @Override     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {         long beginStartTime = System.currentTimeMillis();         final Channel channel = this.getAndCreateChannel(addr);         if (channel != null && channel.isActive()) {             try {                 if (this.rpcHook != null) {                     this.rpcHook.doBeforeRequest(addr, request);                 }                 long costTime = System.currentTimeMillis() - beginStartTime;                 if (timeoutMillis < costTime) {                     throw new RemotingTimeoutException("invokeSync call timeout");                 }                 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);                 if (this.rpcHook != null) {                     this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);                 }                 return response;             } catch (RemotingSendRequestException e) {                 log.warn("invokeSync: send request exception, so close the channel[{}]", addr);                 this.closeChannel(addr, channel);                 throw e;             } catch (RemotingTimeoutException e) {                 if (nettyClientConfig.isClientCloseSocketIfTimeout()) {                     this.closeChannel(addr, channel);                     log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);                 }                 log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);                 throw e;             }         } else {             this.closeChannel(addr, channel);             throw new RemotingConnectException(addr);         }     }
  getMQClientAPIImpl().pullMessage最终通过channel写入并刷新队列中。然后在消息服务端大体的处理逻辑是服务端收到新消息请求后,如果队列中没有消息不急于返回,通过一个循环状态,每次waitForRunning一段时间默认5秒,然后再check,如果broker一直没有新新消息,第三次check的时间等到时间超过SuspendMaxTimeMills就返回空,如果在等待过程中收到了新消息直接调用notifyMessageArriving函数返回请求结果。"长轮询"的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给 Consumer 。长轮询的主动权掌握在consumer中,即使broker有大量的消息堆积也不会主动推送给consumer。 RocketMQ的前提回顾 RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 为什么使用RocketMQ RocketMQ的发展变化 DefaultMQPushConsumer的Maven配置 DefaultMQPushConsumer使用示例 pullRequest拉取方式 长轮询的方式可以结合两者优点 PullAPIWrapper.pullKernelImpl 发送远程请求拉取消息 同步拉取
  原文链接:https://www.cnblogs.com/liboware/p/15596209.html

冷链还冷吗?冷链物流(ColdChainLogistics)一般指一些产品在其进行生产贮藏运输直至销售过程中,在各个环节温度一直保持在规定的范围之内,以防止产品变质,减少食品损耗的一项系统工程金石资源旗下翔振矿业通过高新技术企业认定根据全国高新技术企业认定管理工作领导小组办公室于2021年10月27日发布的关于内蒙古自治区2021年第一批备案高新技术企业名单的公告,金石资源集团股份有限公司控股子公司内蒙古翔振FPX止步十六强,除上单外,全场梦游,令网友愤怒在刚刚结FPX束的A组淘汰赛中,C9和RGE做梦都没有想到,他们竟然把FPX给淘汰了,原本两战队想的是FPX会小组第二出现,结果成为了他们两个来争夺最后一个A组出线的名额,FPX的河南卫视再次为世人证明,就算没有顶流明星我们也能成功出圈今年真的是被河南卫视成功圈粉了,作者身为河南人感到非常的骄傲,那么河南卫视又带给了大家什么样的惊喜呢?根据相关消息,河南举办的首个音乐会即将来袭,在此之前河南卫视就为我们呈现了很多林志炫表里不一,调侃队友,被爆破可能将退出披荆斩棘综艺节目披荆斩棘的哥哥真在热播中,但有网友在8月28日网上爆料称林志炫可能将退出综艺节目披荆斩棘的哥哥,引发众网友的热议,爆料者称,林志炫最近一直在对外说看不惯赵文卓,觉得他表里不超级自然语音技术PK真人真假难辨AI预测足球统统打脸图灵周报图灵周报精选AI行业一周大事件,从良莠不齐的行业资讯中挑选出最有价值的信息,配上专业点评,值得你细读品味。01小冰发布超级自然语音技术,并披露A轮融资完成7月12日,小冰公司发布全史上最严防沉迷正式启动,家长们高兴,学生党想哭今天就是9月1号了,这意味着什么呢?意味着以后再也没有小学生?还是说以后坑别人不能甩锅给小学生了?我为什么这么说呢!原因是就在今天未成年人网络游戏防沉迷规定正式实施!就在前几天未成炒冷饭关于SmartisanOS7。0,我有这些想说2019年10月31日,在时隔一年多之后,锤子科技发布了又一款新机坚果Pro3。这是锤子2019年发布的第一款新机,第一款旗舰也是第一部没有老罗的锤子手机。从图片上看,坚果Pro3最强国产系统鸿蒙系统终于来了!你会支持吗?!!!2019年8月9日,华为正式发布鸿蒙系统(HarmonyOS)!5000多中外天才,7年多的无盈利的投入,烧掉10亿多人民币,中国有几个企业有这魄力!!!关于鸿蒙体系,今天有个在场2021年全球及中国萤石行业全产业链发展预测及市场需求规模分析1产品主要用途萤石又称氟石,是氟化钙的结晶体和化学氟元素最主要的来源。作为现代工业的重要矿物原料,其主要应用于新能源新材料等战略性新兴产业及国防军事核工业等领域,也是传统的化工冶金中芯国际放弃美股回中国上市了!已能为华为量产14nm工艺先进芯片作为中国最大的芯片代工厂商,之前一直在美国和香港上市。不过去年5月份开始,中芯国际开始退出美国股市,申请国内上市,今天(19日),上海证交所发表公告,中芯国际集成电路制造有限公司科
C位鸿蒙今日发布,华为WATCH3MatePadPro抢先剧透将于6月2日举行的华为鸿蒙线上发布会是现下最受关注的科技盛事之一,通过早前的一些预热信息来看,华为除了会在发布会上带来鸿蒙系统外,还会带来一系列硬核新品,这其中包括新一代MateP富士康的郭台铭最恨的两个人究竟是谁富士康郭台铭最恨的两个人是谁呢?一个是他亲自带出来的女徒弟立讯精密的王来春,一个是曾经的小兄弟,比亚迪的王传福。众所周知,富士康近几年可谓是连年不顺,先是郭台铭在美国投资建厂失败,8分钟充满电?你没有看错!200W有线快充和120W无线快充来了国产手机在快充技术方面,只能用牛这个字来形容,这也是国产机碾压三星苹果等海外巨头的一大优势!就算是国产的千元机,都已经配备了快充技术。就拿Redmi刚刚发布的新机RedmiNote常山北明和润和软件,谁才是真正的鸿蒙龙头?常山北明和润和软件,谁才是真正的鸿蒙龙头?想搞清楚这个问题,我们先要回答下面三个问题,点个赞,往下看。第一,常山北明与华为的深度合作,可以追溯到2013年,同时2021年5月17号小米向印度捐款,竟遭到国内用户抵制?网友省省心吧最近有报道,小米向印度捐赠了3000万卢比,为印度各州的医院采购1000多台氧气浓缩器,还与Givelndia合作,为Covid19抗议人员筹集了1000万卢比。此事一出,小米又被最新!继国外厂商宣布涨价后,国内3大芯片厂宣布涨价,最高涨30继此前ST意法半导体TOSHIBA(东芝)ON(安森美)Maxim(美信)等国外厂商宣布涨价后,士兰微瑞纳捷智浦芯联等国内原厂也纷纷加入涨价队伍。士兰微宣布对LED照明驱动产品调价苹果BS资源机到底能不能买?5月以来,iPhone12系列国行富士康BS资源机大量到货,网上的平台都在推,众说纷纭,卖这种机的疯狂说好,不卖这种机的疯狂贬低。虽然毛哥也是个搞机带货的,看到这些同行的言论,有人玩出自主个性,理想ONE改装气动悬挂随着越来越多厂家宣布停止研发内燃机,不知不觉内燃机汽车已经逐渐走向消亡,哪怕再不情愿,这已经是事实,谁也阻挡不了时代的步伐。虽说纯电汽车还有很多地方需要完善,但不得不承认短期内的将6G时代来了?美国已领先?日韩砸重金追赶,我国在行动吗?随着5G时代的到来,各个强国的竞争日益激烈,我们作为国际上拥有话语权数一数二的大国,在5G方面交出的成绩无疑非常亮眼。根据国家权威机构的数据显示,截至目前,我国已经建成5G基站总计柳传志创业教父的机遇焦灼眼光成功与必然的落寞(一)中国IT企业收购国际巨头,举世皆惊!2004年12月8日,凌晨。位于北京海淀区创业路上的联想大厦三楼,60岁的柳传志在会议室里一夜未眠,因为联想集团收购IBM的PC业务签约仪iPhone13再传新消息,电池容量或将大增iPhone13系列电池容量泄漏2021年即将过半,关于苹果下一代新机(传言称为iPhone13)的消息层出不穷本篇文章为您详细介绍iPhone13据报道,iPhone13整体外观