范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

老弟问我,RocketMQ中的ProcessQueue怎么理解?

  今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。
  一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:
  1 ProcessQueue 构建
  RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下: //MQClientInstance.java public void start() throws MQClientException {  synchronized (this) {   switch (this.serviceState) {    case CREATE_JUST:     //...     // Start rebalance service     this.rebalanceService.start();    //...   }  } }
  这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:
  可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。
  下面看一下维护 processQueueTable 的代码: private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,  final boolean isOrder) {  boolean changed = false;   Iterator> it = this.processQueueTable.entrySet().iterator();  while (it.hasNext()) {   Entry next = it.next();   MessageQueue mq = next.getKey();   ProcessQueue pq = next.getValue();    if (mq.getTopic().equals(topic)) {    if (!mqSet.contains(mq)) {     //从processQueueTable上移除    } else if (pq.isPullExpired()) {     switch (this.consumeType()) {      case CONSUME_ACTIVELY://拉模式       break;      case CONSUME_PASSIVELY://推模式       //从processQueueTable上移除       break;      default:       break;     }    }   }  }     //创建ProcessQueue并放到processQueueTable  List pullRequestList = new ArrayList();  for (MessageQueue mq : mqSet) {   if (!this.processQueueTable.containsKey(mq)) {    //...    ProcessQueue pq = new ProcessQueue();     long nextOffset = -1L;    try {     nextOffset = this.computePullFromWhereWithException(mq);    } catch (Exception e) {     log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);     continue;    }     if (nextOffset >= 0) {     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);     if (pre != null) {      log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);     } else {         //封装好processQueueTable后再创建一个PullRequest进行消息拉取      log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);      PullRequest pullRequest = new PullRequest();      pullRequest.setConsumerGroup(consumerGroup);      pullRequest.setNextOffset(nextOffset);      pullRequest.setMessageQueue(mq);      pullRequest.setProcessQueue(pq);      pullRequestList.add(pullRequest);      changed = true;     }    } else {     log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);    }   }  }   this.dispatchPullRequest(pullRequestList);   return changed; } 2 拉取消息
  上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量: //PullMessageService.java private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();  public void executePullRequestImmediately(final PullRequest pullRequest) {  try {   this.pullRequestQueue.put(pullRequest);  } catch (InterruptedException e) {   log.error("executePullRequestImmediately pullRequestQueue.put", e);  } }
  这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理: 对拉取到的消息根据 TAG 再次 进行过滤; 更新 PullRequest 下次拉取的偏移量 nextOffset;
  把拉取的消息封装到 ProcessQueue 的 msgTreeMap( 放到 msgTreeMap 之前首先要获取到写锁 treeMapLock ); 封装 ConsumeRequest 进行消息消费; 封装消息拉取请求再次进行拉取。
  代码如下: //DefaultMQPushConsumerImpl.java public void onSuccess(PullResult pullResult) {  if (pullResult != null) {      //1. 对拉取到的消息根据 TAG 再次进行过滤   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,    subscriptionData);    switch (pullResult.getPullStatus()) {    case FOUND:     //2. 更新 PullRequest 下次拉取的偏移量 nextOffset     pullRequest.setNextOffset(pullResult.getNextBeginOffset());          if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);     } else {      //3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());      //4. 封装 ConsumeRequest 进行消息消费      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(       pullResult.getMsgFoundList(),       processQueue,       pullRequest.getMessageQueue(),       dispatchToConsume);                     //5. 封装消息拉取请求      if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {       DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());      } else {       DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);      }     }     break;    //...   }  } } 3 消费消息
  在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码: //ConsumeMessageConcurrentlyService.ConsumeRequest public void run() {  if (this.processQueue.isDropped()) {   log.info("the message queue not be able to consume, because it"s dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);   return;  }   MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;  ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);  ConsumeConcurrentlyStatus status = null;   try {   status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);  }//...   if (!processQueue.isDropped()) {   ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);  } }
  消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。
  消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除( 需要获取到写锁 treeMapLock ),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:public void processConsumeResult(  final ConsumeConcurrentlyStatus status,  final ConsumeConcurrentlyContext context,  final ConsumeRequest consumeRequest ) {  int ackIndex = context.getAckIndex();   switch (status) {   case CONSUME_SUCCESS:    if (ackIndex >= consumeRequest.getMsgs().size()) {     ackIndex = consumeRequest.getMsgs().size() - 1;    }    int ok = ackIndex + 1;    break;   //...  }  switch (this.defaultMQPushConsumer.getMessageModel()) {   case BROADCASTING:    //...    break;   case CLUSTERING:    List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {     MessageExt msg = consumeRequest.getMsgs().get(i);     //消费失败的,发送回Broker     boolean result = this.sendMessageBack(msg, context);     //...    }     break;   default:    break;  }     //从msgTreeMap中移除并返回msgTreeMap第一条消息的offset  long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());  if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {   this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);  } } 4 消费者限流4.1 缓存消息数量
  如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码: long cachedMessageCount = processQueue.getMsgCount().get(); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);  return; } 4.2 缓存的消息大小
  如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码: long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);  return; } 4.3 消息间隔
  对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码: if (!this.consumeOrderly) {  if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {   this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);   return;  } } 4.4 获取锁失败
  对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码: if (processQueue.isLocked()) {  //... } else {  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } 5 总结
  ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构:
  来源:https://mp.weixin.qq.com/s/zB7dM9xt26c6Z04PvYfOmQ

2023濮院时装周启幕,诠释针织即时尚新民晚报讯(记者唐闻宜)近日,2023濮院时装周在浙江嘉兴濮院时尚古镇拉开帷幕。今年,濮院时装周的主题是针织即时尚,旨在加速助力濮院时尚产业高质量发展进程,并联合国际色彩大会与世界张馨予晒田园美照,时尚短裙搭配雨靴好接地气,老公视角下笑容甜4月14日,张馨予更新动态,称自己的闺蜜让她多发一些生活照,嫌弃她太懒了。听完闺蜜的话后,张馨予就把生活照给安排上了,发文地址还是广东,是在自己的家里,说不定照片正是何捷拍摄的。老边境制作人EA限时优惠及外挂处理今日(4月13日),柳叶刀工作室公布边境制作人CTEA限时优惠打9折,我们与外挂不共戴天,同时公布外挂作弊举报渠道,边境中杯61。2元大杯88。2元特大杯142。2元,优惠持续一周暴雪台湾分公司购买大量服务器订单曝光,国服短期内重开更难了国服何时重开是所有暴雪游戏中国玩家最关心的事情,不过眼下正值微软收购动视暴雪最关键的时期,即使国服代理谈判取得了一定成果也不会在这个时间透露以免节外生枝,因此代表暴雪官方的微博账号教学,如何从模仿走向创新?校长会面对调皮捣蛋的孩子怎么办?其实,孩子顽皮淘气喜欢搞恶作剧的背后,依然有一颗渴望学习渴望长大,甚至渴望伟大的童心。把他们当作UFO一样来研究,你就会创造教育的奇迹。跟校长会(I如何有效预防并减少鼻出血您家孩子或者您儿时有没有鼻出血的经历呢?据国外一项研究统计,30的5岁以下儿童56的610岁儿童和64的1115岁儿童至少发生过1次鼻出血1,如此看来,儿童鼻出血是非常常见的现象了好评中国走进湖南丨湘江新区科技创新引领产业集群高质量发展潇湘之滨春潮涌,奋进扬帆正当时。4月1日好评中国走进湖南网络主题宣传活动报道团走进湘江新区,在国家网络安全产业园中联重科智慧产业城湘江智能网联产业园,感受科技创新引领产业集群高质量万安科技2022年净利润同比增长228。29拟10派0。8元中证智能财讯万安科技(002590)4月13日披露2022年年度报告。2022年,公司实现营业总收入33。64亿元,同比增长23。99归母净利润7163。75万元,同比增长228。ROG游戏手机7Pro评测用性能诠释信仰对于游戏手机厂商来说,新旗舰的打磨总是更需要一些时间的,毕竟如何进一步释放芯片的潜能总是一个困难的课题。在4月13日,ROG终于为我们带来了全新的腾讯ROG游戏手机7系列,展现了他换机预算3000,这四款是很不错的选择,从千元到旗舰,都是真性能如果您喜欢,可以点击上面的关注二字。后续会为您提供更多有价值的内容。今天分享换机预算3000,这四款是很不错的选择,从千元到旗舰,都是真性能第一款OPPOK10参考价格1699元(一加Ace2原神定制礼盒脚步声近了,专属18GB内存4月17日发布2023年,大家手机内存多大才够用呢?对于这个问题可能大家众说纷纭。但不可否认的是,更大的手机内存,确实能够为用户带来增益效果。一方面,大内存可以提高系统的运行流畅度,同时也能够后
搜狐网抄袭我的作品,头条却是审核未通过建立跨平台维权系统最近有点气人,我写了一篇文章松哥民宿事件的警示,民宿开业要有章程,并要整顿龙蛇混杂的民宿,我的是3月9日写的,并发表于头条是原创。这篇文章的展现量和阅读量都不行。就是今天下午,我突男人生娃不是梦?日本科学家让两只雄性老鼠产仔男人生娃雄性老鼠产仔日本科学家让两只雄性老鼠产仔你可能听说过克隆羊多莉,也可能听说过基因编辑婴儿,但你一定没想到,科学家竟然可以让两只雄性老鼠产仔!是的,你没有看错。这不是科幻小说绍兴印象我家的后面有一个很大的园,相传叫作百草园。现在是早已并屋子一起卖给朱文公的子孙了,连那最末次的相见也已经隔了七八年,其中似乎确凿只有一些野草但那时却是我的乐园。绍兴一直令我向往,只虚不受补怎么办?不如试试这道汤,健脾益气开胃提神今日推荐鸡枞菌响螺鸡汤材料鸡枞菌150克,响螺干34片,鸡半只,生姜数片,食盐适量。做法(1)用毛刷把鸡枞洗干净,并用手撕成小条响螺干用清水浸泡,切条。(2)鸡肉洗干净切块焯水,加1碗面粉,1个鸡蛋,油条最好配方,金黄酥脆个个空心,凉了也不硬家有面粉就能做,只需1碗面粉,1个鸡蛋,操作简单,在家也能轻松炸油条了,今天就同大家一起来分享一个油条最好的配方,操作简单,炸出来的油条,金黄酥酥个个空心,学会都可以去开店了,而且麻辣小龙虾,麻辣鲜香色泽红亮虾肉嫩滑可口,你们想吃吗?头条创作挑战赛麻辣小龙虾,麻辣鲜香色泽红亮虾肉滑嫩可口,你们想吃吗?小龙虾含蛋白质比较高,含有人体所需的8种氨基酸,其氨基酸的组成优于肉类。下面让我们一起看看麻辣小龙虾是怎么做的吧减肥必定要知道的菜谱!!1西兰花炒番茄做法很简单,但需要注意的是,最好先把西蓝花用清水冲洗干净后,切成小块,这样能使烹饪后的菜肴更鲜嫩多汁。2冬瓜排骨汤冬瓜是减肥食物中的佼佼者,它有很好的利尿作用,且营养为什么现在女孩子穿的越来越少了,其实她们是有想法的,你觉得呢不知道大家发现没有,现在的女孩子穿衣风格很多都是以少为美,穿的越少越好。即使在寒风刺骨的冬天,也依然能看到有些女孩子光腿穿着裙子行走在大街上,尽管冻得瑟瑟发抖,可还是选择风度不要温低成本除螨!皮肤瘙痒爱长痘的情况再也不见!牛奶好皮摸着很丝滑低成本除螨法!皮肤瘙痒爱长痘的情况再也不见!牛奶好皮摸着很丝滑!前段时间一直摸索着如何去除后背痘,就跟着博主学了个法子!也就坚持了一个多月,身上一直长痘出油的情况就缓解了。因为效果看一眼美女,长10秒钟寿命美女是一种令人心驰神往的存在。她们的美丽不仅令人惊叹,更是一种艺术,一种生命的灵魂。在这个时代,美女已经成为了一种文化现象,吸引着无数人的眼球。那么,什么才是真正的美女呢?首先,美口红测评推荐超级滋润显白显温柔,令唇部长时间保持柔软潤滑口红色号测评推荐超级滋润显白显温柔,令唇部长时间保持柔软潤滑!YSL纯色唇釉9番茄水红用了快半年要见底了,还是很喜欢。有次连夜坐飞机赶去北京参加活动,落地已经半夜一点多了。第二天参