保健励志美文体育育儿作文
投稿投诉
作文动态
热点娱乐
育儿情感
教程科技
体育养生
教案探索
美文旅游
财经日志
励志范文
论文时尚
保健游戏
护肤业界

mq从零开始实现mq生产者消费者启动

  MQ是什么?
  MQ(MessageQueue)消息队列,是基础数据结构中先进先出的一种数据结构。
  指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递生产者产生消息并把消息放入队列,然后由消费者去处理。
  消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。MQ的作用?
  消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
  解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
  削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
  ps:以上内容摘选自百科。实现mq的准备工作maven引入dependencygroupIdio。nettygroupIdnettyallartifactIdversion4。1。42。FinalversiondependencydependencygroupIdcom。alibabagroupIdfastjsonartifactIdversion1。2。76versiondependency模块划分
  Themessagequeueinjava。作为mq的从零开始的学习项目,目前已开源。
  项目的模块如下:
  模块
  说明
  mqcommon
  公共代码
  mqbroker
  注册中心
  mqproducer
  消息生产者
  mqconsumer
  消息消费者消息消费者接口定义packagecom。github。houbb。mq。consumer。api;authorbinbin。housince1。0。0publicinterfaceIMqConsumer{订阅paramtopicNametopic名称paramtagRegex标签正则voidsubscribe(StringtopicName,StringtagRegex);注册监听器paramlistener监听器voidregisterListener(finalIMqConsumerListenerlistener);}
  IMqConsumerListener作为消息监听类的接口,定义如下:publicinterfaceIMqConsumerListener{消费parammqMessage消息体paramcontext上下文return结果ConsumerStatusconsumer(finalMqMessagemqMessage,finalIMqConsumerListenerContextcontext);}
  ConsumerStatus代表消息消费的几种状态。消息体
  启动消息体MqMessage定义如下:packagecom。github。houbb。mq。common。dto;importjava。util。Arrays;importjava。util。List;authorbinbin。housince1。0。0publicclassMqMessage{标题名称privateStringtopic;标签privateListStringtags;内容privatebyte〔〕payload;业务标识privateStringbizKey;负载分片标识privateStringshardingKey;gettersettertoString}push消费者策略实现
  消费者启动的实现如下:推送消费策略authorbinbin。housince1。0。0publicclassMqConsumerPushextendsThreadimplementsIMqConsumer{省略。。。Overridepublicvoidrun(){启动服务端log。info(MQ消费者开始启动服务端groupName:{},port:{},brokerAddress:{},groupName,port,brokerAddress);EventLoopGroupbossGroupnewNioEventLoopGroup();EventLoopGroupworkerGroupnewNioEventLoopGroup();try{ServerBootstrapserverBootstrapnewServerBootstrap();serverBootstrap。group(workerGroup,bossGroup)。channel(NioServerSocketChannel。class)。childHandler(newChannelInitializerChannel(){OverrideprotectedvoidinitChannel(Channelch)throwsException{ch。pipeline()。addLast(newMqConsumerHandler());}})这个参数影响的是还没有被accept取出的连接。option(ChannelOption。SOBACKLOG,128)这个参数只是过一段时间内客户端没有响应,服务端会发送一个ack包,以判断客户端是否还活着。。childOption(ChannelOption。SOKEEPALIVE,true);绑定端口,开始接收进来的链接ChannelFuturechannelFutureserverBootstrap。bind(port)。syncUninterruptibly();log。info(MQ消费者启动完成,监听【port】端口);channelFuture。channel()。closeFuture()。syncUninterruptibly();log。info(MQ消费者关闭完成);}catch(Exceptione){log。error(MQ消费者启动异常,e);thrownewMqException(ConsumerRespCode。RPCINITFAILED);}finally{workerGroup。shutdownGracefully();bossGroup。shutdownGracefully();}}省略。。。}
  ps:初期我们把consumer作为服务端,后续引入broker则只有broker是服务端。MqConsumerHandler处理类
  这个类是一个空的实现。publicclassMqConsumerHandlerextendsSimpleChannelInboundHandler{OverrideprotectedvoidchannelRead0(ChannelHandlerContextchannelHandlerContext,Objectobject)throwsException{nothing}}测试代码MqConsumerPushmqConsumerPushnewMqConsumerPush();mqConsumerPush。start();
  启动日志:〔DEBUG〕〔2022042119:16:41。343〕〔main〕〔c。g。h。l。i。c。LogFactory。setImplementation〕Logginginitializedusingclasscom。github。houbb。log。integration。adaptors。stdout。StdOutExImpladapter。〔INFO〕〔2022042119:16:41。356〕〔Thread0〕〔c。g。h。m。c。c。MqConsumerPush。run〕MQ消费者开始启动服务端groupName:CDEFAULTGROUPNAME,port:9527,brokerAddress:〔INFO〕〔2022042119:16:43。196〕〔Thread0〕〔c。g。h。m。c。c。MqConsumerPush。run〕MQ消费者启动完成,监听【9527】端口消息生产者接口定义
  最基本的消息发送接口。packagecom。github。houbb。mq。producer。api;importcom。github。houbb。mq。common。dto。MqMessage;importcom。github。houbb。mq。producer。dto。SendResult;authorbinbin。housince1。0。0publicinterfaceIMqProducer{同步发送消息parammqMessage消息类型return结果SendResultsend(finalMqMessagemqMessage);单向发送消息parammqMessage消息类型return结果SendResultsendOneWay(finalMqMessagemqMessage);}生产者实现
  MqProducer启动的实现如下,基于netty。packagecom。github。houbb。mq。producer。core;默认mq生产者authorbinbin。housince1。0。0publicclassMqProducerextendsThreadimplementsIMqProducer{省略。。。Overridepublicvoidrun(){启动服务端log。info(MQ生产者开始启动客户端GROUP:{},PORT:{},brokerAddress:{},groupName,port,brokerAddress);EventLoopGroupworkerGroupnewNioEventLoopGroup();try{BootstrapbootstrapnewBootstrap();ChannelFuturechannelFuturebootstrap。group(workerGroup)。channel(NioSocketChannel。class)。option(ChannelOption。SOKEEPALIVE,true)。handler(newChannelInitializerChannel(){OverrideprotectedvoidinitChannel(Channelch)throwsException{ch。pipeline()。addLast(newLoggingHandler(LogLevel。INFO))。addLast(newMqProducerHandler());}})。connect(localhost,port)。syncUninterruptibly();log。info(MQ生产者启动客户端完成,监听端口:port);channelFuture。channel()。closeFuture()。syncUninterruptibly();log。info(MQ生产者开始客户端已关闭);}catch(Exceptione){log。error(MQ生产者启动遇到异常,e);thrownewMqException(ProducerRespCode。RPCINITFAILED);}finally{workerGroup。shutdownGracefully();}}省略。。。}MqProducerHandler处理类
  默认的空实现,什么都不做。packagecom。github。houbb。mq。producer。handler;importio。netty。channel。ChannelHandlerContext;importio。netty。channel。SimpleChannelInboundHandler;authorbinbin。housince1。0。0publicclassMqProducerHandlerextendsSimpleChannelInboundHandler{OverrideprotectedvoidchannelRead0(ChannelHandlerContextchannelHandlerContext,Objectobject)throwsException{donothingnow}}启动代码MqProducermqProducernewMqProducer();mqProducer。start();
  启动日志:〔DEBUG〕〔2022042119:17:11。960〕〔main〕〔c。g。h。l。i。c。LogFactory。setImplementation〕Logginginitializedusingclasscom。github。houbb。log。integration。adaptors。stdout。StdOutExImpladapter。〔INFO〕〔2022042119:17:11。974〕〔Thread0〕〔c。g。h。m。p。c。MqProducer。run〕MQ生产者开始启动客户端GROUP:PDEFAULTGROUPNAME,PORT:9527,brokerAddress:四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerchannelRegistered信息:〔id:0x5cb48145〕REGISTERED四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerconnect信息:〔id:0x5cb48145〕CONNECT:localhost127。0。0。1:9527四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerchannelActive信息:〔id:0x5cb48145,L:127。0。0。1:57740R:localhost127。0。0。1:9527〕ACTIVE〔INFO〕〔2022042119:17:13。833〕〔Thread0〕〔c。g。h。m。p。c。MqProducer。run〕MQ生产者启动客户端完成,监听端口:9527小结
  基于netty最基本的服务端启动、客户端启动到这里就结束了。
  千里之行,始于足下。
  我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。
  希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
  我是老马,期待与你的下次重逢。开源地址
  Themessagequeueinjava。(java简易版本mq实现):https:github。comhoubbmq拓展阅读
  rpc从零开始实现rpc:https:github。comhoubbrpc

