基于RocketMQ的分布式事务解决方案
前言什么分布式事务?
随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用,分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。分布式事务产生的场景
例1:典型的场景就是微服务架构微服务之间通过远程调用完成事务操作。
比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。简言之:跨JVM进程产生分布式事务。
例2:单体系统访问多个数据库实例当单体系统需要访问多个数据库(实例)时就会产生分布式事务。
比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。简言之:跨数据库实例产生分布式事务。如何解决分布式事务
根据CAP和BASE理论,分布式事务解决的核心思想主要是:无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性,而最终一致性涉及到方式主要是二阶段提交和三阶段提交。
目前主要解决分布式事务的方式是通过RocketMQ或者阿里推出的SEATA框架解决,本文主要是通过RocketMQ实操来处理分布式事务的场景。RocketMQ基本使用
关于RocketMQ的基本的消息发送方式和消息类型,大家可以到官网自行学习:RocketMQ官网文档,MQ解决分布式事务主要是通过事务消息的方式来解决。实操
因为这篇文章主要是针对分布式事务,所以建表和业务逻辑不是十分严谨,不过作者会尽可能的保证分布式事务、MQ使用的严谨性。场景
场景比较简单,主要是一个跨行转账的操作,例:手机号为XXX的用户将钱从农行转到华夏银行。建表
数据库test:存在表abcperson、transferdetail
建表sql:CREATETABLEabcperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,2)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULT0COMMENT删除状态,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8COMMENT用户表;CREATETABLEtransferdetail(idint(11)NOTNULLAUTOINCREMENTCOMMENT明细ID,useridint(11)NOTNULLDEFAULT0COMMENT用户ID,moneydecimal(10,2)DEFAULT0。00COMMENT转账金额,msgidvarchar(50)DEFAULTCOMMENT消息ID,deleteflgchar(1)DEFAULT0COMMENT是否删除状态,createtimetimestampNULLDEFAULTCURRENTTIMESTAMP,PRIMARYKEY(id))ENGINEInnoDBAUTOINCREMENT7DEFAULTCHARSETutf8COMMENT消息发送表;
数据库test1:存在表hxperson
建表sql:CREATETABLEhxperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,0)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULTNULLCOMMENT0,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8;生产者PostMapping(abcToHx)publicStringabcToHx(Stringmobile,BigDecimaltransferMoney){AbcPersonabcPersonuserService。getByMobile(mobile);if(ObjectUtil。isNotEmpty(abcPerson)ObjectUtil。isNotEmpty(transferMoney)abcPerson。getBanlance()。doubleValue()transferMoney。doubleValue()){TransferDtotransferDtonewTransferDto();transferDto。setMobile(mobile);transferDto。setMoney(transferMoney);transferDto。setUserId(abcPerson。getUserId());transferDto。setDistributedId(snowFlakeUtil。snowflakeId());1发送半消息Stringdestinationtransfertopic:toHx;MessagemessageMessageBuilder。withPayload(JSON。toJSONString(transferDto))。build();TransactionSendResultresultrocketMQTemplate。sendMessageInTransaction(destination,message,null);log。warn(发送半消息:message,响应内容:result);returnSUCCESS;}returnFAIL;}解释:
标注1:之所以使用雪花算法生成唯一ID,是为了消费者消费时,确保消息不会重复消费,所以通过唯一ID确定(虽然消息ID通常是唯一的,不过在特定情况下可能会出现消息ID不同,但实际消息内容一样的情况(消费者主动重发、因客户端重投机制导致的重复等),这样会出现重复消费的问题,所以需要其他唯一标识保证消息消费的幂等性问题。)详情可看RocketMQ官网的最佳实践的解释,解释如下:牛逼啊!接私活必备的N个开源项目!赶快收藏2。1消费过程幂等
RocketMQ无法避免消息重复(ExactlyOnce),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。
首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
RocektMQ在发送半消息时,会调用我们重写的监听器的executeLocalTransaction(Messagemsg,Objectarg)方法来执行本地事务OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){StringjsonStrnewString((byte〔〕)msg。getPayload());log。error(arg:arg执行本地事务:JSON。toJSONString(msg),传输字段:jsonStr);TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);booleanflaguserService。transferMoney(transferDto。getUserId(),transferDto。getMoney(),transferDto。getDistributedId()。toString());在提交本地事务到return期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况if(flag){log。warn(executeLocalTransaction本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{log。warn(executeLocalTransaction本地事务执行失败,ROLLBACK);本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringjsonStrnewString((byte〔〕)msg。getPayload());TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);TransferDetailtransferDetailtransferDetailService。getByMsgId(transferDto。getDistributedId()。toString());if(ObjectUtil。isNotEmpty(transferDetail)){log。warn(本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}
UserService的transferMoney方法:TransactionalOverridepublicbooleantransferMoney(IntegeruserId,BigDecimalmoney,StringmsgId){booleanflagfalse;TransferDetailtransferDetailnewTransferDetail();transferDetail。setMoney(money);transferDetail。setUserId(userId);transferDetail。setMsgId(msgId);flagtransferDetailDao。insert(transferDetail)0;throwRuntimeException(flag,消息存储失败,异常回滚。。。);加悲观锁AbcPersonlockAbcPersongetByIdForUpdate(userId);if(ObjectUtil。isNotEmpty(lockAbcPerson)lockAbcPerson。getBanlance()。doubleValue()money。doubleValue()){修改用户金额BigDecimalcurrentBanlancenewBigDecimal(lockAbcPerson。getBanlance()。doubleValue()money。doubleValue());lockAbcPerson。setBanlance(currentBanlance);flagupdate(lockAbcPerson)0?true:false;throwRuntimeException(flag,修改用户金额失败,异常回滚。。。);}returnflag;}publicvoidthrowRuntimeException(booleanflag,Stringmsg){if(!flag){thrownewRuntimeException(msg);}}
解释:在执行完transferMoney方法到returnRocketMQLocalTransactionState期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况TransferDetail主要是将本地事务执行情况落磁盘,保证后续的checkLocalTransaction()可以通过回查数据,来确定消息是提交还是回滚。transferMoney方法之所以先加明细再加悲观锁,是为了降低不必要的加锁时间,提升性能。另外,搜索公众号顶级科技后台回复API接口,获取一份惊喜礼包。消费者
主要是通过实现RocketMQListener接口,监听响应的消息,来给出响应。OverridepublicvoidonMessage(MessageExtmessage){Stringkeynull;Stringvaluenull;try{构建存储redis的key、value,目的是为了保证消息不会被重复消费StringmsgIdmessage。getMsgId();TransferDtotransferDtoJSON。parseObject(newString(message。getBody()),TransferDto。class);keymq:transferDto。getDistributedId();log。warn(获取到当前消息的msgId:msgId);valueThread。currentThread()。getId():System。currentTimeMillis();加分布式锁booleanflagredisTemplate。opsForValue()。setIfAbsent(key,value,1,TimeUnit。HOURS);if(flag){try{正常业务流程执行,完成后该消息会自动完成,期间有其他消费者执行该消息,也无法拿到锁flaguserService。transferMoney(transferDto。getMobile(),transferDto。getMoney());if(!flag){thrownewRuntimeException(没有添加金额成功,抛出异常);}log。warn(成功消费);}catch(Exceptione){执行业务出现异常,释放分布式锁,通过value验证,保证不会错误的释放锁通过watch机制保证原子性操作,若watch被打断,则说明该key已经被修改,当然也就无需当前线程释放锁redisTemplate。watch(key);redisTemplate。multi();StringlockValue(String)redisTemplate。opsForValue()。get(key);if(StrUtil。isNotBlank(lockValue)lockValuevalue){redisTemplate。delete(key);}redisTemplate。exec();thrownewRuntimeException(释放分布式锁,因消费失败,故抛出异常);}}else{未拿到锁,抛出异常,则该消息便不会被成功消费thrownewRuntimeException(未拿到锁,不进行消费);}}catch(Exceptione){log。warn(消费异常);thrownewRuntimeException(消费异常);}}
消费者的UserService,转账操作TransactionalOverridepublicbooleantransferMoney(Stringmobile,BigDecimalmoney){booleanflagfalse;加悲观锁HxPersonhxPersonhxPersonDao。getByIdOrMobileForUpdate(null,mobile);if(ObjectUtil。isNotEmpty(hxPerson)){hxPerson。setBanlance(newBigDecimal(hxPerson。getBanlance()。doubleValue()money。doubleValue()));flaghxPersonDao。update(hxPerson)0?true:false;}returnflag;}验证
操作前test数据库的表数据
test1数据库的表数据
通过调用生产者的转账接口:
生产消费完成、再看下两个库的数据情况:
正常的分布式事务流程就走完了,大家有什么改进或疑问的点可以提出来,一起进步,共同学习!!!
原文链接:https:mp。weixin。qq。comsPPHbOtfFX5naG9cvxSk7Zg
对话前NV主帅廖良光从执教选手到执教自己踏上新旅程的廖良光虽然还无法确定自己的归期,但是他的只言片语中仍然透露着对这个行业的不舍。作者二闹图片来自网络将时间退回到上个月21号,与不少城市都在经历难耐的炙热时,当时的深圳可
王俊凯希望EDG今年继续冲击S冠LPLS12加油近日王俊凯受邀参加了英雄联盟嘉年华明星表演赛,之后在采访中王俊凯回应了JieJie对他的评价,并表示希望今年EDG也能继续冲击S冠。之前JieJie曾在采访中提到有看过王俊凯的刀妹
明早4点迪士尼ampampamp漫威游戏发布会时长23分钟据推主PlayStationGameSize爆料,明天早上4点举行的迪士尼漫威游戏发布会仅仅持续23分钟。此前,迪士尼在新闻稿中指出,除了全新公布外,粉丝们可以期待迪士尼梦幻星谷漫
轻松学名方小建中汤(一)血虚重用哪两味药?熟地白芍。如果血淤呢?当归川芎。当归川芎它是行血。熟地白芍,它是养血,所以秋冬养阴熟地白芍。春夏养阳,当归川芎直达头面,春夏养阳,。好!我们回过来看小建中汤,今天
答疑解惑中秋赏月,夜宵咋吃更安全中秋赏月,有些人习惯在正餐后再吃一顿丰盛的夜宵,待抬头看圆月低头看圆肚后再去休息。然而,吃夜宵无益健康的常识,却也让人着实纠结。中秋夜宵怎么吃更安全更健康呢?成都市食品检验研究院副
我的三十岁有时候感觉真的很神奇,人一旦迈过三十岁的门槛,感觉跟二十多岁完全不一样了。首先是自身的变化,回溯过往,感觉二十多岁的时光很久远,甚至可以叫做前尘往事了,而事实上不过才几年而已。思维
空空空今天我回到了家乡,感觉凄凉万分,原来的大村庄没炊烟升起,家家都是紧闭了大门,不见金灿灿稻田,四处杂草丛生,只有希少的几位老人妇女寒喧说着远方的孩子,脸上挂着无奈的笑容。我走进了儿时
女人记住这6个穿衣公式,毫不费劲穿出时髦感,还美得很气质一个时髦的造型是有很多环节构成,服装风格颜色搭配和穿衣技巧的运用,都对时尚感高低有着影响。而这些环节中,服装搭配占据了相当大一部分因素,只要服装搭配方式正确,很容易打造出时髦的造型
苹果创始人乔布斯的初代苹果竟拍出天价?据报道,在最近的一次,苹果硬件拍卖会上。一部全新未拆封的苹果iphone1代机竟然拍出了超过35000美元折合人民币高达23。94万。这还没完,而乔布斯拥有的Apple1原型机拍出
一加手机是OPPO的吗?一加成为OPPO旗下子品牌对于一加手机,许多消费者都并不陌生,不将就的产品理念让一加手机在消费者心中留下了深刻的印象。从品牌创立以来,历经9年沉淀,一加手机一路高歌猛进,在国内海外市场都有众多忠实拥趸。在2
iPhone14手机30W快充,国产手机200W,苹果快充技术为何那么差?备受关注的iPhone14手机将在9月6日正式发布,关于这款智能手机的一些相关信息也相继出来了,其中,关于这款智能手机的充电技术也在网上出现了。根据在网上获取到的信息来看,iPho