范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

MQ消费失败,自动重试思路,如何保证MQ消息正常被业务消费

  MQ消费失败,自动重试思路
  在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。
  MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。
  我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+.NET CORE 3.1
  设计思路为:
  内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。
  那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。
  下面结合代码讲下具体实现:
  我们先看看业务队列发送消息时,如何定义IBasicProperties properties = channel.CreateBasicProperties();                 properties.Persistent = true;                 //初始化,需要内置一些消费异常,自动重试参数                  if (headers == null)                 {                     headers = new Dictionary();                 }                 //ttlSecond 有值表示消息将投递到延迟队列                 //因为可以自建延迟队列,ttlSecond是业务标识                  if (ttlSecond.HasValue)                 {                     if (!headers.ContainsKey("x-delay"))                     {                         headers.Add("x-delay", ttlSecond * 1000);                     }                     else                     {                         headers["x-delay"] = ttlSecond * 1000;                     }                     //queueType = 1表示延迟队列                      //框架内部重试机制需要此参数,因为重新投递到原始队列时,需要区分普通队列还是延迟队列                     if (!headers.ContainsKey("queueType"))                     {                         headers.Add("queueType", 1);                     }                 }                 else                 {                     //queueType = 0表示普通队列                     if (!headers.ContainsKey("queueType"))                     {                         headers.Add("queueType", 0);                     }                 }                 //重试次数                 if (!headers.ContainsKey("retryCount"))                 {                     headers.Add("retryCount", 0);                 }                 //原始交换机名称                 if (!headers.ContainsKey("retryExchangeName"))                 {                     headers.Add("retryExchangeName", exchangeName);                 }                 //原始路由键                 if (!headers.ContainsKey("retryRoutingKey"))                 {                     headers.Add("retryRoutingKey", routingKey);                 }                 properties.Headers = headers;                 channel.BasicPublish(exchangeName, routingKey, properties, Encoding.UTF8.GetBytes(message));
  这里会内置上面描述的重试队列需要的参数
  再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)       //每次消费一条             channel.BasicQos(0, 1, false);              //定义消费者             EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);             eventingBasicConsumer.Received += async (sender, basicConsumer) =>             {                 string body = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());                 Deadletter deadletter = null;                 try                 {                     string errorMsg = await action(body);                     if (!errorMsg.IsNullOrWhiteSpace())                     {                         deadletter = new Deadletter() { Body = body, ErrorMsg = errorMsg };                         _logger.LogError(#34;业务队列消费异常(已知),消息头:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}{Environment.NewLine}错误:{errorMsg}");                     }                 }                 catch (Exception ex)                 {                     deadletter = new Deadletter() { Body = body, ErrorMsg = ex.Message };                     _logger.LogError(ex, #34;业务队列消费异常(未知),消息头:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}");                 }                 //必定应答,不管消费成功还是失败                 channel.BasicAck(basicConsumer.DeliveryTag, false);                 //消费失败,投递消息至重试队列                 if (deadletter != null)                 {                     PublishRetry(deadletter, basicConsumer.BasicProperties.Headers);                 }             };
  我们再看看PublishRetry重试队列的推送方法如何实现IBasicProperties properties = channel.CreateBasicProperties();                 properties.Persistent = true;                 //x-delay为延迟队列的延迟时间                 //如果第一次进行重试,请求头中是不存在延迟时间的,需要新增                 //因为可以进行多次重试,所以第二次时,就会存在延迟时间                 //但因为可以自建用于业务的延迟队列,所以自建的延迟队列,第一次重试也会存在x-delay,但是如果自建的延迟队列失败进行重试时,不能还使用自身的延迟时间,所以需要重新设置为系统默认的失败重试时间                 if (!headers.ContainsKey("x-delay"))                 {                     headers.Add("x-delay", 0);                 }                  //重试次数                 int retryCount = Convert.ToInt32(headers["retryCount"]);                 //可以根据重试次数,实现上面说描述的微信回调的重试时间变长效果                 headers["x-delay"] = retryCount * 1000;                 properties.Headers = headers;                 channel.BasicPublish(RETRY_EXCHANGE_NAME, string.Empty, properties, Encoding.UTF8.GetBytes(JsonUtils.Serialize(deadletter)));
  重试队列的消费者实现channel.BasicQos(0, 1, false);              EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);             eventingBasicConsumer.Received += async (sender, basicConsumer) =>             {                 string message = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());                 Deadletter deadletter = JsonUtils.Deserialize(message);                  IDictionary headers = basicConsumer.BasicProperties.Headers;                 //请求头中肯定会有如下参数,因为在框架代码中已经内置                 //重试次数                 int retryCount = Convert.ToInt32(headers["retryCount"]);                 //原队列类型,如果原队列本身为延迟队列,重试投递的时候,必须也要为延迟队列,只是不需要延迟时间,投递回原队列后,会立马重新消费                 int queueType = Convert.ToInt32(headers["queueType"]);                 //原队列名称                 string retryExchangeName = Encoding.UTF8.GetString((byte[])headers["retryExchangeName"]);                 //原路由键                 string retryRoutingKey = Encoding.UTF8.GetString((byte[])headers["retryRoutingKey"]);                 if (retryCount <= 10)                 {                     headers["retryCount"] = retryCount + 1;                     //原有队列为普通队列,重新投递时,也需要投递为普通队列类型                     if (queueType == 0)                     {                         PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers);                     }                     //原有队列为延迟队列,重新投递时,也需要投递为延迟队列类型                     else                     {                         PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers, 0);                     }                 }                 //超过重试最大次数不再处理,交由外部委托来处理死信                 else                 {                     await deadLetterTask(retryExchangeName, deadletter.Body, deadletter.ErrorMsg);                 }                 //应答                 channel.BasicAck(basicConsumer.DeliveryTag, false);             };             //开启监听             channel.BasicConsume(RETRY_QUEUE_NAME, false, eventingBasicConsumer);
  然后在系统中,内置重试队列消费者//注册框架内自动重试             _rabbitMQClient.SubscribeRetry(async (exchangeName, message, errorMsg) =>             {                 string content = #34;原始交换机名称:{exchangeName}{Environment.NewLine}" +                              #34;原始消息内容:{message}{Environment.NewLine}" +                              #34;错误消息:{errorMsg}";                  await PushWeChatMessage(content);             });
  上述为我们MQ实现自动重试的一种方案,当然中间包括每次如果消费失败都可以发送通知,来通知业务人员关注消费失败的情况。可以自定义最大重试次数、重试间隔时间、死信的处理,这里仅仅是MQ重试机制的一种思路而已,大家如果有更好的方案,欢迎多多沟通。
  原文链接:https://www.cnblogs.com/jiangbiao/p/15748007.html

