保健励志美文体育育儿作文
投稿投诉
作文动态
热点娱乐
育儿情感
教程科技
体育养生
教案探索
美文旅游
财经日志
励志范文
论文时尚
保健游戏
护肤业界

面试官你来简单回答一下RocketMQ的默认发送流程

  今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。
  思考问题消息发送者是如何做负载均衡的?消息发送者是如何保证高可用的?消息发送批量消息如何保证一致性的?默认发送流程工作原理
  源码入口:org。apache。rocketmq。client。producer。DefaultMQProducersend(org。apache。rocketmq。common。message。Message)
  启动Demo:DefaultMQProducerproducernewDefaultMQProducer(group1);producer。setNamesrvAddr(xxx:9876);producer。start();MessagemsgnewMessage(TopicTestTopic,TagATag,(HelloRocketMQi)。getBytes(RemotingHelper。DEFAULTCHARSET)Messagebody);SendResultsendResultproducer。send(msg);
  流程:
  1。校验主题,设置主题msg。setTopic(withNamespace(msg。getTopic()));publicStringwithNamespace(Stringresource){returnNamespaceUtil。wrapNamespace(this。getNamespace(),resource);}
  2。默认发送方式为同步发送,默认超时时间为3sprivateintsendMsgTimeout3000;publicSendResultsend(Messagemsg,longtimeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{returnthis。sendDefaultImpl(msg,CommunicationMode。SYNC,null,timeout);}
  3。确认producerservice运行状态是否为运行中
  入口:org。apache。rocketmq。client。impl。producer。DefaultMQProducerImplmakeSureStateOK检查状态,如果不是RUNNING状态则抛出异常privatevoidmakeSureStateOK()throwsMQClientException{if(this。serviceState!ServiceState。RUNNING){thrownewMQClientException(TheproducerservicestatenotOK,this。serviceStateFAQUrl。suggestTodo(FAQUrl。CLIENTSERVICENOTOK),null);}}
  4。校验信息topic长达是否大于TOPICMAXLENGTH,topic是否为空是否通过正则校验,body是否为空,body大小是否超过4MpublicstaticvoidcheckTopic(Stringtopic)throwsMQClientException{if(UtilAll。isBlank(topic)){thrownewMQClientException(Thespecifiedtopicisblank,null);}if(topic。length()TOPICMAXLENGTH){thrownewMQClientException(String。format(Thespecifiedtopicislongerthantopicmaxlengthd。,TOPICMAXLENGTH),null);}if(isTopicOrGroupIllegal(topic)){thrownewMQClientException(String。format(Thespecifiedtopic〔s〕containsillegalcharacters,allowingonlys,topic,〔azAZ09〕34;),null);}}bodyif(nullmsg。getBody()){thrownewMQClientException(ResponseCode。MESSAGEILLEGAL,themessagebodyisnull);}if(0msg。getBody()。length){thrownewMQClientException(ResponseCode。MESSAGEILLEGAL,themessagebodylengthiszero);}if(msg。getBody()。lengthdefaultMQProducer。getMaxMessageSize()){thrownewMQClientException(ResponseCode。MESSAGEILLEGAL,themessagebodysizeovermaxvalue,MAX:defaultMQProducer。getMaxMessageSize());
  5。找到主题发布的信息,未找到则抛出异常
  入口:org。apache。rocketmq。client。impl。producer。DefaultMQProducerImpltryToFindTopicPublishInfo
  消息生产者更新和维护路由信息缓存privateTopicPublishInfotryToFindTopicPublishInfo(finalStringtopic){TopicPublishInfotopicPublishInfothis。topicPublishInfoTable。get(topic);if(nulltopicPublishInfo!topicPublishInfo。ok()){this。topicPublishInfoTable。putIfAbsent(topic,newTopicPublishInfo());消息生产者更新和维护路由信息缓存this。mQClientFactory。updateTopicRouteInfoFromNameServer(topic);topicPublishInfothis。topicPublishInfoTable。get(topic);}if(topicPublishInfo。isHaveTopicRouterInfo()topicPublishInfo。ok()){returntopicPublishInfo;}else{this。mQClientFactory。updateTopicRouteInfoFromNameServer(topic,true,this。defaultMQProducer);topicPublishInfothis。topicPublishInfoTable。get(topic);returntopicPublishInfo;}}
  6。通过TopicPublishInfo找到对应的MessageQueue下的,BrokerName信息
  入口:org。apache。rocketmq。client。impl。producer。DefaultMQProducerImplselectOneMessageQueue
  获取到BrokerName对应的MessageQueue信息publicMessageQueueselectOneMessageQueue(finalStringlastBrokerName){if(lastBrokerNamenull){returnselectOneMessageQueue();}else{for(inti0;ithis。messageQueueList。size();i){intindexthis。sendWhichQueue。incrementAndGet();intposMath。abs(index)this。messageQueueList。size();if(pos0)pos0;MessageQueuemqthis。messageQueueList。get(pos);if(!mq。getBrokerName()。equals(lastBrokerName)){returnmq;}}returnselectOneMessageQueue();}}
  如果lastBrokerName为null,通过对sendWhichQueue方法获取一个队列
  取余,然后从messageQueueList中获取一个MessageQueuepublicMessageQueueselectOneMessageQueue(){intindexthis。sendWhichQueue。incrementAndGet();intposMath。abs(index)this。messageQueueList。size();if(pos0)pos0;returnthis。messageQueueList。get(pos);}
  7。最后消息发送
  入口:org。apache。rocketmq。client。impl。producer。DefaultMQProducerImplsendKernelImpl
  1。根绝BrokerName获取到broker地址
  在启动阶段,对BrokerAddrTable信息进行了维护publicStringfindBrokerAddressInPublish(finalStringbrokerName){HashMapLongbrokerId,Stringaddressmapthis。brokerAddrTable。get(brokerName);if(map!null!map。isEmpty()){returnmap。get(MixAll。MASTERID);}returnnull;}
  如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常if(nullbrokerAddr){1。1如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常tryToFindTopicPublishInfo(mq。getTopic());brokerAddrthis。mQClientFactory。findBrokerAddressInPublish(mq。getBrokerName());}
  2。为消息分配全局唯一ID为消息分配全局唯一IDif(!(msginstanceofMessageBatch)){MessageClientIDSetter。setUniqID(msg);}
  在RocketMQ消息发送请求与响应文章中,我们已经学习了请求参数中,创建了全局唯一的MsgId,可以回头看一看
  3。注册钩子消息发送钩子函数
  这里主要做了三件事情,确认MsgType类型、是否为延迟消息、调用钩子函数内的方法if(this。hasSendMessageHook()){contextnewSendMessageContext();context。setProducer(this);context。setProducerGroup(this。defaultMQProducer。getProducerGroup());context。setCommunicationMode(communicationMode);context。setBornHost(this。defaultMQProducer。getClientIP());context。setBrokerAddr(brokerAddr);context。setMessage(msg);context。setMq(mq);context。setNamespace(this。defaultMQProducer。getNamespace());StringisTransmsg。getProperty(MessageConst。PROPERTYTRANSACTIONPREPARED);3。1通过isTrans来确定MsgType类型if(true。equals(isTrans)){context。setMsgType(MessageType。TransMsgHalf);}3。2如果msg里面STARTDELIVERTIME或者DELAY不为空,则设置为延迟消息if(msg。getProperty(STARTDELIVERTIME)!nullmsg。getProperty(MessageConst。PROPERTYDELAYTIMELEVEL)!null){context。setMsgType(MessageType。DelayMsg);}3。3调用钩子函数里的方法this。executeSendMessageHookBefore(context);}
  4。设置发送信息请求头SendMessageRequestHeader
  最后根据默认发送方式,进行消息的发送
  主要利用NettyRemotingClient进行发送,这里就先不展开来说了入口:MQClientAPIImpl。sendMessage()问题答复消息发送者是如何做负载均衡的?默认采用轮询,每一个消息发送者全局会维护一个Topic上一次选择的队列,然后基于这个序号进行递增轮询AllocateMessageQueueAveragely平均分配,按照总数除以消费者个数进行,对每个消费者进行分配AllocateMessageQueueAveragelyByCircle轮流平均分配,按照消费者个数,进行轮询分配消息发送者是如何保证高可用的?在上面的步骤中通过TopicPublishInfo找到对应的MessageQueue下的,BrokerName信息,利用参数sendLatencyFaultEnable来开启关闭故障规避机制sendLatencyFaultEnable设置为true:开启延迟规避机制,一旦消息发送失败会将brokera悲观地认为在接下来的一段时间内该Broker不可用,在为未来某一段时间内所有的客户端不会向该Broker发送消息。使用本次消息发送延迟时间来计算Broker故障规避时长,不参与消息发送队列负载finalStringnotBestBrokerlatencyFaultTolerance。pickOneAtLeast();intwriteQueueNumstpInfo。getQueueIdByBroker(notBestBroker);if(writeQueueNums0){finalMessageQueuemqtpInfo。selectOneMessageQueue();if(notBestBroker!null){mq。setBrokerName(notBestBroker);mq。setQueueId(tpInfo。getSendWhichQueue()。incrementAndGet()writeQueueNums);}returnmq;}
  但是这样子做可能带来的后果是Broker没有可用的情况,或者是某个Broker数据激增,增加消费者的压力,所以默认不开启规避机制,遇到消息发送失败,规避brokera,但是在下一次消息发送时,即再次调用brokera。消息发送批量消息如何保证一致性的?将一个Topic下的消息,通过batch方法包一起发送客户端ID与使用陷阱
  摘自丁威老师的文章
  总结
  这段时间主要学习了RocketMQ的消息发送,主要是以源码为主,深入了解了消息发送的启动和消息发送的流程,以及认识到客户端ID与使用陷阱一图总结
  作者:叫我小郭
  链接:https:juejin。cnpost7105315713157431332

用华为5G手机壳就能逆袭?别高兴太早,分分钟是大坑hello我是你们的der(得儿)一个爱好数码、喜欢谈论时事热点,挖掘性价比东西的宅,der友们点个关注叭!前几天的一则重磅消息,可谓让很多华为用户沸腾!因为一款支……初中作文800字我的校园校园我们成长的地方,校园,我们学习生活的地方。以下是小编整理的初中作文800字我的校园,欢迎参考,希望对大家有所帮助!初中作文800字我的校园【1】校园,是孕育精英的摇篮……NASA哈勃望远镜显示,我们的宇宙正在发生奇怪的事情Nasa表示,哈勃太空望远镜在发现宇宙膨胀速度方面达到了一个新的里程碑,它支持了宇宙中正在发生奇怪事情的观点。近年来,天文学家利用哈勃等望远镜来准确了解我们的宇宙膨胀速度……每周吃两次鱼可以促进你的健康鱼类和其他海鲜是蛋白质和有益脂肪的健康来源。鱼是维生素和营养素的重要来源。它还有助于支持大脑和心血管健康。是什么让鱼健康?鱼的蛋白质含量高,脂肪和卡路里含量相……2007年武汉市中考满分作文学会留心系列二学会留心?留心一棵草,能感春天之生机;留心一朵花,能观天地之色彩;留心一片叶,能悟人生之哲理。?mdash;mdash;题记?在人生这个大舞台上,我们需要学会……勤奋,胜利的前夕作文800字常言道,rdquo;一分耕耘,一分收获。rdquo;有了辛勤的劳动,我们才会有成果,不劳而获的事情是不存在的。正如我校的体育健儿们,能在县运动会上取得优异的成绩就是他们勤奋的结……认识自己作文范文亲爱的孩子们,终于见到你们了!看着你们这一张张充满期待的小脸,我的内心是快乐的!我想,我们一定能够一起度过一段快乐无比的学习时光,你们信不信?现在,就请你仔细看看,站在你……乳腺结节能吃洋葱吗?为了乳腺健康,尽量少吃6类食物乳腺结节是一种常见的乳腺疾病,最近几年发病率是比较高的,其实大部分情况下都属于良性的病变,但是也有一些女性会面临着恶性病变。其实乳腺结节的出现与多方面的因素有关,比如遗传……动物园小学作文600字合集十篇在学习、工作、生活中,大家都接触过作文吧,作文根据体裁的不同可以分为记叙文、说明文、应用文、议论文。那么你知道一篇好的作文该怎么写吗?下面是小编帮大家整理的动物园小学作文600……写兔子的日记300字写兔子的日记300字怎么写?以下是小编整理的相关范文,欢迎阅读。写兔子的日记300字一今天我妈妈给我买一只小白兔,我特别高兴。早上,我看见小白兔全身都是白色的,圆圆……你是一道忧伤的风歌词〔ti:你是一道忧伤的风〕〔ar:林隆璇〕〔al:〕你是一道忧伤的风词:林隆璇曲:林隆璇演唱;林隆璇你是一道忧伤的风不愿轻易展笑……肺不好会有这四个表现!医生教你如何养肺润肺大家好,我是中医李大夫。肺,是重要的呼吸器官。肺功能,是衡量健康长寿的重要标志之一。因此,在一定程度上,存在着要长寿,先养肺之说。然而,肺部又被称为娇脏,直接与口鼻相连,……
1元钱面前的人生态度一天,在书上看到一道选择题,觉得很有意思,便把它带到单位,让同事做出选择:1、今天一次性给你100万元。2、今天给你1元,连续30天每天都给你前一天2倍的钱。……穿越天下(2)2,穿越ldquo;枫儿姐,小姐到现在还是没有醒,她该不会是hellip;hellip;rdquo;ldquo;胡说!我们的小姐不可能死的rdquo;此时的另一旁h……二年级想象作文我一颗大门牙我是一颗门牙,在小主人的嘴巴里呆了快9年了,最近我觉得自己越来越不中用了,每次小主人咬她爱吃的苹果时,我都哆嗦得无法站稳身子,就像是一个站不稳的老爷爷。小主人有时用手摇动我一下……春天的声音听听,春天到了,万物复苏的时间到了,春姑娘带着她的合唱团来了。叮咚,听到了吗?冰凌像一位钢琴家,每一个水珠都是一个音符,一大串音符串出了一曲优美的乐曲。这声音唤醒了春天,……描写长城的英语作文thegreatwallwasrenovatedfromtimetotimeaftertheqindynasty。amajorrenovationstartedwiththef……她小学生优秀作文500字红颜老了少年心,琴弦断了旧知音。题记那已经是多年前的事了,可如今想起来,仍是记忆犹新。忘不了那个令我伤感的傍晚。最要好的一个朋友忽然打电话来约我出去,说是有急事。我……葛优老婆长得太路人!一家三口合体,她打扮朴素一看就是老实人针对不同的场合,服饰风格的定位也明显不同,像在公众镜头前的明星们,基本都有专门的造型团队来塑造精致的形象,不过脱离了镜头与身份的束缚后,他们也更愿意以一种没有包袱的形象来享受生……老实人被造谣!中国女排35岁功勋终于发声至今未婚,都是谣言在网络上营销号造谣带节奏的行为一向让人深恶痛绝,尤其是给事件的主人公带来不少困扰和伤害。不知道是不是最近没有娱乐新闻可以写,一些营销号甚至开始找体育届的运动员下手啦,造谣曾经的……实用的感恩母亲作文400字汇编6篇在平凡的学习、工作、生活中,大家都不可避免地要接触到作文吧,作文根据体裁的不同可以分为记叙文、说明文、应用文、议论文。你知道作文怎样才能写的好吗?以下是小编为大家整理的感恩母亲……奥尼尔儿子将试训湖人今年22岁曾接受心脏手术据湖人记者约万布哈报道,传奇球星沙奎尔奥尼尔之子沙里夫奥尼尔将试训洛杉矶湖人队。布哈表示,沙里夫今天已经参加了雄鹿队的试训,除了湖人之外,他还将为骑士、老鹰、奇才等队试训……中秋趣事五年级作文中秋趣事五年级作文1中秋节快到了,令人开心的是:这次月饼我要自己亲手做!我迫不及待地想体验一下。我想月饼做出来肯定特别精致,做完以后发到朋友圈,一定会引来大规模的点赞哈哈……2017年为你而歌观后感为你而歌唱响了时代的主旋律,充分展现了当代共产党员的优秀风范,强烈激发了共产党员的荣誉感和自豪感。yuwenmi小编整理了相关观后感,欢迎欣赏与借鉴。2017年《为你而歌》观后……
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网