范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

mq从零开始实现mq生产者消费者启动

  MQ 是什么?
  MQ(Message Queue)消息队列,是基础数据结构中"先进先出"的一种数据结构。
  指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。
  消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。 MQ 的作用?
  消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
  解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
  削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
  ps: 以上内容摘选自百科。 实现 mq 的准备工作maven 引入     io.netty     netty-all     4.1.42.Final        com.alibaba     fastjson     1.2.76  模块划分
  The message queue in java. 作为 mq 的从零开始的学习项目,目前已开源。
  项目的模块如下:
  模块
  说明
  mq-common
  公共代码
  mq-broker
  注册中心
  mq-producer
  消息生产者
  mq-consumer
  消息消费者 消息消费者接口定义package com.github.houbb.mq.consumer.api;  /**  * @author binbin.hou  * @since 1.0.0  */ public interface IMqConsumer {      /**      * 订阅      * @param topicName topic 名称      * @param tagRegex 标签正则      */     void subscribe(String topicName, String tagRegex);      /**      * 注册监听器      * @param listener 监听器      */     void registerListener(final IMqConsumerListener listener);  }
  IMqConsumerListener   作为消息监听类的接口,定义如下:public interface IMqConsumerListener {       /**      * 消费      * @param mqMessage 消息体      * @param context 上下文      * @return 结果      */     ConsumerStatus consumer(final MqMessage mqMessage,                             final IMqConsumerListenerContext context);  }
  ConsumerStatus 代表消息消费的几种状态。 消息体
  启动消息体 MqMessage 定义如下: package com.github.houbb.mq.common.dto;  import java.util.Arrays; import java.util.List;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqMessage {      /**      * 标题名称      */     private String topic;      /**      * 标签      */     private List tags;      /**      * 内容      */     private byte[] payload;      /**      * 业务标识      */     private String bizKey;      /**      * 负载分片标识      */     private String shardingKey;      // getter&setter&toString  } push 消费者策略实现
  消费者启动的实现如下: /**  * 推送消费策略  *  * @author binbin.hou  * @since 1.0.0  */ public class MqConsumerPush extends Thread implements IMqConsumer  {      // 省略...      @Override     public void run() {         // 启动服务端         log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",                 groupName, port, brokerAddress);          EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             ServerBootstrap serverBootstrap = new ServerBootstrap();             serverBootstrap.group(workerGroup, bossGroup)                     .channel(NioServerSocketChannel.class)                     .childHandler(new ChannelInitializer() {                         @Override                         protected void initChannel(Channel ch) throws Exception {                             ch.pipeline().addLast(new MqConsumerHandler());                         }                     })                     // 这个参数影响的是还没有被accept 取出的连接                     .option(ChannelOption.SO_BACKLOG, 128)                     // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。                     .childOption(ChannelOption.SO_KEEPALIVE, true);              // 绑定端口,开始接收进来的链接             ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();             log.info("MQ 消费者启动完成,监听【" + port + "】端口");              channelFuture.channel().closeFuture().syncUninterruptibly();             log.info("MQ 消费者关闭完成");         } catch (Exception e) {             log.error("MQ 消费者启动异常", e);             throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);         } finally {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }       }       // 省略...  }
  ps: 初期我们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。 MqConsumerHandler 处理类
  这个类是一个空的实现。 public class MqConsumerHandler extends SimpleChannelInboundHandler {      @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {         //nothing     }  } 测试代码MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start();
  启动日志: [DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using "class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl" adapter. [INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:  [INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口 消息生产者接口定义
  最基本的消息发送接口。 package com.github.houbb.mq.producer.api;  import com.github.houbb.mq.common.dto.MqMessage; import com.github.houbb.mq.producer.dto.SendResult;  /**  * @author binbin.hou  * @since 1.0.0  */ public interface IMqProducer {      /**      * 同步发送消息      * @param mqMessage 消息类型      * @return 结果      */     SendResult send(final MqMessage mqMessage);      /**      * 单向发送消息      * @param mqMessage 消息类型      * @return 结果      */     SendResult sendOneWay(final MqMessage mqMessage);  } 生产者实现
  MqProducer 启动的实现如下,基于 netty。 package com.github.houbb.mq.producer.core;  /**  * 默认 mq 生产者  * @author binbin.hou  * @since 1.0.0  */ public class MqProducer extends Thread implements IMqProducer {      //省略...      @Override     public void run() {         // 启动服务端         log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",                 groupName, port, brokerAddress);          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             Bootstrap bootstrap = new Bootstrap();             ChannelFuture channelFuture = bootstrap.group(workerGroup)                     .channel(NioSocketChannel.class)                     .option(ChannelOption.SO_KEEPALIVE, true)                     .handler(new ChannelInitializer(){                         @Override                         protected void initChannel(Channel ch) throws Exception {                             ch.pipeline()                                     .addLast(new LoggingHandler(LogLevel.INFO))                                     .addLast(new MqProducerHandler());                         }                     })                     .connect("localhost", port)                     .syncUninterruptibly();              log.info("MQ 生产者启动客户端完成,监听端口:" + port);             channelFuture.channel().closeFuture().syncUninterruptibly();             log.info("MQ 生产者开始客户端已关闭");         } catch (Exception e) {             log.error("MQ 生产者启动遇到异常", e);             throw new MqException(ProducerRespCode.RPC_INIT_FAILED);         } finally {             workerGroup.shutdownGracefully();         }     }      //省略... } MqProducerHandler 处理类
  默认的空实现,什么都不做。 package com.github.houbb.mq.producer.handler;  import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqProducerHandler extends SimpleChannelInboundHandler {      @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {         //do nothing now     }  } 启动代码MqProducer mqProducer = new MqProducer(); mqProducer.start();
  启动日志: [DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using "class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl" adapter. [INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress:  四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x5cb48145] REGISTERED 四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect 信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527 四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE [INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527 小结
  基于 netty 最基本的服务端启动、客户端启动到这里就结束了。
  千里之行,始于足下。
  我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。
  希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
  我是老马,期待与你的下次重逢。 开源地址
  The message queue in java.(java 简易版本 mq 实现) : https://github.com/houbb/mq  拓展阅读
  rpc-从零开始实现 rpc: https://github.com/houbb/rpc

斩获最长电漂吉尼斯世界纪录TM荣誉新世界驾控旗舰智己L7正式上市,官宣售价36。88万元40。88万元日前,智己汽车公布新世界驾控旗舰智己L7以连续漂移43。646公里的成绩,成功摘冠电动汽车漂移距离最长吉尼斯世界纪录TM荣誉,为快速变革的电动化技术彻底颠覆传统豪华车游戏规则,重新全球科技品牌排名前十,中国有4个,美国五个,韩国一个科技为人类带来了巨大的好处,科技极大提升了全球产业生产力,以前全球有价值的科技品牌几乎都来自于美国,然而现在全球前十的科技品牌却不单单只有美国,当网友们知道真相后,才知道原来中国科望石智慧原创技术取得重要成果AI制药行业获持续关注近日,望石智慧在核心期刊JCIM以副封面形式发表了原创技术成果一整套以大分子实验电子密度为数据基础的AIDD技术体系,并涵盖了基于靶点口袋的3D分子生成技术和基于高精度能量计算的高22家上市券商去年信息技术投入均超1亿元行业数字化转型加速本报记者周尚伃无科技不金融。证券行业数字化转型加速推进,金融科技正从服务业务向引领业务和赋能业务转变。据证券日报记者不完全统计,目前已有22家上市券商披露2021年信息技术投入情况小米全新MIUI首曝专为小内存打造如今MIUI功能逐渐增多,对于入门机型不太友好,不少网友希望化繁为简。其实MIUI前几年就开始针对小内存入门机型进行优化,比如减少动画特效,以减轻CPU压力,但是显然还不够。4月1洗衣服太费电?变频节能更省电对于大多数人来说,洗衣服是一件很麻烦的事情,不仅浪费时间,而且还会损伤衣物。那么不如选择一款全自动滚筒洗衣机吧,它采用模仿棒锤击打衣物的原理设计,能有效减少衣物磨损,让衣物洁净如新vivoPad体验娱乐刷剧只是基本功,轻移动办公神器前言平板电脑曾经是很多人对于移动办公和大屏影音娱乐中心的完美畅享。对比手机,它拥有更大的显示面积和键鼠搭配,对比电脑,它轻薄便携还能长续航。在理想状态下,不管是轻办公还是追剧打游戏小鹏车主使用辅助驾驶撞车,究竟谁来背锅?自动辅助驾驶再曝安全事故,小鹏汽车和车主陷入罗生门。日前,小鹏汽车车主邓先生发布视频维权,称在国道上开启自动辅助驾驶行驶十几公里后,前方道路出现一辆侧翻车辆,自己驾驶的P7没有任何筋膜枪,缓解疲劳神器!云康宝筋膜枪使用体验分享楼主最近有开始运动,健身环跳绳,这运动大神眼里这不算啥,但是对于运动小白来说,这个运动量已经是楼主能承受的极限了。每次运动完,虽然都有拉伸一下,但也会觉得肌肉紧绷,不是很舒服。和身跑分秒杀骁龙888?GPDXPPLUS凭啥称为最强游戏掌机?要说最近全网关注度最高的游戏掌机是哪款?相信关注数码圈的机友都已知道了,不是任天堂,不是索尼,更也不是微软,而是GPD近期推出的GPDXPPLUS。基于GPDXP全新升级的安卓掌机应采儿夫妇代言索爱256GB仅879元,性价比从未输过提起性价比手机,大家一定想起小米,但是小米已经开始走高端市场,起步价已经很高,不过旗下的红米手机,就是性价比很高,雷军表示,小米的存在就是为了消灭山寨机,然而山寨机一直都存在,从未
教程15国产编程语言Cbrother异常捕获CBrother运行时出现了异常,会直接将抛出,停止当前线程工作。如果开发者没有主动捕获异常,则会被CBrother解释器最终捕获并输出。trycatchCBrother通过try数字化不是简单数据化丨长江评论长江日报评论员鲁珊数字政府和数字治理已经成为当下热门词汇,数字赋能智慧治理被公认为城市发展大势所趋。不过,数字化是不是简单数据化?到2021年,全国省级层面有12个地区成立了大数据今年快递增加到一千亿件的原因分析我们每年的快递量几乎都是暴涨的,今年已经首次突破一千亿件,个人觉得,跟以下几个因素有关,首先,还是物美价廉,这是因为网上开店没有最重要的门店租金成本,而租金成本几乎占了实体店成本的聪明的快递之极兔快递十个月疯狂烧钱两百亿十个月就走完了四通一达老大中通16年的路逼三通一达合力围剿,让顺丰股价一夜暴跌,他就是快递界大名鼎鼎的搅局者极兔速递,将马云和刘强东两个大佬玩弄于股掌之间,和拼2021年进入尾声,多款骁龙888旗舰降价清仓,你准备好捡漏了吗?不知不觉已经到了2021年年底,随着骁龙8Gen1新一代芯片的发布,有奖有一波新旗舰抵达战场。在此种情况下,各大手机品牌也开启了年底清仓模式,那些想要在2021年年末买手机的朋友可华为跌倒,苹果吃饱,捞金没问题但要讲武德华为遭遇到断芯后,市场份额一路下滑,营业收入也顺流直下,下滑十分严重。反观华为的主要竞争对手苹果,却赚得盆满钵满,在中国市场开心地捞金,这不正好应了余承东的那句华为跌倒,苹果吃饱吗荣耀60Pro影像效果实测与简介大家应该知道,影响一部手机的成像质量,大致可归结为镜头光圈影像传感器图像计算引擎处理器等。手机镜头有用途与品质等分类。按照用途分类有广角镜头微距镜头长焦镜头等。按品质分类又有玻璃镜孟祥飞用超算推动一个加速创新的中国每秒可计算百亿亿次的超级计算机正在思考的问题,关乎宇宙演化自然灾害人工智能以及生命健康等各个领域百亿级高效高通量虚拟药物筛选全尺寸航空航天飞行器超百亿网格计算流体力学模拟全球尺度地交子现代版外媒称中国数字货币发展历程值得借鉴智利美洲经济网站近日刊载了一篇题为我们做好准备迎接数字货币了吗?来自中国的答案是肯定的的文章,以专家视角分析中国数字货币市场潜力,现将文章摘编如下你能想象马可波罗描述虚拟货币的场景中国掀起抵制英特尔浪潮后,印度抛出橄榄枝,砸百亿邀英特尔建厂随着美国总统拜登签署总统令,一项闹得沸沸扬扬的涉疆法案写进了美国宪法里。根据该法案,美国企业将不能进口来自新疆地区的任何产品,除非能提供证据,证明所使用的产品不是强迫劳动生产出来的如果马云的公司倒闭了,会对我们产生多大影响?没任何影响,一切正常,比现在更好。普天同庆,皆大欢喜不仅毫无影响,可能还能促进社会繁荣,因为会向社会输送一大批亿万千万富翁可能去创业带动更多就业,聚是一团火,散是满天星,阿里反正也