专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

基于RocketMQ的分布式事务解决方案

  前言什么分布式事务?
  随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用,分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。分布式事务产生的场景
  例1:典型的场景就是微服务架构微服务之间通过远程调用完成事务操作。
  比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。简言之:跨JVM进程产生分布式事务。
  例2:单体系统访问多个数据库实例当单体系统需要访问多个数据库(实例)时就会产生分布式事务。
  比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。简言之:跨数据库实例产生分布式事务。如何解决分布式事务
  根据CAP和BASE理论,分布式事务解决的核心思想主要是:无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性,而最终一致性涉及到方式主要是二阶段提交和三阶段提交。
  目前主要解决分布式事务的方式是通过RocketMQ或者阿里推出的SEATA框架解决,本文主要是通过RocketMQ实操来处理分布式事务的场景。RocketMQ基本使用
  关于RocketMQ的基本的消息发送方式和消息类型,大家可以到官网自行学习:RocketMQ官网文档,MQ解决分布式事务主要是通过事务消息的方式来解决。实操
  因为这篇文章主要是针对分布式事务,所以建表和业务逻辑不是十分严谨,不过作者会尽可能的保证分布式事务、MQ使用的严谨性。场景
  场景比较简单,主要是一个跨行转账的操作,例:手机号为XXX的用户将钱从农行转到华夏银行。建表
  数据库test:存在表abcperson、transferdetail
  建表sql:CREATETABLEabcperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,2)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULT0COMMENT删除状态,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8COMMENT用户表;CREATETABLEtransferdetail(idint(11)NOTNULLAUTOINCREMENTCOMMENT明细ID,useridint(11)NOTNULLDEFAULT0COMMENT用户ID,moneydecimal(10,2)DEFAULT0。00COMMENT转账金额,msgidvarchar(50)DEFAULTCOMMENT消息ID,deleteflgchar(1)DEFAULT0COMMENT是否删除状态,createtimetimestampNULLDEFAULTCURRENTTIMESTAMP,PRIMARYKEY(id))ENGINEInnoDBAUTOINCREMENT7DEFAULTCHARSETutf8COMMENT消息发送表;
  数据库test1:存在表hxperson
  建表sql:CREATETABLEhxperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,0)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULTNULLCOMMENT0,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8;生产者PostMapping(abcToHx)publicStringabcToHx(Stringmobile,BigDecimaltransferMoney){AbcPersonabcPersonuserService。getByMobile(mobile);if(ObjectUtil。isNotEmpty(abcPerson)ObjectUtil。isNotEmpty(transferMoney)abcPerson。getBanlance()。doubleValue()transferMoney。doubleValue()){TransferDtotransferDtonewTransferDto();transferDto。setMobile(mobile);transferDto。setMoney(transferMoney);transferDto。setUserId(abcPerson。getUserId());transferDto。setDistributedId(snowFlakeUtil。snowflakeId());1发送半消息Stringdestinationtransfertopic:toHx;MessagemessageMessageBuilder。withPayload(JSON。toJSONString(transferDto))。build();TransactionSendResultresultrocketMQTemplate。sendMessageInTransaction(destination,message,null);log。warn(发送半消息:message,响应内容:result);returnSUCCESS;}returnFAIL;}解释:
  标注1:之所以使用雪花算法生成唯一ID,是为了消费者消费时,确保消息不会重复消费,所以通过唯一ID确定(虽然消息ID通常是唯一的,不过在特定情况下可能会出现消息ID不同,但实际消息内容一样的情况(消费者主动重发、因客户端重投机制导致的重复等),这样会出现重复消费的问题,所以需要其他唯一标识保证消息消费的幂等性问题。)详情可看RocketMQ官网的最佳实践的解释,解释如下:牛逼啊!接私活必备的N个开源项目!赶快收藏2。1消费过程幂等
  RocketMQ无法避免消息重复(ExactlyOnce),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。
  首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
  实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过
  msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
  RocektMQ在发送半消息时,会调用我们重写的监听器的executeLocalTransaction(Messagemsg,Objectarg)方法来执行本地事务OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){StringjsonStrnewString((byte〔〕)msg。getPayload());log。error(arg:arg执行本地事务:JSON。toJSONString(msg),传输字段:jsonStr);TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);booleanflaguserService。transferMoney(transferDto。getUserId(),transferDto。getMoney(),transferDto。getDistributedId()。toString());在提交本地事务到return期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况if(flag){log。warn(executeLocalTransaction本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{log。warn(executeLocalTransaction本地事务执行失败,ROLLBACK);本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringjsonStrnewString((byte〔〕)msg。getPayload());TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);TransferDetailtransferDetailtransferDetailService。getByMsgId(transferDto。getDistributedId()。toString());if(ObjectUtil。isNotEmpty(transferDetail)){log。warn(本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}
  UserService的transferMoney方法:TransactionalOverridepublicbooleantransferMoney(IntegeruserId,BigDecimalmoney,StringmsgId){booleanflagfalse;TransferDetailtransferDetailnewTransferDetail();transferDetail。setMoney(money);transferDetail。setUserId(userId);transferDetail。setMsgId(msgId);flagtransferDetailDao。insert(transferDetail)0;throwRuntimeException(flag,消息存储失败,异常回滚。。。);加悲观锁AbcPersonlockAbcPersongetByIdForUpdate(userId);if(ObjectUtil。isNotEmpty(lockAbcPerson)lockAbcPerson。getBanlance()。doubleValue()money。doubleValue()){修改用户金额BigDecimalcurrentBanlancenewBigDecimal(lockAbcPerson。getBanlance()。doubleValue()money。doubleValue());lockAbcPerson。setBanlance(currentBanlance);flagupdate(lockAbcPerson)0?true:false;throwRuntimeException(flag,修改用户金额失败,异常回滚。。。);}returnflag;}publicvoidthrowRuntimeException(booleanflag,Stringmsg){if(!flag){thrownewRuntimeException(msg);}}
  解释:在执行完transferMoney方法到returnRocketMQLocalTransactionState期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况TransferDetail主要是将本地事务执行情况落磁盘,保证后续的checkLocalTransaction()可以通过回查数据,来确定消息是提交还是回滚。transferMoney方法之所以先加明细再加悲观锁,是为了降低不必要的加锁时间,提升性能。另外,搜索公众号顶级科技后台回复API接口,获取一份惊喜礼包。消费者
  主要是通过实现RocketMQListener接口,监听响应的消息,来给出响应。OverridepublicvoidonMessage(MessageExtmessage){Stringkeynull;Stringvaluenull;try{构建存储redis的key、value,目的是为了保证消息不会被重复消费StringmsgIdmessage。getMsgId();TransferDtotransferDtoJSON。parseObject(newString(message。getBody()),TransferDto。class);keymq:transferDto。getDistributedId();log。warn(获取到当前消息的msgId:msgId);valueThread。currentThread()。getId():System。currentTimeMillis();加分布式锁booleanflagredisTemplate。opsForValue()。setIfAbsent(key,value,1,TimeUnit。HOURS);if(flag){try{正常业务流程执行,完成后该消息会自动完成,期间有其他消费者执行该消息,也无法拿到锁flaguserService。transferMoney(transferDto。getMobile(),transferDto。getMoney());if(!flag){thrownewRuntimeException(没有添加金额成功,抛出异常);}log。warn(成功消费);}catch(Exceptione){执行业务出现异常,释放分布式锁,通过value验证,保证不会错误的释放锁通过watch机制保证原子性操作,若watch被打断,则说明该key已经被修改,当然也就无需当前线程释放锁redisTemplate。watch(key);redisTemplate。multi();StringlockValue(String)redisTemplate。opsForValue()。get(key);if(StrUtil。isNotBlank(lockValue)lockValuevalue){redisTemplate。delete(key);}redisTemplate。exec();thrownewRuntimeException(释放分布式锁,因消费失败,故抛出异常);}}else{未拿到锁,抛出异常,则该消息便不会被成功消费thrownewRuntimeException(未拿到锁,不进行消费);}}catch(Exceptione){log。warn(消费异常);thrownewRuntimeException(消费异常);}}
  消费者的UserService,转账操作TransactionalOverridepublicbooleantransferMoney(Stringmobile,BigDecimalmoney){booleanflagfalse;加悲观锁HxPersonhxPersonhxPersonDao。getByIdOrMobileForUpdate(null,mobile);if(ObjectUtil。isNotEmpty(hxPerson)){hxPerson。setBanlance(newBigDecimal(hxPerson。getBanlance()。doubleValue()money。doubleValue()));flaghxPersonDao。update(hxPerson)0?true:false;}returnflag;}验证
  操作前test数据库的表数据
  test1数据库的表数据
  通过调用生产者的转账接口:
  生产消费完成、再看下两个库的数据情况:
  正常的分布式事务流程就走完了,大家有什么改进或疑问的点可以提出来,一起进步,共同学习!!!
  原文链接:https:mp。weixin。qq。comsPPHbOtfFX5naG9cvxSk7Zg

哈弗h6新能源上市,纯电续航高达110公里,售15。98万起本地车圈百事通,解析车市新鲜事第140期,本期文章共951字阅读2分钟北京时间2022年9月28日晚,哈弗H6新能源正式上市,共推出三款配置,指导价在15。98万至17。38万区间几百元捡漏一只汤丽柏琦风琴包汤丽柏琦风琴包原来这么好看的现在才发现哈哈哈包包原本是帆布肩带的被我替换成皮的了虽然价格便宜但百分百是正品的还出具了中检的鉴定证书呢收到后果然是我喜欢的包包呀包包是海蓝色为主调老花油价调整消息今天10月1日,全国各地加油站9295汽油价格今天是国庆假日的第一天,最新的数据显示油价预计调整幅度达到了0。08元升,但是结合现在原油市场的情况,油价预计调整幅度一直出现缩窄,虽然现在本次调价周期已经到了后期,但是因为节假日2款低速电动车新品,外观新潮,最大续航180公里,9000块起阅读本文前,请您先点击上面的关注二字,可以快速订阅我们的最新内容,感谢支持还有几个月就过年了,很多用户正在考虑入手一辆低速四轮电动车,主要给家里老人出行代步使用,也有部分是为了接娃官方火箭雷霆多人交易正式达成,火箭得到次轮为2026年火箭官方今日正式宣布,球队与雷霆的多人交易正式达成。据悉,这笔交易的具体内容为,火箭送出斯特林布朗特雷伯克马基斯克里斯和大卫努瓦巴,从雷霆得到了德里克费沃斯莫哈克利斯泰杰罗姆泰奥马为什么我说,这次小米Civi2有点争议之前文章提过,小米Civi几乎就是小米家的颜值天花板。不管是设计和用料,还是轻薄的表现,我都相当满意。这个刚满1岁的系列的手机迎来了更新。但是嘛,这次Civi2有点争议。具体啥情况任泽平9天6次炮轰苹果,不要再沦为高端韭菜,支持国产近日,一学者任泽平9天6次对苹果发起了冲击,表示所谓的高端PRO版不过是割高端韭菜的专机,阉割了很多的功能,还把配件价格定的虚高,国内竟然还有那么多人无脑地崇拜,一出新机就掏空积蓄并发编程CompletableFuture异步编程详解前言在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。今天的分享主要带大家从一个实消息今天下午5点开抢!德安政府消费券又又又来啦!约惠金秋乐购德安2022德安县消费券又又又来啦!为进一步激发德安县消费市场活力,丰富市民消费选择,提升市民消费热情,德安县开展2022年国庆促消费商贸活动,发放通用券文旅券华夏阳光爆款小米显示器只要599元IPS面板买显示器怎么选,如果你不是富豪,当然是选择经济实惠具有高性价比的产品了。尤其是主流尺寸主流规格的显示器,普遍价格非常亲民。今天给大家推荐的是小米Redmi1A显示器,这款显示器拥有2022年最值得购买的5款手机,内行人强烈推荐,跟着选不吃亏您在阅读前请点击上面的关注二字,后续会第一时间为您提供更多有价值的相关内容,感谢您的支持。每年手机厂商都会发布一些非常经典的机型,这些机型不一定是高端旗舰机,也可能是一些千元机,这
对话前NV主帅廖良光从执教选手到执教自己踏上新旅程的廖良光虽然还无法确定自己的归期,但是他的只言片语中仍然透露着对这个行业的不舍。作者二闹图片来自网络将时间退回到上个月21号,与不少城市都在经历难耐的炙热时,当时的深圳可王俊凯希望EDG今年继续冲击S冠LPLS12加油近日王俊凯受邀参加了英雄联盟嘉年华明星表演赛,之后在采访中王俊凯回应了JieJie对他的评价,并表示希望今年EDG也能继续冲击S冠。之前JieJie曾在采访中提到有看过王俊凯的刀妹明早4点迪士尼ampampamp漫威游戏发布会时长23分钟据推主PlayStationGameSize爆料,明天早上4点举行的迪士尼漫威游戏发布会仅仅持续23分钟。此前,迪士尼在新闻稿中指出,除了全新公布外,粉丝们可以期待迪士尼梦幻星谷漫轻松学名方小建中汤(一)血虚重用哪两味药?熟地白芍。如果血淤呢?当归川芎。当归川芎它是行血。熟地白芍,它是养血,所以秋冬养阴熟地白芍。春夏养阳,当归川芎直达头面,春夏养阳,。好!我们回过来看小建中汤,今天答疑解惑中秋赏月,夜宵咋吃更安全中秋赏月,有些人习惯在正餐后再吃一顿丰盛的夜宵,待抬头看圆月低头看圆肚后再去休息。然而,吃夜宵无益健康的常识,却也让人着实纠结。中秋夜宵怎么吃更安全更健康呢?成都市食品检验研究院副我的三十岁有时候感觉真的很神奇,人一旦迈过三十岁的门槛,感觉跟二十多岁完全不一样了。首先是自身的变化,回溯过往,感觉二十多岁的时光很久远,甚至可以叫做前尘往事了,而事实上不过才几年而已。思维空空空今天我回到了家乡,感觉凄凉万分,原来的大村庄没炊烟升起,家家都是紧闭了大门,不见金灿灿稻田,四处杂草丛生,只有希少的几位老人妇女寒喧说着远方的孩子,脸上挂着无奈的笑容。我走进了儿时女人记住这6个穿衣公式,毫不费劲穿出时髦感,还美得很气质一个时髦的造型是有很多环节构成,服装风格颜色搭配和穿衣技巧的运用,都对时尚感高低有着影响。而这些环节中,服装搭配占据了相当大一部分因素,只要服装搭配方式正确,很容易打造出时髦的造型苹果创始人乔布斯的初代苹果竟拍出天价?据报道,在最近的一次,苹果硬件拍卖会上。一部全新未拆封的苹果iphone1代机竟然拍出了超过35000美元折合人民币高达23。94万。这还没完,而乔布斯拥有的Apple1原型机拍出一加手机是OPPO的吗?一加成为OPPO旗下子品牌对于一加手机,许多消费者都并不陌生,不将就的产品理念让一加手机在消费者心中留下了深刻的印象。从品牌创立以来,历经9年沉淀,一加手机一路高歌猛进,在国内海外市场都有众多忠实拥趸。在2iPhone14手机30W快充,国产手机200W,苹果快充技术为何那么差?备受关注的iPhone14手机将在9月6日正式发布,关于这款智能手机的一些相关信息也相继出来了,其中,关于这款智能手机的充电技术也在网上出现了。根据在网上获取到的信息来看,iPho
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网