范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

深入剖析RocketMQ源码负载均衡机制

  一、引言
  RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
  RocketMQ主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr负责存储元数据,各组件的主要功能如下:消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
  RocketMQ整体消息处理逻辑上以Topic维度进行生产消费、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。
  本篇文章分析的核心在于介绍RocketMQ的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。二、RocketMQ的整体架构
  (图片来自于Apache RocketMQ)
  RocketMQ架构上主要分为四部分,如上图所示:Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。NameServer:NameServer是一个非常简单的Topic路由注册中心,支持分布式集群方式部署,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。
  RocketMQ的Topic的物理分布如上图所示:Topic作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的Broker当中。
  Broker中的Queue是Topic对应消息的物理存储单元。
  在RocketMQ的整体设计理念当中,消息的生产消费以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创建对应的MessageQueue。
  producer生产消息的过程本质上就是选择Topic在Broker的所有的MessageQueue并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。
  consumer消费消息的过程本质上就是一个订阅同一个Topic的consumerGroup下的每个consumer按照一定的规则负责Topic下一部分MessageQueue进行消费。
  在RocketMQ整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到Broker选择的负载均衡,消息的消费过程主要涉及多consumer和多Broker之间的负责均衡。三、producer消息生产过程
  producer消息生产过程:producer首先访问namesvr获取路由信息,namesvr存储Topic维度的所有路由信息(包括每个topic在每个Broker的队列分布情况)。producer解析路由信息生成本地的路由信息,解析Topic在Broker队列信息并转化为本地的消息生产的路由信息。producer根据本地路由信息向Broker发送消息,选择本地路由中具体的Broker进行消息发送。3.1 路由同步过程public class MQClientInstance {       public boolean updateTopicRouteInfoFromNameServer(final String topic) {         return updateTopicRouteInfoFromNameServer(topic, false, null);     }         public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,         DefaultMQProducer defaultMQProducer) {         try {             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {                 try {                     TopicRouteData topicRouteData;                     if (isDefault && defaultMQProducer != null) {                         // 省略对应的代码                     } else {                         // 1、负责查询指定的Topic对应的路由信息                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                     }                       if (topicRouteData != null) {                         // 2、比较路由数据topicRouteData是否发生变更                         TopicRouteData old = this.topicRouteTable.get(topic);                         boolean changed = topicRouteDataIsChange(old, topicRouteData);                         if (!changed) {                             changed = this.isNeedUpdateTopicRouteInfo(topic);                         }                         // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息                         if (changed) {                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                               for (BrokerData bd : topicRouteData.getBrokerDatas()) {                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                             }                               // 生成生产者对应的Topic信息                             {                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                                 publishInfo.setHaveTopicRouterInfo(true);                                 Iterator> it = this.producerTable.entrySet().iterator();                                 while (it.hasNext()) {                                     Entry entry = it.next();                                     MQProducerInner impl = entry.getValue();                                     if (impl != null) {                                         impl.updateTopicPublishInfo(topic, publishInfo);                                     }                                 }                             }                             // 保存到本地生产者路由表当中                             this.topicRouteTable.put(topic, cloneTopicRouteData);                             return true;                         }                     }                 } finally {                     this.lockNamesrv.unlock();                 }             } else {             }         } catch (InterruptedException e) {         }           return false;     } }
  路由同步过程:路由同步过程是消息生产者发送消息的前置条件,没有路由的同步就无法感知具体发往那个Broker节点。路由同步主要负责查询指定的Topic对应的路由信息,比较路由数据topicRouteData是否发生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。public class TopicRouteData extends RemotingSerializable {     private String orderTopicConf;     // 按照broker维度保存的Queue信息     private List queueDatas;     // 按照broker维度保存的broker信息     private List brokerDatas;     private HashMap/* Filter Server */> filterServerTable; }     public class QueueData implements Comparable {     // broker的名称     private String brokerName;     // 读队列大小     private int readQueueNums;     // 写队列大小     private int writeQueueNums;     // 读写权限     private int perm;     private int topicSynFlag; }     public class BrokerData implements Comparable {     // broker所属集群信息     private String cluster;     // broker的名称     private String brokerName;     // broker对应的ip地址信息     private HashMap brokerAddrs;     private final Random random = new Random(); }     --------------------------------------------------------------------------------------------------     public class TopicPublishInfo {     private boolean orderTopic = false;     private boolean haveTopicRouterInfo = false;     // 最细粒度的队列信息     private List messageQueueList = new ArrayList();     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();     private TopicRouteData topicRouteData; }   public class MessageQueue implements Comparable, Serializable {     private static final long serialVersionUID = 6191200464116433425L;     // Topic信息     private String topic;     // 所属的brokerName信息     private String brokerName;     // Topic下的队列信息Id     private int queueId; }
  路由解析过程:TopicRouteData核心变量QueueData保存每个Broker的队列信息,BrokerData保存Broker的地址信息。TopicPublishInfo核心变量MessageQueue保存最细粒度的队列信息。producer负责将从namesvr获取的TopicRouteData转化为producer本地的TopicPublishInfo。public class MQClientInstance {       public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {           TopicPublishInfo info = new TopicPublishInfo();           info.setTopicRouteData(route);         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {           // 省略相关代码         } else {               List qds = route.getQueueDatas();               // 按照brokerName进行排序             Collections.sort(qds);               // 遍历所有broker生成队列维度信息             for (QueueData qd : qds) {                 // 具备写能力的QueueData能够用于队列生成                 if (PermName.isWriteable(qd.getPerm())) {                     // 遍历获得指定brokerData进行异常条件过滤                     BrokerData brokerData = null;                     for (BrokerData bd : route.getBrokerDatas()) {                         if (bd.getBrokerName().equals(qd.getBrokerName())) {                             brokerData = bd;                             break;                         }                     }                     if (null == brokerData) {                         continue;                     }                     if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {                         continue;                     }                       // 遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfo                     for (int i = 0; i < qd.getWriteQueueNums(); i++) {                         MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                         info.getMessageQueueList().add(mq);                     }                 }             }               info.setOrderTopic(false);         }           return info;     } }
  路由生成过程:路由生成过程主要是根据QueueData的BrokerName和writeQueueNums来生成MessageQueue 对象。MessageQueue是消息发送过程中选择的最细粒度的可发送消息的队列。{     "TBW102": [{         "brokerName": "broker-a",         "perm": 7,         "readQueueNums": 8,         "topicSynFlag": 0,         "writeQueueNums": 8     }, {         "brokerName": "broker-b",         "perm": 7,         "readQueueNums": 8,         "topicSynFlag": 0,         "writeQueueNums": 8     }] }
  路由解析举例:topic(TBW102)在broker-a和broker-b上存在队列信息,其中读写队列个数都为8。先按照broker-a、broker-b的名字顺序针对broker信息进行排序。针对broker-a会生成8个topic为TBW102的MessageQueue对象,queueId分别是0-7。针对broker-b会生成8个topic为TBW102的MessageQueue对象,queueId分别是0-7。topic(名为TBW102)的TopicPublishInfo整体包含16个MessageQueue对象,其中有8个broker-a的MessageQueue,有8个broker-b的MessageQueue。消息发送过程中的路由选择就是从这16个MessageQueue对象当中获取一个进行消息发送。3.2 负载均衡过程public class DefaultMQProducerImpl implements MQProducerInner {       private SendResult sendDefaultImpl(         Message msg,         final CommunicationMode communicationMode,         final SendCallback sendCallback,         final long timeout     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {                  // 1、查询消息发送的TopicPublishInfo信息         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());           if (topicPublishInfo != null && topicPublishInfo.ok()) {                           String[] brokersSent = new String[timesTotal];             // 根据重试次数进行消息发送             for (; times < timesTotal; times++) {                 // 记录上次发送失败的brokerName                 String lastBrokerName = null == mq ? null : mq.getBrokerName();                 // 2、从TopicPublishInfo获取发送消息的队列                 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                 if (mqSelected != null) {                     mq = mqSelected;                     brokersSent[times] = mq.getBrokerName();                     try {                         // 3、执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送                         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                         switch (communicationMode) {                             case SYNC:                                 if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                     if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                         continue;                                     }                                 }                                   return sendResult;                             default:                                 break;                         }                     } catch (MQBrokerException e) {                         // 省略相关代码                     } catch (InterruptedException e) {                         // 省略相关代码                     }                 } else {                     break;                 }             }               if (sendResult != null) {                 return sendResult;             }         }     } }
  消息发送过程:查询Topic对应的路由信息对象TopicPublishInfo。从TopicPublishInfo中通过selectOneMessageQueue获取发送消息的队列,该队列代表具体落到具体的Broker的queue队列当中。执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送,重新选择队列会避开上一次发送失败的Broker的队列。public class TopicPublishInfo {       public MessageQueue selectOneMessageQueue(final String lastBrokerName) {         if (lastBrokerName == null) {             return selectOneMessageQueue();         } else {             // 按照轮询进行选择发送的MessageQueue             for (int i = 0; i < this.messageQueueList.size(); i++) {                 int index = this.sendWhichQueue.getAndIncrement();                 int pos = Math.abs(index) % this.messageQueueList.size();                 if (pos < 0)                     pos = 0;                 MessageQueue mq = this.messageQueueList.get(pos);                 // 避开上一次上一次发送失败的MessageQueue                 if (!mq.getBrokerName().equals(lastBrokerName)) {                     return mq;                 }             }             return selectOneMessageQueue();         }     } }
  路由选择过程:MessageQueue的选择按照轮询进行选择,通过全局维护索引进行累加取模选择发送队列。MessageQueue的选择过程中会避开上一次发送失败Broker对应的MessageQueue。
  Producer消息发送示意图:某Topic的队列分布为Broker_A_Queue1、Broker_A_Queue2、Broker_B_Queue1、Broker_B_Queue2、Broker_C_Queue1、Broker_C_Queue2,根据轮询策略依次进行选择。发送失败的场景下如Broker_A_Queue1发送失败那么就会跳过Broker_A选择Broker_B_Queue1进行发送。四、consumer消息消费过程
  consumer消息消费过程:consumer访问namesvr同步topic对应的路由信息。consumer在本地解析远程路由信息并保存到本地。consumer在本地进行Reblance负载均衡确定本节点负责消费的MessageQueue。consumer访问Broker消费指定的MessageQueue的消息。4.1 路由同步过程public class MQClientInstance {       // 1、启动定时任务从namesvr定时同步路由信息     private void startScheduledTask() {         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {               @Override             public void run() {                 try {                     MQClientInstance.this.updateTopicRouteInfoFromNameServer();                 } catch (Exception e) {                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);                 }             }         }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);     }       public void updateTopicRouteInfoFromNameServer() {         Set topicList = new HashSet();           // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息         {             Iterator> it = this.consumerTable.entrySet().iterator();             while (it.hasNext()) {                 Entry entry = it.next();                 MQConsumerInner impl = entry.getValue();                 if (impl != null) {                     Set subList = impl.subscriptions();                     if (subList != null) {                         for (SubscriptionData subData : subList) {                             topicList.add(subData.getTopic());                         }                     }                 }             }         }           for (String topic : topicList) {             this.updateTopicRouteInfoFromNameServer(topic);         }     }       public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,         DefaultMQProducer defaultMQProducer) {           try {             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {                 try {                     TopicRouteData topicRouteData;                     if (isDefault && defaultMQProducer != null) {                         // 省略代码                     } else {                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                     }                       if (topicRouteData != null) {                         TopicRouteData old = this.topicRouteTable.get(topic);                         boolean changed = topicRouteDataIsChange(old, topicRouteData);                         if (!changed) {                             changed = this.isNeedUpdateTopicRouteInfo(topic);                         }                           if (changed) {                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                               for (BrokerData bd : topicRouteData.getBrokerDatas()) {                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                             }                               // 构建consumer侧的路由信息                             {                                 Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);                                 Iterator> it = this.consumerTable.entrySet().iterator();                                 while (it.hasNext()) {                                     Entry entry = it.next();                                     MQConsumerInner impl = entry.getValue();                                     if (impl != null) {                                         impl.updateTopicSubscribeInfo(topic, subscribeInfo);                                     }                                 }                             }                                   this.topicRouteTable.put(topic, cloneTopicRouteData);                             return true;                         }                     }                 } finally {                     this.lockNamesrv.unlock();                 }             }         } catch (InterruptedException e) {         }           return false;     } }
  路由同步过程:路由同步过程是消息消费者消费消息的前置条件,没有路由的同步就无法感知具体待消费的消息的Broker节点。consumer节点通过定时任务定期从namesvr同步该消费节点订阅的topic的路由信息。consumer通过updateTopicSubscribeInfo将同步的路由信息构建成本地的路由信息并用以后续的负责均衡。4.2 负载均衡过程public class RebalanceService extends ServiceThread {       private static long waitInterval =         Long.parseLong(System.getProperty(             "rocketmq.client.rebalance.waitInterval", "20000"));       private final MQClientInstance mqClientFactory;       public RebalanceService(MQClientInstance mqClientFactory) {         this.mqClientFactory = mqClientFactory;     }       @Override     public void run() {           while (!this.isStopped()) {             this.waitForRunning(waitInterval);             this.mqClientFactory.doRebalance();         }       } }
  负载均衡过程:consumer通过RebalanceService来定期进行重新负载均衡。RebalanceService的核心在于完成MessageQueue和consumer的分配关系。public abstract class RebalanceImpl {       private void rebalanceByTopic(final String topic, final boolean isOrder) {         switch (messageModel) {             case BROADCASTING: {                 // 省略相关代码                 break;             }             case CLUSTERING: { // 集群模式下的负载均衡                 // 1、获取topic下所有的MessageQueue                 Set mqSet = this.topicSubscribeInfoTable.get(topic);                   // 2、获取topic下该consumerGroup下所有的consumer对象                 List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);                                   // 3、开始重新分配进行rebalance                 if (mqSet != null && cidAll != null) {                     List mqAll = new ArrayList();                     mqAll.addAll(mqSet);                       Collections.sort(mqAll);                     Collections.sort(cidAll);                       AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;                       List allocateResult = null;                     try {                         // 4、通过分配策略重新进行分配                         allocateResult = strategy.allocate(                             this.consumerGroup,                             this.mQClientFactory.getClientId(),                             mqAll,                             cidAll);                     } catch (Throwable e) {                           return;                     }                       Set allocateResultSet = new HashSet();                     if (allocateResult != null) {                         allocateResultSet.addAll(allocateResult);                     }                     // 5、根据分配结果执行真正的rebalance动作                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);                     if (changed) {                         this.messageQueueChanged(topic, mqSet, allocateResultSet);                     }                 }                 break;             }             default:                 break;         }     }
  重新分配流程:获取topic下所有的MessageQueue。获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。针对mqAll和cidAll进行排序,mqAll排序顺序按照先BrokerName后BrokerId,cidAll排序按照字符串排序。通过分配策略AllocateMessageQueueStrategy重新分配。根据分配结果执行真正的rebalance动作。public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {     private final InternalLogger log = ClientLogger.getLog();       @Override     public List allocate(String consumerGroup, String currentCID, List mqAll,         List cidAll) {                   List result = new ArrayList();                   // 核心逻辑计算开始           // 计算当前cid的下标         int index = cidAll.indexOf(currentCID);                   // 计算多余的模值         int mod = mqAll.size() % cidAll.size();           // 计算平均大小         int averageSize =             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                 + 1 : mqAll.size() / cidAll.size());         // 计算起始下标         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;         // 计算范围大小         int range = Math.min(averageSize, mqAll.size() - startIndex);         // 组装结果         for (int i = 0; i < range; i++) {             result.add(mqAll.get((startIndex + i) % mqAll.size()));         }         return result;     }     // 核心逻辑计算结束       @Override     public String getName() {         return "AVG";     } }   ------------------------------------------------------------------------------------   rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。   rocketMq上存在名为topic_demo的topic,writeQueue写队列数量为3,分布在3个broker。 排序后的mqAll的大小为9,依次为 [broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2]   rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为 [192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959]   192.168.0.6@15956 的分配MessageQueue结算过程 index:0 mod:9%4=1 averageSize:9 / 4 + 1 = 3 startIndex:0 range:3 messageQueue:[broker_a_0、broker_a_1、broker_a_2]     192.168.0.6@15957 的分配MessageQueue结算过程 index:1 mod:9%4=1 averageSize:9 / 4 = 2 startIndex:3 range:2 messageQueue:[broker_b_0、broker_b_1]     192.168.0.6@15958 的分配MessageQueue结算过程 index:2 mod:9%4=1 averageSize:9 / 4 = 2 startIndex:5 range:2 messageQueue:[broker_b_2、broker_c_0]     192.168.0.6@15959 的分配MessageQueue结算过程 index:3 mod:9%4=1 averageSize:9 / 4 = 2 startIndex:7 range:2 messageQueue:[broker_c_1、broker_c_2]
  分配策略分析:整体分配策略可以参考上图的具体例子,可以更好的理解分配的逻辑。
  consumer的分配:同一个consumerGroup下的consumer对象会分配到同一个Topic下不同的MessageQueue。每个MessageQueue最终会分配到具体的consumer当中。五、RocketMQ指定机器消费设计思路
  日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由那台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {     private final InternalLogger log = ClientLogger.getLog();       @Override     public List allocate(String consumerGroup, String currentCID, List mqAll,         List cidAll) {                    List result = new ArrayList();         // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费         if (!cidAll.contains(currentCID)) {             return result;         }           int index = cidAll.indexOf(currentCID);         int mod = mqAll.size() % cidAll.size();         int averageSize =             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                 + 1 : mqAll.size() / cidAll.size());         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;         int range = Math.min(averageSize, mqAll.size() - startIndex);         for (int i = 0; i < range; i++) {             result.add(mqAll.get((startIndex + i) % mqAll.size()));         }         return result;     } }
  consumer负载均衡策略改写:通过改写负载均衡策略AllocateMessageQueueAveragely的allocate机制保证只有指定IP的机器能够进行消费。通过IP进行判断是基于RocketMQ的cid格式是192.168.0.6@15956,其中前面的IP地址就是对于的消费机器的ip地址,整个方案可行且可以实际落地。六、小结
  本文主要介绍了RocketMQ在生产和消费过程中的负载均衡机制,结合源码和实际案例力求给读者一个易于理解的技术普及,希望能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未涉及,也有很多技术细节未详细阐述,如有疑问欢迎继续交流。
  作者:vivo互联网技术
  链接:https://juejin.cn/post/7083676205022445582

华米科技不断突破行业难题!成为全球领先Top级可穿戴设备厂商10月12日,华米科技将会举办Amazfit2021全球年度新品发布会,推出都市时尚系列AmazfitGTR3GTS3智能手表。值得注意的是,在今天的倒计时海报中,华米科技提及将对手机越用越流畅,需要关闭4个开关,减少垃圾,扩大内存大家好,欢迎来到大雨静听的生活。手机要想越用越流畅,需要关闭这四个开关,减少垃圾,扩大内存。分享给大家。第一,杂志锁屏。找到设置桌面和壁纸杂志锁屏开启杂志锁屏右边的按钮关闭就可以了阿里国际站商品高清无水印商品图,怎么分类批量保存?想要分类批量保存电商平台图片,可以这样做1运行工具固乔电商图片助手2复制阿里巴巴店铺的链接3在工具页面打开宝贝分来下载的窗口自4将阿里店铺的链接粘贴上去,获取宝贝分类5勾选分类,可iphone流畅度排名第一等,iphone13pro系列,120hz高刷屏幕加持,史上最流畅ios,13系列的pro和非pro系列,是差距最大的一代,因为高刷带来的流畅体验,是所有人都可以明确感知的,不从5999元跌至1699元,IP682K曲面屏,华为旗舰二手机掉至千元档华为公司今年举办新机发布会的次数比往年少多了,但是今年华为商城中上架的手机却比往年多得多,这是因为其中包括很多的华为二手机,自从华为公司缺乏处理器,手中可售的手机比较少之后,华为公恭喜这些手机将抢先升级体验MIUI13华为本月开启了鸿蒙2。0系统正式版新一批的升级更新,鸿蒙2。0系统正式版新一批支持的机型有6款分别为华为nova8SE活力版,华为平板M5青春版8英寸,华为平板M5青春版10。1英20003000元价位手机选购指南,快来看看吧一iQ00Neo3优点1。高通骁龙865处理器,性能强悍,國体验非常出色2。搭载了一款6。57英寸LCD屏,支持自适应刷新率最高可达144Hz3。4500毫安电池44W闪充组合续航为什么用小米的几乎很少会买OV,而用OV的也几乎不会买小米呢?国内手机市场一直被华为小米OPPOVIVO这几大手机品牌占据。抛开华为不谈,相信使用小米和OV的人已经有几亿人了,但很多米粉表示换机选苹果也不选OV,而OV粉则表示坚决不尝试小米手个人信息保护法实施后,APP账号注销仍存在5大问题未注明注销条件注销条件设置不合理无法通过APP直接注销个人信息保护法实施后,APP账号注销仍存在5大问题中消协建议APP运营者明示合理的注销条件提供便捷的注销路径,保障用户顺利注销年亏10亿,拖欠员工工资,估值600亿的柔宇科技,陷入了大麻烦文有鱼审核张子扬校正知秋作为柔性领域的开拓者,柔宇科技自成立以来,一举一动都颇受关注。因为2014年研制出全球最薄柔性显示屏,柔宇科技迅速蹿升为明星科技企业,2018年柔宇科技又发如何看待刚认识就要微信的现象?这最多是对微信走火入魔而已,至于能否遇上知音,从此网上诉说衷情,还是认识了骗子,不幸落入圈套,咎由自取,休怪他人。若是微信交友别有用心,也可能利用微信招遥撞骗,拉人网购三无产品。这
顶尖空调惊喜豪礼!TCL空调真的大手笔转眼之间,618年中大促又到来了!对于消费者来说,618就是不容错过的狂欢购物盛宴,而对于品牌商家而言,618购物节则是可遇不可求的年度节点,是一个可以尽情展示自身品牌实力扩大品牌推动消费升级苏宁家电3C以旧换新再获央视点赞日前,央视再次报道并点赞苏宁易购响应国家政策,在家电3C新品扎堆首发的三四月份,大力推行以旧换新促进消费的相关举措。苏宁418焕新节的全面大爆发,电商行业上半年家电3C双十一战役进北京汽车全明星阵容登陆成都车展展示产品3。0时代战略成果8月29日,作为西南车市风向标,第二十四届成都国际汽车展览会盛大开幕!车展现场,北京汽车旗下7万级悦享生活家轿北京U5PLUS开启预定,预定价格为711万元,并享受50元抵5000哈弗神兽亮相成都车展中国哈弗深化用户运营布局混动赛道8月29日,第二十四届成都国际汽车展览会启幕,中国哈弗携新科技旗舰SUV哈弗神兽哈弗H6S,哈弗H6superme哈弗H92022款等新车燃动蓉城,开拓用户大运营,布局新能源混动新第一实至名归TCL登顶京东家电6月品牌体验竞争力排行榜据京东家电6月份品牌体验竞争力排行榜数据显示,TCL位列电视品类排行榜第一位。在家电全面热销的月份,各家品牌都使出了浑身解数,TCL能豪夺第一,其品牌竞争力可见一斑,果然不负其全球如何说服固执的父母体验新生活?这台洗烘一体机就是成功的第一步不知道你们有没有这样的感受,每次逢年过节回到家,总觉得家里的父母特别的省,明明儿女也混得不差,先不说什么中产阶级水平,至少也勉强算个小康。可不管怎样,父母亲还是一如既往地遵守能省则ALIENWAREx15x17惊艳上市!重新定义纤薄与性能要说笔记本电脑中的新贵,ALIENWAREx15x17的名字一定会被提及!当仁不让的全新性能水平,这款笔记本到底值不值得购买呢?ALIENWAREx15x17一分钱一分货,今天我们视频达人都需要的拍摄装备,智云WEEBILL2相机云台拍出新花样对于云台这种产品,相信大家都不陌生。想要拍摄高质量的短视频,最基本的就是要平稳!而想要平稳的首选就是安排上一个好用的稳定器。不过当下电商平台上的云台产品可谓是五花八门,如何选择相信骨传导耳机是黑科技or智商税?上手南卡骨传导RunnerCCII一探究竟生活中,我们一般使用的耳机,都是将发声设备的产生的电信号经过喇叭还原成震动波,再由空气介质传递到耳膜,从而还原成声音的。但还有一种相对小众的耳机骨传导耳机,它是利用骨传导技术,通过硬核!给功能机装上微信和抖音战狼厂商AGM上新599元大哥大智能时代,智能手机的功能可以说是越来越强大,但是智能手机也在一定程度上坑了很多人,尤其是老人和孩子。给父母买过智能手机的小伙伴一定经历过这样的事情手机虽然能够切换成老年机模式,但是致敬攀登精神越野炮珠峰版18。18万18。88万元成都车展正式上市8月29日,在第24届成都车展上,长城炮完成第20万台车辆的交付,以长城炮速度再次刷新行业纪录!同时,长城炮越野皮卡珠峰版重磅上市,售价18。18万18。88万元骑士拍档机车炮首发