RabbitMQ的消息确认机制(含代码实现)
哈喽大家好,我是阿Q!
上文,我们已经完成了SpringBoot快速集成RabbitMQ的小Demo,本文我们来聊一下RabbitMQ为了防止消息丢失,增加的消息确认机制:生产者消息确认机制和消费者消息确认机制。确认机制
一、生产者消息确认机制在yml中增加配置信息spring: rabbitmq: #确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true
spring.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果增加回调@Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("ReturnCallback: "+"消息:"+returned.getMessage()); System.out.println("ReturnCallback: "+"回应码:"+returned.getReplyCode()); System.out.println("ReturnCallback: "+"回应信息:"+returned.getReplyText()); System.out.println("ReturnCallback: "+"交换机:"+returned.getExchange()); System.out.println("ReturnCallback: "+"路由键:"+returned.getRoutingKey()); } }); return rabbitTemplate; }confirm机制是只保证消息到达exchange,并不保证消息可以路由到正确的queue当前的exchange不存在或者指定的路由key路由不到才会触发return机制
大家可以自行演示以下情况的执行结果:不存在交换机和队列存在交换机,不存在队列消息推送成功
二、消费者消息的确认机制
默认情况下如果一个消息被消费者正确接收则会从队列中移除。如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存,当有消费者订阅时则会立即发送,进而从队列中移除。
消费者消息的确认机制可以分为以下3种:自动确认
AcknowledgeMode.NONE 默认为自动确认,不管消费者是否成功处理了消息,消息都会从队列中被移除。根据情况确认
AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入队列)如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认当抛出AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且消息不会重回队列当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认其他的异常,则消息会被拒绝,并且该消息会重回队列,如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的。可以通过 setDefaultRequeueRejected(默认是true)去设置
可能造成消息丢失,一般是需要我们在try-catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。手动确认
AcknowledgeMode.MANUAL对于手动确认,也是我们工作中最常用到的,它的用法如下:/* * 肯定确认 * deliveryTag:消息队列数据的唯一id * multiple:是否批量 * true :一次性确认所有小于等于deliveryTag的消息 * false:对当前消息进行确认; */ channel.basicAck(long deliveryTag, boolean multiple); /* * 否定确认 * multiple:是否批量 * true:一次性拒绝所有小于deliveryTag的消息 * false:对当前消息进行确认; * requeue:被拒绝的是否重新入列, * true:就是将数据重新丢回队列里,那么下次还会消费这消息; * false:就是拒绝处理该消息,服务器把该消息丢掉即可。 */ channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);/* * 用于否定确认,但与basicNack相比有一个限制,一次只能拒绝单条消息 */ channel.basicReject(long deliveryTag, boolean requeue); 手动确认
在yml配置中开启手动确认模式spring: rabbitmq: listener: simple: acknowledge-mode: manual
或者在代码中开启@Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MQReciever mqReciever;//消息接收处理类 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //并发使用者的数量 container.setConcurrentConsumers(1); //消费者人数上限 container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 //设置一个队列,此处支持设置多个 container.setQueueNames("directQueue"); container.setMessageListener(mqReciever); return container; } }
消息消费类@Component @RabbitListener(queues = "directQueue")//监听队列名称 public class MQReciever implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { String msg = message.toString(); String[] msgArray = msg.split(""");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据 System.out.println("消费的消息内容:"+msgArray[1]); System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue()); //业务处理 ...... channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒绝重新入队列 channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }
无ack:效率高,存在丢失大量消息的风险;有ack:效率低,不会丢消息。
作者:阿Q说代码
链接:https://juejin.cn/post/7063259228898590750
以浪漫之名谈谈圣托里尼装修风格的WiFi覆盖有人说,爱她就一定要带她去圣托里尼位于希腊的旅游圣地月牙形环岛山崖上的蓝白色小屋和蔚蓝的爱琴海遥相辉映圣托里尼不只是诗和远方这里是失落的亚特兰蒂斯有古希腊神话的浪漫唯美傍晚更能看到
当MSI冠军属于RNG樱木花道和流川枫从未配合过一次。直到他们看到自己有希望赢山王重工的一瞬间,两个人放下了隔阂,随后樱木就对山王工业打出了绝杀。希望是最好的东西。说起来,你们可能都不信。上一次,各大赛
捷豹XFL只聊三大件,技术实力到底如何?捷豹汽车,在国内征战时间不算短了,作为豪华品牌,销量上却一直起不来。家族中大型轿车捷豹XFL很多人并不陌生,目前在终端市场有不错的优惠,外观颜值很多人都有所了解,今天主要从技术的角
法拉利296GTB采用插电式混动方案,性能表现如何?说到性能跑车,我们会想到法拉利,在今年10月份,家族推出了一款新的GT跑车,名字叫做法拉利296GTB。从售价上来看,新款车型指导价在298。8万,搭配3。0T双涡轮发动机,并且是
微信8。0。14内测更新老年模式上线时隔多日,微信终于迎来新一轮内测更新。昨天,微信发布了安卓8。0。14内测版本,本次内测又带来了哪些新功能呢?早前,微信被工信部要求进行适老化改造。在本次内测版本中,微信为此推出了
Linux实战020Ubuntu截图工具及快捷键设置在写文章的时候为了更好地阐述内容我们通常需要附上图片,图片可以更加简单直观的作者想要传递的内容。截图也是获取图片的一种方式,我们可以在系统中获取桌面上显示的全屏所有当前窗口和任意区
Linux实战019解决Ubuntu更新NOPUBKEYNOPUBKEY在虚拟机体验了几天Ubuntu感觉还不错,今天直接在笔记本上安装上了。由于Ubuntu官方的软件源着实是太慢了,所以我第一件事情就是将软件源换成国内的镜像源(具体操
Linux实战023Ubuntu解决Wine乱码问题刚装上了Wine6。0稳定版,终于可以在Ubuntu上面运行Windows软件了(O()O)。但是遇到的第一个问题就是通过Wine打开的Windows软件中文都乱码了,这样啥也看不
Linux实战024快速为Firefox添加百度搜索引擎搜索功能是我们日常用的比较多的功能,ubuntu为用户默认安装了Firefox浏览器。该浏览器默认的搜索引擎是Google,如果你在地址栏输入的不是地址而是内容那么浏览器就会自动启
盲盒之外的泡泡玛特8月27日,泡泡玛特发布2021年半年报,财报显示,泡泡玛特收入达到17。73亿元,同比增长116。8,经调整后净利润达到人民币4。35亿元,同比增长144。1,增速明显高于营收。
泛生子Q2财报创历史新高,肿瘤精准医疗企业如何脱颖而出发展多年的基因检测,依旧是资本眼中的宠儿。根据动脉网数据,上半年国内基因检测领域一级市场融资额超70亿元。这其中,超过半数融资额由肿瘤精准医疗行业公司完成。不得不说,肿瘤精准医疗赛