范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

MQ幂等去重有哪些通用的解决方案?

  MQ 幂等、去重 有哪些通用的解决方案?前言 简单的消息去重解决方案 并发重复消息 Exactly Once 基于关系数据库事务插入消息表 更复杂的业务场景 拆解消息执行过程 更通用的解决方案 更灵活的消息表存储媒介 源码:RocketMQDedupListener 这种实现是否一劳永逸? 本实现方式的价值? 一些其他的消息去重的建议 前言
  消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的"AT LEAST ONCE",即消息至少会被"成功消费一遍"。
  举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
  然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件"我已经消费成功了"的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
  这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。
  基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。 简单的消息去重解决方案
  例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存: insert into t_order values .....   update t_inv set count = count-1 where good_id = "good123";
  要实现消息的幂等,我们可能会采取这样的方案: select * from t_order where order_no = "order123"    if(order  != null) {     return ;//消息重复,直接返回  }
  这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。 并发重复消息
  假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),
  那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等) 并发去重的解决方案之一
  要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。 select * from t_order where order_no = "THIS_ORDER_NO" for update  //开启事务 if(order.status != null) {     return ;//消息重复,直接返回 }
  但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。
  当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。
  但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。 Exactly Once
  在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫"Exactly Once",即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:
  Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
  在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。
  但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。 基于关系数据库事务插入消息表
  假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态: update t_order set status = "SUCCESS" where order_no= "order123";
  要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。 开启事务 插入消息表(处理好主键冲突的问题) 更新订单表(原消费逻辑) 提交事务
  说明: 这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。
  事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:https://help.aliyun.com/document_detail/102777.html
  基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。
  但是这里有它的局限性 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。 数据库的数据必须是在一个库,跨库无法解决
  注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。 更复杂的业务场景
  如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。
  例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X): 检查库存(RPC) 锁库存(RPC) 开启事务,插入订单表(MySQL) 调用某些其他下游服务(RPC) 更新订单状态 commit 事务(MySQL)
  这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持"幂等"。
  再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。
  那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢? 拆解消息执行过程
  其中一个思路是把上面的几步,拆解成几个不同的子消息,例如: 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统 订单系统消费消息D:更新订单状态
  注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。
  可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。
  然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。 更通用的解决方案
  上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。
  如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。
  例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢? 基于消息幂等表的非事务方案
  以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)
  上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了: 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。 支持上游业务生产者重发的业务重复的消息幂等问题。
  关于第一个问题已经很明显已经解决了,在此就不讨论了。
  关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。
  关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。 此方案是否有消息丢失的风险?
  如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是 消费中  的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。
  有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于 消费中  超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:
  更灵活的消息表存储媒介
  我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处: 性能上损耗更低 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现
  当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。 源码:RocketMQDedupListener
  以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考https://github.com/Jaskey/RocketMQDedupListener ,
  以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单: //利用Redis做幂等表 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");         consumer.subscribe("TEST-TOPIC", "*");          String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名         StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程         DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);         DedupConcurrentListener messageListener = new SampleListener(dedupConfig);          consumer.registerMessageListener(messageListener);         consumer.start();
  以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个DedupConcurrentListener 示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。
  更多使用详情请参考Github上的说明。 这种实现是否一劳永逸?
  实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?
  很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X: 检查库存(RPC)
  锁库存(RPC)
  开启事务,插入订单表(MySQL)
  调用某些其他下游服务(RPC)
  更新订单状态
  commit 事务(MySQL)
  当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。 本实现方式的价值?
  那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:
  1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题
  2.各种上游生产者导致的业务级别消息重复问题
  3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑 一些其他的消息去重的建议
  也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。
  事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率: 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等) 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。

