范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

没用过消息队列?一文带你体验RabbitMQ收发消息

  楔子
  先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理。
  今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想要梳理一遍还是需要很多时间,所以我打算慢慢写,先把MQ给写了,再慢慢写其他相关的,毕竟偏理论的东西一遍要比较难写,像MQ这种偏实战的大家可以clone代码去玩一玩,还是比较方便的。
  同时MQ也是Java进阶不必可少的技术栈之一,所以Java开发从业者对它是必须要了解的。
  现在市面上有三种消息队列比较火分别是:RabbitMQ,RocketMQ和Kafka。
  今天要讲的消息队列中我会以RabbitMQ作为案例来入门,因为SpringBoot的amqp中默认只集成了RabbitMQ,用它来讲会方便许多,且RabbitMQ的性能和稳定性都很不错,是一款经过时间考验的开源组件。
  祝有好收获。1. 消息队列?
  消息队列(MQ)全称为Message Queue,是一种应用程序对应用程序的通信方法。
  翻译一下就是:在应用之间放一个消息组件,然后应用双方通过这个消息组件进行通信。
  好端端的为啥要在中间放个组件呢?
  小系统其实是用不到消息队列的,一般分布式系统才会引入消息队列,因为分布式系统需要抗住高并发,需要多系统解耦,更需要对用户比较友好的响应速度,而消息队列的特性可以天然解耦,方便异步更能起到一个顶住高并发的削峰作用,完美解决上面的三个问题。
  然万物抱阳负阴,系统之间突然加了个中间件,提高系统复杂度的同时也增加了很多问题:消息丢失怎么办?消息重复消费怎么办?某些任务需要消息的顺序消息,顺序消费怎么保证?消息队列组件的可用性如何保证?
  这些都是使用消息队列过程中需要思考需要考虑的地方,消息队列能给你带来很大的便利,也能给你带来一些对应的麻烦。
  上面说了消息队列带来的好处以及问题,而这些不在我们今天这篇的讨论范围之内,我打算之后再写这些,我们今天要做的是搭建出一个消息队列环境,让大家感受一下基础的发消息与消费消息,更高级的问题会放在以后讨论。2. RabbitMQ一览
  RabbitMQ是一个消息组件,是一个erlang开发的AMQP(Advanced Message Queue)的开源实现。
  AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
  RabbitMQ采用了AMQP协议,至于这协议怎么怎么样,我们关心的是RabbitMQ结构如何且怎么用。
  还是那句话,学东西需要先观其大貌,我们要用RabbitMQ首先要知道它整体是怎么样,这样才有利于我们接下来的学习。
  我们先来看看我刚画的架构图,因为RabbitMQ实现了AMQP协议,所以这些概念也是AMQP中共有的。
  Broker: 中间件本身。接收和分发消息的应用,这里指的就是RabbitMQ Server。Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。Channel: 渠道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销会比较大且效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。Exchange: 路由。根据分发规则,匹配查询表中的routing key,分发消息到queue中去。Queue: 消息的队列。消息最终被送到这里等待消费,一个message可以被同时拷贝到多个queue中。Binding: 绑定。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
  看完了这些概念,我再给大家梳理一遍其流程:
  当我们的生产者端往Broker(RabbitMQ)中发送了一条消息,Broker会根据其消息的标识送往不同的Virtual host,然后Exchange会根据消息的路由key和交换器类型将消息分发到自己所属的Queue中去。
  然后消费者端会通过Connection中的Channel获取刚刚推送的消息,拉取消息进行消费。
  Tip:某个Exchange有哪些属于自己的Queue,是由Binding绑定关系决定的。3. RabbitMQ环境
  上面讲了RabbitMQ大概的结构图和一个消息的运行流程,讲完了理论,这里我们就准备实操一下吧,先进行RabbitMQ安装。
  官网下载地址:www.rabbitmq.com/download.ht…
  由于我还没有属于自己MAC电脑,所以这里的演示就按照Windows的来了,不过大家都是程序员,安装个东西总归是难不倒大家的吧
  Windows下载地址:www.rabbitmq.com/install-win…
  进去之后可以直接找到Direct Downloads,下载相关EXE程序进行安装就可以了。
  由于RabbitMQ是由erlang语言编写的,所以安装之前我们还需要安装erlang环境,你下载RabbitMQ之后直接点击安装,如果没有相关环境,安装程序会提示你,然后会让你的浏览器打开erlang的下载页面,在这个页面上根据自己的系统类型点击下载安装即可,安装完毕后再去安装RabbitMQ。
  这两者的安装都只需要一直NEXT下一步就可以了。
  安装完成之后可以按一下Windows键看到效果如下:
  Tip:其中Rabbit-Command后面会用到,是RabbitMQ的命令行操作台。
  安装完RabbitMQ我们需要对我们的开发环境也导入RabbitMQ相关的JAR包。
  为了方便起见,我们可以直接使用Spring-boot-start的方式导入,这里面也会包含所有我们需要用到的RabbitMQ相关的JAR包。                      org.springframework.boot             spring-boot-starter-amqp          
  直接引入spring-boot-starter-amqp即可。4. ✍Hello World
  搭建好环境之后,我们就可以上手了。
  考虑到这是一个入门文章,读者很多可能没有接触过RabbitMQ,直接使用自动配置的方式可能会令大家很迷惑,因为自动配置会屏蔽很多细节,导致大家只看到了被封装后的样子,不利于大家理解。
  所以在本节Hello World这里,我会直接使用最原始的连接方式就行演示,让大家看到最原始的连接的样子。
  Tip:这种方式演示的代码我都在放在prototype包下面。4.1 生产者
  先来看看生产者代码,也就是我们push消息的代码:    public static final String QUEUE_NAME = "erduo";      // 创建连接工厂     ConnectionFactory connectionFactory = new ConnectionFactory();      // 连接到本地server     connectionFactory.setHost("127.0.0.1");      // 通过连接工厂创建连接     Connection connection = connectionFactory.newConnection();      // 通过连接创建通道     Channel channel = connection.createChannel();      // 创建一个名为耳朵的队列,该队列非持久(RabbitMQ重启后会消失)、非独占(非仅用于此链接)、非自动删除(服务器将不再使用的队列删除)     channel.queueDeclare(QUEUE_NAME, false, false, false, null);      String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();     // 发布消息     // 四个参数为:指定路由器,指定key,指定参数,和二进制数据内容     channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));      System.out.println("生产者发送消息结束,发送内容为:" + msg);     channel.close();     connection.close();
  代码我都给了注释,但是我还是要给大家讲解一遍,梳理一下。
  先通过RabbitMQ中的ConnectionFactory配置一下将要连接的server-host,然后创建一个新连接,再通过此连接创建通道(Channel),通过这个通道创建队列和发送消息。
  这里看上去还是很好理解的,我需要把创建队列和发送消息这里再拎出来说一下。
  创建队列    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;
  创建队列的方法里面有五个参数,第一个是参数是队列的名称,往后的三个参数代表不同的配置,最后一个参数是额外参数。durable:代表是否将此队列持久化。exclusive:代表是否独占,如果设置为独占队列则此队列仅对首次声明它的连接可见,并在连接断开时自动删除。autoDelete:代表断开连接后是否自动删除此队列。arguments:代表其他额外参数。
  这些参数中durable经常会用到,它代表了我们可以对队列做持久化,以保证RabbitMQ宕机恢复后此队列也可以自行恢复。
  发送消息    void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
  发送消息的方法里是四个参数,第一个是必须的指定exchange,上面的示例代码中我们传入了一个空字符串,这代表我们交由默认的匿名exchange去帮我们路由消息。
  第二个参数是路由key,exchange会根据此key对消息进行路由转发,第三个参数是额外参数,讲消息持久化时会用到一下,最后一个参数就是我们要发送的数据了,需要将我们的数据转成字节数组的方式传入。
  测试
  讲完了这些API之后,我们可以测试一下我们的代码了,run一下之后,会在控制台打出如下:
  这样之后我们就把消息发送到了RabbitMQ中去,此时可以打开RabbitMQ控制台(前文安装时提到过)去使用命令rabbitmqctl.bat list_queues去查看消息队列现在的情况:
  可以看到有一条message在里面,这就代表我们的消息已经发送成功了,接下来我们可以编写一个消费者对里面的message进行消费了。4.2 消费者
  消费者代码和生产者的差不多,都需要建立连接建立通道:    // 创建连接工厂     ConnectionFactory connectionFactory = new ConnectionFactory();      // 连接到本地server     connectionFactory.setHost("127.0.0.1");      // 通过连接工厂创建连接     Connection connection = connectionFactory.newConnection();      // 通过连接创建通道     Channel channel = connection.createChannel();      // 创建消费者,阻塞接收消息     com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {         @Override         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {             System.out.println("-------------------------------------------");             System.out.println("consumerTag : " + consumerTag);             System.out.println("exchangeName : " + envelope.getExchange());             System.out.println("routingKey : " + envelope.getRoutingKey());             String msg = new String(body, StandardCharsets.UTF_8);             System.out.println("消息内容 : " + msg);         }     };      // 启动消费者消费指定队列     channel.basicConsume(Producer.QUEUE_NAME, consumer); //        channel.close(); //        connection.close();
  建立完通道之后,我们需要创建一个消费者对象,然后用这个消费者对象去消费指定队列中的消息。
  这个示例中我们就是新建了一个consumer,然后用它去消费队列-erduo中的消息。
  最后两句代码我给注释掉了,因为一旦把连接也关闭了,那我们的消费者就不能保持消费状态了,所以要开着连接,监听此队列。
  ok,运行这段程序,然后我们的消费者会去队列-erduo拿到里面的消息,效果如下:
  consumerTag:是这个消息的标识。exchangeName:是这个消息所发送exchange的名字,我们先前传入的是空字符串,所以这里也是空字符串。exchangeName:是这个消息所发送路由key。
  这样我们的程序就处在一个监听的状态下,你再次调用生产者发送消息消费者就会实时的在控制上打印消息内容。5. 消息接收确认(ACK)
  上面我们演示了生产者和消费者,我们生产者发送一条消息,消费者消费一条信息,这个时候我们的RabbitMQ应该有多少消息?
  理论上来说发送一条,消费一条,现在里面应该是0才对,但是现在的情况并不是:
  消息队列里面还是有1条信息,我们重启一下消费者,又打印了一遍我们消费过的那条消息,通过消息上面的时间我们可以看出来还是当时我们发送的那条信息,也就是说我们消费者消费过了之后这条信息并没有被删除。
  这种状况出现的原因是因为RabbitMQ消息接收确认机制,也就是说一条信息被消费者接收到了之后,需要进行一次确认操作,这条消息才会被删除。
  RabbitMQ中默认消费确认是手动的,也可以将其设置为自动删除,自动删除模式消费者接收到消息之后就会自动删除这条消息,如果消息处理过程中发生了异常,这条消息就等于没被处理完但是也被删除掉了,所以这里我们会一直使用手动确认模式。
  消息接受确认(ACK)的代码很简单,只要在原来消费者的代码里加上一句就可以了:    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {         @Override         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {             System.out.println("-------------------------------------------");             System.out.println("consumerTag : " + consumerTag);             System.out.println("exchangeName : " + envelope.getExchange());             System.out.println("routingKey : " + envelope.getRoutingKey());             String msg = new String(body, StandardCharsets.UTF_8);             System.out.println("消息内容 : " + msg);              // 消息确认             channel.basicAck(envelope.getDeliveryTag(), false);             System.out.println("消息已确认");         }     };
  我们将代码改成如此之后,可以再run一次消费者,可以看看效果:
  再来看看RabbitMQ中的队列情况:
  从图中我们可以看出消息消费后已经成功被删除了,其实大胆猜一猜,自动删除应该是在我们的代码还没执行之前就帮我们返回了确认,所以这就导致了消息丢失的可能性。
  我们采用手动确认的方式之后,可以先将逻辑处理完毕之后(可能出现异常的地方可以try-catch起来),把手动确认的代码放到最后一行,这样如果出现异常情况导致这条消息没有被确认,那么这条消息会在之后被重新消费一遍。后记
  今天的内容就到这里,下一篇将会我们将会撇弃传统的手动建立连接的方式进行发消息收消息,而转用Spring帮我们定义好的注解和Spring提供的RabbitTemplate,更方便的收发消息。
  消息队列呢,其实用法都是一样的,只是各个开源消息队列的侧重点稍有不同,我们应该根据我们自己的项目需求来决定我们应该选取什么样的消息队列来为我们的项目服务,这个项目选型的工作一般都是开发组长帮你们做了,一般是轮不到我们来做的,但是面试的时候可能会考察相关知识,所以这几种消息队列我们都应该有所涉猎。
  好了,以上就是本期的全部内容,感谢你能看到这里,欢迎对本文点赞收藏与评论,你们的每个点赞都是我创作的最大动力。
  我是耳朵,一个一直想做知识输出的伪文艺程序员,我们下期见。
  作者:和耳朵
  链接:https://juejin.im/post/6856571028496351239

恐吓中国留学生?美国这回惹下大祸,24小时内,中方火速行动美国政府在最近几年时间里频繁针对中国采取各方面的行动,为了能够对中国进行限制和打压,美国采取的行动包括经济外交等诸多领域,这样的情况也给我国利益带来了相当巨大的影响,同时也让两国关重磅调整!电价迎来大改革!10月15日起实施,国人都请注意了前段时间,拉闸限电成为国人热议的话题。广东东北等多地在9月份陆续出台一些限电措施,多家公司主动对外透露,接下来将陆续削减水泥等各种产品的产量。事实上,之所以企业会选择在需求高居不下下一个立陶宛?捷克反华激进派接掌政权,竟提出废除一中政策捷克议会众议院选举已于上月结束,根据投票结果来看,捷克总理巴比什所在的执政党遭遇惨败,巴比什所在政党的得票率为27。12,以微弱的优势失去第一大党的地位。最终由3个反对党组成的在一反转来了!我将放弃党主席,拉舍特终于摊牌,默克尔绝没料到9月26日,德国大选结果正式出炉,社民党以微弱的优势获得领先,而稳坐执政党位子多年的联盟党遭遇有史以来最低的得票率。当前,社民党正在组织组阁谈判,而基民盟内部极有可能会迎来重大变化2912!法国提交一项决议草案在联合国通过,中日印联手投反对票10月9日,马克龙发表演讲指出,接下来法国将会联合其他欧盟成员国,继续推动全球废除死刑。马克龙向世界各国发出呼吁称,保留死刑是在拒绝人权,各国应该做出废除死刑的决定。接下来,法国还真相果然没那么简单!美核潜艇被撞第9天,机密泄露响彻全美近段时间,一则美军核潜艇在南海被撞的消息引起国际社会热议。当地时间10月7日,美军主动对外公布康涅狄格号在南海地区水下潜航时,同不明物体相撞,导致船员受伤。据悉,本次碰撞发生在10浪潮将继续深化云计算3。0战略,坚持技术创新核心理念近日,2020福布斯中国最具创新力企业榜TOP50公布,浪潮华为阿里云等各行业优秀创新企业榜上有名。该榜单对企业的评价维度包括创新能力创新转化企业治理企业成长性以及企业社会形象等,RedmiK30Pro太适合VLOG爱好者!双OIS光学防抖8K视频如今短视频内容崛起,很多人都成为VOLG爱好者。相信滑雪跑酷或滑板爱好者追星少女都遇到过这种问题照片视频拍下来高糊怎么办?还要再购买专业设备吗?最近Redmi要发布的开年旗舰就很适脱欧或已成定局,波兰全境一片大乱,我们不走响彻街头根据路透社等外国媒体报道,波兰同欧盟爆发法律争端之后,有许多民众担心波兰有可能会脱离欧盟,于是在波兰全国各地举行抗议活动,参加抗议活动的人数达到10万人以上,波兰全境一片混乱。参与浪潮信息携手12306布局智慧化出行平台随着大众需求提升,出行服务领域所需要的技术支持,也会不断提高。12306对电子客票刷脸进站系统扩容升级等的需求逐步提高,如何有效解决日常运维困难,提升工作效率,降低人员成本开支,成浪潮信息为企业级AI生产环境打造的AI推理服务平台近日,2020浪潮云数据中心合作伙伴大会(IPF2020)在线上举行,浪潮重磅发布全新AIStation人工智能推理服务平台,这是业界首款智算中心算力调度软件产品,是专为企业级AI
苹果最廉价5GiPhone即将试产外形变化不大,内存芯片升级对于苹果而言,在明年上半年之前,可能最重要的手机产品就要从iPhone13转变为新iPhoneSE了。目前苹果是全球第二手机厂商,如果想要夯实自己的地位,那么仅凭iPhone13可苹果华为都少不了,台积电的芯片代工业务有多重要?苹果华为高通联发科英伟达AMD他们有什么共同点?一全球知名的科技品牌二拥有自己的芯片产品三都由台积电代工生产芯片。在全球芯片需求暴涨导致缺芯问题短期内无法解决的今天,以我国台积电(特斯拉手机渲染图骁龙8Gen11亿四摄,埃隆马斯克出手是王炸iPhone深受消费者的欢迎,但并不代表苹果仅仅是手机制造商。苹果作为一个科技公司,在很多科技领域都有建树,当然提到苹果很多小伙伴都会联想到埃隆马斯克的特斯拉和可回收的火箭。作为高oppo最值得买的三款手机,最后一名有争议今天小编给大家分享三款opoo最值得买的手机,今年是2021年了这个高价低配的牌子显然已经不适合opoo了,那么今天就分享三款第一款oppoK9pro处理器联发科的旗舰处理器天玑112。15比特币ETHDOGEAXS行情分析01岛论大势尼日利亚政符部长呼吁监管加密或币,而不是取缔他们,尼日利亚是非洲国家里加密市场渗透率最高的,近几个月增长也是极其迅速,未来合规后,会有更多的滋金冲进来。美国田纳西州杰克12。15基金操作白酒医疗领跌,新能源持续走弱,继续观望为主今日大盘继续震荡盘整,低开低走,多空来回博弈,目前三大指数全部小幅回调。两市成交量截止目前已达9700多亿,相比上一个交易日基本持平。北向资金在今天继续净流入17个多亿,继续流入。报告称vivo在国内中高端手机市场取得进展中证网讯(记者张兴旺)日前,市场研究机构CounterpointResearch发布报告显示,过去一年,vivo在国内中高端手机市场取得了进展。vivo在500美元至599美元价格负债率高达80,海康威视子公司萤石网络冲击A股记者郑小琳编辑日前,海康威视(002415。SZ)子公司萤石网络科创板IPO正式获得上交所受理。根据招股说明书,萤石网络拟募资37。39亿元,用于萤石智能制造重庆基地项目(22。0贾跃亭留在国内的烂摊子,到底坑了多少业界大佬?虽然贾跃亭可能不是中国互联网历史上最耀眼的那个人,但他的所作所为直到今天也仍旧吸引着很多人的目光,毕竟很多人还在期待着他能够下周回国,然后把之前的那些烂摊子重新收拾好,再度让梦想窒三星同款技术!小米MIXFOLD2将采用柔性玻璃屏内屏不再是花瓶近日,根据外媒消息,三星将向小米的下一代MIXFOLD提供UTG(超薄柔性玻璃)折叠面板,该技术当前已经被应用在了三星GalaxyZFlip等产品上。UTG(超薄柔性玻璃)是三星开月活1。3亿,月交互次数达20亿,OPPO小布助手技术与应用探秘中国计算机界一年一度的顶级盛会CNCC2021(中国计算机大会)将于12月1618日在深圳拉开帷幕。InfoQ极客传媒已正式成为CNCC2021的战略合作媒体。作为合作的一部分,I