MQ是什么? MQ(MessageQueue)消息队列,是基础数据结构中先进先出的一种数据结构。 指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递生产者产生消息并把消息放入队列,然后由消费者去处理。 消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。MQ的作用? 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。 异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。 ps:以上内容摘选自百科。实现mq的准备工作maven引入dependencygroupIdio。nettygroupIdnettyallartifactIdversion4。1。42。FinalversiondependencydependencygroupIdcom。alibabagroupIdfastjsonartifactIdversion1。2。76versiondependency模块划分 Themessagequeueinjava。作为mq的从零开始的学习项目,目前已开源。 项目的模块如下: 模块 说明 mqcommon 公共代码 mqbroker 注册中心 mqproducer 消息生产者 mqconsumer 消息消费者消息消费者接口定义packagecom。github。houbb。mq。consumer。api;authorbinbin。housince1。0。0publicinterfaceIMqConsumer{订阅paramtopicNametopic名称paramtagRegex标签正则voidsubscribe(StringtopicName,StringtagRegex);注册监听器paramlistener监听器voidregisterListener(finalIMqConsumerListenerlistener);} IMqConsumerListener作为消息监听类的接口,定义如下:publicinterfaceIMqConsumerListener{消费parammqMessage消息体paramcontext上下文return结果ConsumerStatusconsumer(finalMqMessagemqMessage,finalIMqConsumerListenerContextcontext);} ConsumerStatus代表消息消费的几种状态。消息体 启动消息体MqMessage定义如下:packagecom。github。houbb。mq。common。dto;importjava。util。Arrays;importjava。util。List;authorbinbin。housince1。0。0publicclassMqMessage{标题名称privateStringtopic;标签privateListStringtags;内容privatebyte〔〕payload;业务标识privateStringbizKey;负载分片标识privateStringshardingKey;gettersettertoString}push消费者策略实现 消费者启动的实现如下:推送消费策略authorbinbin。housince1。0。0publicclassMqConsumerPushextendsThreadimplementsIMqConsumer{省略。。。Overridepublicvoidrun(){启动服务端log。info(MQ消费者开始启动服务端groupName:{},port:{},brokerAddress:{},groupName,port,brokerAddress);EventLoopGroupbossGroupnewNioEventLoopGroup();EventLoopGroupworkerGroupnewNioEventLoopGroup();try{ServerBootstrapserverBootstrapnewServerBootstrap();serverBootstrap。group(workerGroup,bossGroup)。channel(NioServerSocketChannel。class)。childHandler(newChannelInitializerChannel(){OverrideprotectedvoidinitChannel(Channelch)throwsException{ch。pipeline()。addLast(newMqConsumerHandler());}})这个参数影响的是还没有被accept取出的连接。option(ChannelOption。SOBACKLOG,128)这个参数只是过一段时间内客户端没有响应,服务端会发送一个ack包,以判断客户端是否还活着。。childOption(ChannelOption。SOKEEPALIVE,true);绑定端口,开始接收进来的链接ChannelFuturechannelFutureserverBootstrap。bind(port)。syncUninterruptibly();log。info(MQ消费者启动完成,监听【port】端口);channelFuture。channel()。closeFuture()。syncUninterruptibly();log。info(MQ消费者关闭完成);}catch(Exceptione){log。error(MQ消费者启动异常,e);thrownewMqException(ConsumerRespCode。RPCINITFAILED);}finally{workerGroup。shutdownGracefully();bossGroup。shutdownGracefully();}}省略。。。} ps:初期我们把consumer作为服务端,后续引入broker则只有broker是服务端。MqConsumerHandler处理类 这个类是一个空的实现。publicclassMqConsumerHandlerextendsSimpleChannelInboundHandler{OverrideprotectedvoidchannelRead0(ChannelHandlerContextchannelHandlerContext,Objectobject)throwsException{nothing}}测试代码MqConsumerPushmqConsumerPushnewMqConsumerPush();mqConsumerPush。start(); 启动日志:〔DEBUG〕〔2022042119:16:41。343〕〔main〕〔c。g。h。l。i。c。LogFactory。setImplementation〕Logginginitializedusingclasscom。github。houbb。log。integration。adaptors。stdout。StdOutExImpladapter。〔INFO〕〔2022042119:16:41。356〕〔Thread0〕〔c。g。h。m。c。c。MqConsumerPush。run〕MQ消费者开始启动服务端groupName:CDEFAULTGROUPNAME,port:9527,brokerAddress:〔INFO〕〔2022042119:16:43。196〕〔Thread0〕〔c。g。h。m。c。c。MqConsumerPush。run〕MQ消费者启动完成,监听【9527】端口消息生产者接口定义 最基本的消息发送接口。packagecom。github。houbb。mq。producer。api;importcom。github。houbb。mq。common。dto。MqMessage;importcom。github。houbb。mq。producer。dto。SendResult;authorbinbin。housince1。0。0publicinterfaceIMqProducer{同步发送消息parammqMessage消息类型return结果SendResultsend(finalMqMessagemqMessage);单向发送消息parammqMessage消息类型return结果SendResultsendOneWay(finalMqMessagemqMessage);}生产者实现 MqProducer启动的实现如下,基于netty。packagecom。github。houbb。mq。producer。core;默认mq生产者authorbinbin。housince1。0。0publicclassMqProducerextendsThreadimplementsIMqProducer{省略。。。Overridepublicvoidrun(){启动服务端log。info(MQ生产者开始启动客户端GROUP:{},PORT:{},brokerAddress:{},groupName,port,brokerAddress);EventLoopGroupworkerGroupnewNioEventLoopGroup();try{BootstrapbootstrapnewBootstrap();ChannelFuturechannelFuturebootstrap。group(workerGroup)。channel(NioSocketChannel。class)。option(ChannelOption。SOKEEPALIVE,true)。handler(newChannelInitializerChannel(){OverrideprotectedvoidinitChannel(Channelch)throwsException{ch。pipeline()。addLast(newLoggingHandler(LogLevel。INFO))。addLast(newMqProducerHandler());}})。connect(localhost,port)。syncUninterruptibly();log。info(MQ生产者启动客户端完成,监听端口:port);channelFuture。channel()。closeFuture()。syncUninterruptibly();log。info(MQ生产者开始客户端已关闭);}catch(Exceptione){log。error(MQ生产者启动遇到异常,e);thrownewMqException(ProducerRespCode。RPCINITFAILED);}finally{workerGroup。shutdownGracefully();}}省略。。。}MqProducerHandler处理类 默认的空实现,什么都不做。packagecom。github。houbb。mq。producer。handler;importio。netty。channel。ChannelHandlerContext;importio。netty。channel。SimpleChannelInboundHandler;authorbinbin。housince1。0。0publicclassMqProducerHandlerextendsSimpleChannelInboundHandler{OverrideprotectedvoidchannelRead0(ChannelHandlerContextchannelHandlerContext,Objectobject)throwsException{donothingnow}}启动代码MqProducermqProducernewMqProducer();mqProducer。start(); 启动日志:〔DEBUG〕〔2022042119:17:11。960〕〔main〕〔c。g。h。l。i。c。LogFactory。setImplementation〕Logginginitializedusingclasscom。github。houbb。log。integration。adaptors。stdout。StdOutExImpladapter。〔INFO〕〔2022042119:17:11。974〕〔Thread0〕〔c。g。h。m。p。c。MqProducer。run〕MQ生产者开始启动客户端GROUP:PDEFAULTGROUPNAME,PORT:9527,brokerAddress:四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerchannelRegistered信息:〔id:0x5cb48145〕REGISTERED四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerconnect信息:〔id:0x5cb48145〕CONNECT:localhost127。0。0。1:9527四月21,20227:17:13下午io。netty。handler。logging。LoggingHandlerchannelActive信息:〔id:0x5cb48145,L:127。0。0。1:57740R:localhost127。0。0。1:9527〕ACTIVE〔INFO〕〔2022042119:17:13。833〕〔Thread0〕〔c。g。h。m。p。c。MqProducer。run〕MQ生产者启动客户端完成,监听端口:9527小结 基于netty最基本的服务端启动、客户端启动到这里就结束了。 千里之行,始于足下。 我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。 希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。 我是老马,期待与你的下次重逢。开源地址 Themessagequeueinjava。(java简易版本mq实现):https:github。comhoubbmq拓展阅读 rpc从零开始实现rpc:https:github。comhoubbrpc