12种美味的淡水鱼,你都吃过吗?最后两种却不受待见冬日美食季给您带来最新的美食资讯,家庭最实用的烹饪技巧,关注辣爸食堂幸福每一天中国,地大物博物产丰富,江河水库海洋湖泊,生活着成千上万种鱼类。这些鱼类有野生有人工养殖,每日不断地丰刘涛玩转大衣造型,混搭卫衣牛仔裤,看似休闲随性却很时髦女人的穿衣风格都会随着年龄而改变,但不论选择哪种时尚风格搭配方式,都应该把得体放在第一位。只要注意到这一点,就算是休闲随性的混搭造型,也能让人看着很舒适充满时尚感。刘涛的呢大衣造型2022年年初爱用沐浴露清单!7款值得无限回购的沐浴露总结现在我对于沐浴露选购要求越来越高,不再是以前的追求洗干净就行,可能是随着生活质量和阅历提高慢慢发现一款好用沐浴露真的可以增添不少沐浴期间内心的愉悦感呢!有些使用感好留香效果在线的沐华为nova系列4G新机入网图公布6。78英寸LCD屏IT之家1月12日消息,一款型号为JLNAL00的华为4G手机此前入网工信部,拥有蓝色黑色白色三款配色,尺寸为164。6475。557。94mm,重量为191g,搭载鸿蒙Harmo小学生作文他的一生,悲伤的故事用错地方,令人捧腹大笑什么是好的学生作业现在的孩子一出生就接触很多外界信息,眼界和见识比前几代的孩子要远得多。所以他们在写作文时也不会像之前的我们,按部就班老老实实的写标准模板。他们的作文天马行空无拘无鱿鱼游戏弹珠游戏中,韩美丽钻了空子,老头本可以不被淘汰鱿鱼游戏中的成奇勋是个十分伪善的人,他本是个烂人,却因为参加了游戏,转变了性格,把自己当成无所不能的人,在所有人都在远离弱者的时候,只有成奇勋一个人,专门拉拢弱者,老头自然是他的囊北京这条老街火了,吸引大批游客到此观光,如今已成打卡圣地说起北京的街道,大家可能最熟悉的就是王府井街道了,王府井街从清朝开始,就是北京最为繁华的地段,虽然历经了百年的时间,王府井依旧人气不减,如今也是北京人气最高的地段。但是随着北京的发最终幻想系列名作TOP10评选!部部经典不容错过最终幻想(简称FF)是玩家们耳熟能详的RPG名作系列,通过拥有高水准画面音乐和故事情节等的系列作品获得了众多粉丝。近日日网投票评选出了最终幻想系列名作排行榜,让我们一起来看看TOP革命圣地,大别山深处的美丽湖泊,你喜欢这里吗?文森安徽是一个人杰地灵的地方,前些年这个在全国相对不知名的省份,近年来,随着不断地深化改革,安徽抓住了发展的机遇,融入进长三角,经济转型升级,开启了高速发展的时代,如今越来越多的人过了腊八就是年,三星品道家宴系列冰箱为你保留最地道的年味俗话说过了腊八就是年。腊八节一过,年味就越来越浓了。春节将至,忙碌了一整年的打工人们终于要迎来彻底的休闲和放松。无论时代如何变迁,在咱们中华儿女心中,一席由爸妈亲手操刀最懂自己味蕾美爆!东莞这些地方藏着绝美的落羽杉林又到冬日落羽杉最美的时候企石有哪里最佳观赏景点呢?今天,小编给大家推荐最佳观赏景区在东清湖湿地公园,落羽杉已变成橘红色,色彩斑斓,与蓝天相映成趣,恍如进入了油画世界。园内的白鹭巡游
如何绘制各类箱式图(箱线图)?山海路研因为热爱,所以想奔赴山海内容箱式图是什么?箱式图箱式图(boxplot)是使用5个统计量反映原始数据的分布特征,即数据分布中心位置分布偏度变异范围和离群值。箱式图的箱子两端明星写真朱珠针织衫抹胸居家纯美摄影,慵懒惬意韵味十足中国内地女明星朱珠一直以她精湛的演技和多变的形象给观众们留下深刻的印象。大秦赋中的赵姬,叛逆者中的蓝心洁,精英律师中的栗娜,还有乔家的儿女中的马素芹,都是朱珠成功塑造的荧幕形象。本藏不住了!央视和省台多个频道被可爱豹击!随着天气回暖,大批国家一级保护动物斑海豹从辽东湾洄游到长岛。这一盛景也被央视财经频道新闻频道少儿频道和山东卫视多个频道播发,接下来,就让我们一起来看看这群可爱的精灵宝贝们吧!长岛地博格巴复出,尤文图斯4比2逆转都灵文羊城晚报全媒体记者刘毅意甲第24轮在3月1日上演都灵德比,尤文图斯主场两次落后,最终仍以4比2逆转都灵,取得联赛四连胜。法国球星博格巴身披尤文10号战袍替补出场,这名问题球员时隔足坛劲爆一夜曼城32遭绝杀,尤文42逆转都灵北京时间3月1日凌晨,英格兰足总杯意甲联赛结束了多场关键对决,曼城在足总杯18决赛中30击败英冠布里斯托尔城晋级8强,尤文图斯在意甲第24轮德比战中42逆转都灵,罗马12客场输给了郑永年新型举国体制新在哪儿?2023年开年后,在中国经济从内到外都面临诸多挑战的背景下,从中央到地方都在全力以赴推动经济全面恢复,高质量发展是当下的高频词汇。广州粤港澳大湾区研究院理事长香港中文大学(深圳)前发运11358标准箱!天津新港海关推动中欧班列跑出加速度3月初的天津新港北铁路集装箱中心站,一派春天的繁忙景象,一个个满载食品电气产品等物资的集装箱正在排队,等待装上中欧班列出境。2023年是中国提出一带一路倡议十周年。天津新港海关立足保定莲池书院博物馆将开展书院文物我来讲学雷锋志愿服务活动3月5日,从保定市文化广电和旅游局获悉,3月15日至6月30日,保定市莲池书院博物馆将开展书院文物我来讲学雷锋志愿服务活动。活动现场据悉,活动为2023年保定市文化和旅游领域学雷锋哲蚌寺为何能后来居上,超越甘丹寺?1409年,由宗喀巴大师亲自筹建了一座寺庙甘丹寺,从此一个新的宗派成立了,它就是格鲁派,甘丹寺1416年宗喀巴大师授意他的一个弟子降央曲杰扎西班丹在拉萨西郊修建了一座寺庙哲蚌寺,很慕田峪长城巴士专线票价多少钱?老人小孩有优惠吗?一慕田峪长城交通方式去往慕田峪长城的交通方式有很多,可以自驾也可以选择坐公交车,但是坐公交车的话弊端也很显而易见,要提前候车就不说了,还要不停的转车换乘,真的很麻烦。所以这里长城旅故宫半日游提示首先穿着冬天早上故宫还是很冷的,尤其是走中轴线的话穿堂风嗖嗖的,一定穿保暖!(我穿的棉服,一整个后悔)其次证件从地铁下来就开始查证件了!去故宫会经过天安门广场,一定要准备好揣兜里!