我与杨树共成长的作文它没有松树那么能够经受艰难困苦;没有梧桐树那么自树一帜;没有柿树那么盘虬卧龙,但它有着自己的坚强不屈。它可以打家具,可以做屋檩栋梁,可以制作农具,在一些人眼中,几乎会忘记……照片里叙事作文每当我翻开那一本厚厚的相册,总会忍不住笑起来。有一张照片,在我的记忆里最深刻。那张照片是我5岁时拍的。有一天,我和奶奶还有麦兜去康店河堤上玩。那里有个沙滩迷宫,去到那里,……我学会了炒菜作文400字我最爱吃的菜是可乐鸡翅,正巧赶上国庆放假,于是妈妈便教我如何做这道菜。准备工作开始了。鸡翅十根、可乐一瓶,还有葱花、姜丝和几瓣蒜。首先切葱花,将洗干净的京葱用左手拿住,右……新年连连看(一2月18号19号)第一集我今天早上发现家里不寻常的气息,家人忙忙碌碌的,一副恨不得时间停止不走的样子mdash;mdash;从今天就开始一直做饭菜,而且,原来不会这么早做菜的呢,我看了一眼……要豪华还是要绿牌别错过奇瑞小蚂蚁在这几年里,汽车市场的发展越来越快,我们也看到了很多汽车品牌的新能源汽车产品上市。但其实对于国内的新能源汽车市场来说,早在2015年就已经开始了,而豪华品牌的加入,让这场新能源……描写母爱的作文愿爱在温馨中绽放你的成长是刻在她额头的皱纹,你的放纵是印在她眉心的不安,你的欢乐是画在她眼角的鱼尾,你的幸福是染在她头上的白发。不管有多忙,不管在哪里,今天请一定记得对她说:我爱你,妈妈!……第一次领奖的幻想小学作文人人都有第一次,例如:第一次洗衣服、第一次煮饭等等。今天是一个我既高兴又兴奋的一天,因为今天是我第一次听周老师在电话里对我说:唐明鸿,你作文获得了龙凤杯奖,而且还是二等奖……篮球搭起的桥500字作文今年暑假,我回到了我的老家建瓯,爱运动的我,每天都要到体育场打篮球。去体育场的第一天,我便在体育场门口看到了我小学时的死对头小周,他也是到体育场打篮球的。说起我和小周的纠……一个平民对宇宙的看法从美国科学家发现宇宙大爆炸的余晖宇宙微波背景辐射,接着发射宇宙微波背景辐射太空探测器,探测到比绝对零度负273度之上2。7度,宇宙余晖温度也不均匀,有些星系测得比绝对零度负27……三个月内三次自燃,威马电动车再发生起火事故4月18日,海口市内一辆威马EX5在充电完成几分钟后突然冒烟起火。据了解,起火处位于底盘,燃烧中还伴随着轮胎爆炸声。目前威马汽车官方尚未对此进行回复,而该车辆起火所处充电站目前……工信部将面向脱贫户老年人残疾人等群体提供更多资费优惠人民网北京4月19日电(记者申佳平)4月19日,在国务院新闻办公室召开的发布会上,工业和信息化部新闻发言人、信息通信管理局局长赵志国表示,总的来看一季度信息通信服务水平稳中趋好……前瞻性的易流科技,助力货运企业数字化升级对于我们货运企业来说企业升级是一个很重要的事情,因为只有将企业升级才能够让自己在激烈的竞争中留住属于自己的那一份蛋糕,或者有实力去争夺更多的蛋糕。其实现在企业升级最重要的……
红的黄的蓝的绿的,JBLPULSE4音乐脉动四代好听的声音看红的黄的,紫的绿的蓝的,各种颜色任我选择。。我总是在说,音乐在不同设备之间的呈现就像潘多拉的魔盒,打开是惊是喜只需一耳就能分辨。而对于爱音乐的人来说,这趟寻谜的过程又是有……苹果手机这么贵为什么用户这么多?生活中,手机已经成为人们必不可少的工具之一。很多人出门忘记携带钥匙都不会忘记携带手机。作为生活的必需品,苹果手机价格这么高,这么多人黑,它的销量和使用人群为什么一直是最高……减肥第九天今天折腾游戏昨天跳绳,今天小腿肚和两边手臂特别酸,决定今天不运动了,上楼都疼,哈哈,称下体重还行,还是有效果的今天准备折腾折腾游戏,拿出很久以前的采集卡,圆刚gc550因为电脑……8个你一定用得上的电脑技巧,对办公很有帮助,建议收藏电脑已经是大多数办公人士无法拒绝的一款工具,我们在办公时的任何操作都是离不开电脑的,但想要熟练使用电脑,不是光靠复制粘贴就可以熟练地,接下来的这8个电脑必学知识,或许可以帮到你……小米有品上新野小兽高颜值划船机,在家也能高效健身恭喜我经过一夏天的吃喝玩乐,现在又胖了,一年一年的长肉真是受不了。本来想去办健身卡,被朋友给拔草了,不仅健身的人多,每个健身设备都需要排队,而且还有健身教练,一直推销健身……没有方向盘?最便宜的特斯拉有望2023年发布售价16万特斯拉打算推出售价2。5万美元的电动车已经不是什么秘密了,那么这款电动车究竟在什么时候发布呢?外媒给出了最新的说法。外媒援引消息人士的爆料称,特斯拉有望于2023年开始生……iPhoneSE3后置摄像模组浴霸设计,刘海设计被改造,网友笔歌科技独家报道:苹果iPhoneSE3方面消息,外媒曝光了一组iPhoneSE3的设计概念图,iPhoneSE3后置摄像模组继承当前浴霸设计,刘海设计也被改造,网上有瞬间炸开……退款就拉黑?一团伙闲鱼低价卖二手iPhone诈骗141万【手机中国新闻】对于一部分人来说,买一台全新的iPhone手机可能预算不够,所以去二手平台入手一部二手iPhone不失为一种选择。但购买二手手机往往不省心,甚至可能遭受诈骗。……小米有品好物清单年轻人的第一把茶壶健腹轮进入智能化时代小米MIX4的发布会结尾,雷军放出一个意料之外的惊喜:回馈小米手机1首批用户,赠送等值小米手机1的1999元小米商城无门槛红包。作为小米手机1的首批用户,我自然也得到了这份红包……手机店老板的建议,这四款才是换机首选,你买对了吗?当下手机市场上的手机可以说是越来越多,很多人为此也是陷入了选择手机的苦恼,其实在这么多的产品当中能够买到适合自己的机型就很不错了,下面不妨给大家介绍一下当下口碑不错的几款手机,……又在他城忆起你品学网专稿未经允许不得转载叶落知秋的季节,遗忘的是你如花的笑靥,笔下的文字许是生疏,竟,绘不出你的容颜。跌跌撞撞,搁不下的牵绊,错乱浮生,解……有一种爱叫做感恩高中作文人世间,最浓的不是血,而是一种亲情,一种有着血缘关系的爱;最值得眷恋的不是物,而是一种刻骨铭心的真爱,一种凝聚崇高的关爱。题记生活在一个充满爱的家庭里我觉得有父母的关爱、……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网