俄罗斯研发猴痘病毒检测盒,已完成实验室测试据新华社5月21日电,俄罗斯矢量病毒学与生物技术国家科学中心(下称矢量中心)科研人员研制出了一种自动操作的检测试剂盒(下称检测盒),可用于检测包括猴痘病毒在内的正痘病毒。针对该检测俄罗斯研发猴痘病毒检测盒,已完成实验室测试俄罗斯矢量病毒学与生物技术国家科学中心(下称矢量中心)科研人员研制出了一种自动操作的检测试剂盒(下称检测盒),可用于检测包括猴痘病毒在内的正痘病毒。针对该检测盒的实验室测试已顺利完在气候变化的推动下,预计到2070年至少会发生15000次新的跨物种病毒传播作者融媒体记者王璐新型冠状病毒肺炎大流行已经肆虐世界近2年,但其新变种仍在不断出现。作为最新变体,Omicron已蔓延六大洲,导致确诊病例迅速增加,并超过德尔塔成为许多国家的主要变2022年一季度广东地区ICT项目中标情况分析赛立信通信研究部作者谢剑超来源通信竞争2022年第2期本文重点研究2022年一季度广东地区在ICT项目中标情况,包括中标数量中标金额中标行业类别中标项目类型等,并从中分析出广东地区加拿大禁止华为中兴参与5G网络建设外交部将采取一切必要手段维护中企正当权益来源环球时报环球网环球时报环球网报道记者白云怡综合加拿大电视台(CTV)等多家加媒最新消息,加拿大政府当地时间19日以国家安全为由,禁止华为和中兴参与加拿大的5G网络建设。对此,中加拿大国家禁止华为参与5g建设,理由国家安全根据加拿大电视台的最新消息,又有一个国家以国家安全为由,禁止了华为和中兴参与5g的网络建设!华为又一次被别人拒绝门外,又一次地证明自己的强大让对手害怕,害怕的自己赶不上你的脚步,因性能测试工具wrk的安装与使用前言想和大家来聊聊性能测试,聊到了性能测试必须要说的是性能测试中的工具,在这些工具中我今天主要给大家介绍wrk。介绍wrk是一款开源的性能测试工具,简单易用,没有LoadRunne针对特定人种的基因武器存在吗?伪科学概念,无实用性昨天晚上,有一位朋友询问我针对特定人种的基因武器存在吗?我告诉他主流科学界早就定论,针对特定人种的基因武器是伪科学概念。科学理论上来看,就是吃力不讨好,研究出来没有意义。即使搞出来假美国货,真中国产,凯托V1多功能应急收音机评测一译者前言近些年,大家经常能听到一个名词叫美国产业空心化,大体上指的是美国的产业结构出现了问题,第二产业急剧衰退,第三产业占比过大。用更通俗的话讲,长期以来,美国人把主要精力放在了2022年618预算7000元以内,买什么大屏手机送父亲好?如果大家的预算在7000元以内,想要购买一台比较好的大屏手机,小芳建议大家还是从华为Mate40ProvivoXNote和荣耀Magic4Pro这三款手机中选择一款手机进行购买会比如果你的手机出现这些情况,可能是被监听了要知道每个人的手机里都藏有或多或少不想给任何人知道的秘密,浏览记录聊天记录不想给别人知道的APP任何一个秘密拎出来可能都会导致大型社死,此外,更为重要的是,你的手机里还装有你全部的
说说你是什么原因关注了今日头条呢?说说你是什么原因关注了今日头条呢?答因为今日头条含概很多内容,有很多领域如关注推荐热榜本地区问答娱乐国际大小视频军事体育科技健康。那是在2015年,我生日那天,女儿给我买了个生日礼骁龙870独显芯片,2699元起售的它,值不值得买?回答之前先小小的纠正一下,这款870独显芯片的起售价是2499,并不是2699,有两百的差距。言归正传,不用说都知道这手机说得是iQOONeo5,毕竟这独显芯片几乎是独一份的存在的给400万快递小哥涨一毛钱,谁来买单?文王亚琪编辑斯问9月1日开始,中通圆通申通百世汇通韵达四通一达加上极兔,6家快递公司的快递派费一起上涨,每票上调0。1元,用以补贴快递员的收入。派费是快递小哥在基本工资以外,按件计中企收购英国最大芯片厂,芯片研究更上一层楼芯片在各个领域都是非常重要的组成部分,尤其是在电子行业和汽车制造业,而我国因为发展芯片技术较晚,市场受到西方的限制,而近几年,我国崛起手机品牌开始自己研究芯片,打破这种制衡,不久前看完这一篇学会MyBatis就够了前言MyBatis可能很多人都一直在用,但是MyBatis的SQL执行流程可能并不是所有人都清楚了,那么既然进来了,通读本文你将收获如下1Mapper接口和映射文件是如何进行绑定的人机CP完美结合达达携手京东物流为山姆提供无人配送服务文福布斯中国9月1日,达达快送为山姆会员商店部分极速达订单提供的无人配送服务投入运营,京东物流无人车通过接入达达快送开放平台,实现高效配送。用户通过京东到家山姆APP等平台下单,可中国大陆平板市场大洗牌,华为第二,苹果第一,第四暴涨299作为数码产品中的一员,平板电脑的存在感自然是不如手机了,但是由于全球疫情的影响,现在越来越多的消费者都会给自己的小孩买一台平板电脑,方便和老师进行远程网络教育,购买平板电脑的时候,用户好评不断,鸿蒙OS升级用户创新纪录,华为软件实力被忽视没有硬件就要放弃?或许对其它企业来说,这很艰难,但对华为来说,即便是硬件被限制,它仍然还有很多条路可以走。华为鸿蒙OS,发展比预期还要顺利。三百六十行,行行出状元。以通讯业务为主的旗下第三方店铺涉嫌售卖他人论文且泄露个人信息百度文库回应来源北京青年报旗下第三方店铺涉嫌售卖他人论文且泄露个人信息百度文库回应永久封禁涉事店铺近日有媒体报道称,百度文库一店铺学姐帮帮忙涉嫌售卖他人论文,且泄露了论文作者的姓名学号等隐私信国产芯片迎来春天,国产工业软件能否也扛起自主大旗?文章来源百家号,作者万能的大熊国产芯片行业可以说是现在最风口浪尖的领域,每天都能看到国产芯片实现各种巨大突破的新闻,但最后往往又有些不符国情的夸张成分。不过很多业内人士认为,国产芯网络电视机顶盒都有哪些,观看以下内容教你如何选择电视盒子网络机顶盒已经发展了很多年了,现在市场上有些产品举着高质量高标准网络电视机顶盒的名义贩卖二手料电视机顶盒产品,用夸大其词的广告语引起人们的注意,比如4G32G的配置只要99,这种明