dubbo分布式事务解决方案_分布式事务之解决方案(TCC)

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 15:46   3980   0

4. 分布式事务解决方案之TCC

4.1. 什么是TCC事务

TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作 :预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作既回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。

93279d5a37cefafd733e53ed8b3e59f5.png


分支事务失败的情况 :

a1234d32c5632e9931d0b08dc5d19b95.png


TCC分为三个阶段 :

  1. Try阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm一起才能真正构成一个完整的业务逻辑。
  2. Confirm阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行Confirm。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即 :只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
  3. Cancel阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
  4. TM事务管理器
    TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑结构和软件复用。
    TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm和Cancel失败需进行重试,因此需要实现为幂等性是指同一个操作无论请求多少次,其结果都相同。

4.2. TCC解决方案

目前市面上的TCC框架众多比如下面这几种 :

16089d911b1504c263f7f6564bae1b47.png


Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC原理以及事务协调运作的过程,因此更倾向于轻量级易于理解的框架。
Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等RPC框架进行分布式事务。它目前支持以下特性 :

  • 支持嵌套事务(Nested transaction support)。
  • 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
  • 支持SpringBoot-starter项目启动,使用简单。
  • RPC框架支持 :dubbo、motan、springcloud。
  • 本地事务存储支持 :redis、mongodb、zookeeper、file、mysql。
  • 事务日志序列化支持 :java、hessian、kryo、protostuff。
  • 采用Aspect AOP切面思想与Spring无缝集成,天然支持集群。
  • RPC事务恢复,超时异常恢复等。
    Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。
    Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。
    Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。
    官网介绍 :https://dromara.org/website/zh-cn/docs/hmily/index.htmlTCC需要注意三种异常处理分别是空回滚、幂等、悬挂 :空回滚
    在没有调用TCC资源Try方法的情况下,调用来二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。
    出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
    解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行来,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID和分支事务ID,第一阶段Try方法里会插入一条记录,表示一阶段执行来。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。幂等
    通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC的二阶段Try、Confirm和Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
    解决思路在上述 “分支事务记录”中增加执行状态,每次执行前都查询该状态。悬挂
    悬挂就是对于一个分布式事务,其二阶段Cancel接口比Try接口先执行。
    出现原因是在RPC调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时RPC调用的网络发生拥堵,通常RPC调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后无法继续处理。
    解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。举例,场景为A转账30元给B,A和B账户在不同的服务。方案 1
    账户A

try: 检查余额是否够30元 扣减30元 confirm: 空 cancel: 增加30元

账户B

try: 增加30元 confirm: 空 cancel: 减少30元

方案1说明:
1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此, 我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。 Confirm 接口表示正式 提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel 接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。
2)账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把 Try 接口里加的 30 元再减去。方案1的问题分析:
1)如果账户A的try没有执行在cancel则就多加了30元。
2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。
3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。
4)如果账户B的try没有执行在cancel则就多减了30元。问题解决:
1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。
2)try、cancel、confirm方法实现幂等。
3)账户B在try方法中不允许更新账户金额,在confirm中更新账户金额。
4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。优化方案:
账户A :

try: try幂等校验 try悬挂处理 检查余额是否够30元 扣减30元 confirm: 空 cancel: cancel幂等校验 cancel空回滚处理 增加可用余额30元

账户B :

try: 空 confirm: confirm幂等校验 正式增加30元 cancel: 空

4.3. Hmily实现TCC事务

4.3.1.业务说明

通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。
两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账制定金额。
上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性事务。

c0dc50d716bb0f83dcd339d3fa01189a.png

4.3.2. 程序组成部分

数据库:MySQL-5.7.25
JDK:64位 jdk1.8.0_201 微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE Hmily:hmily-springcloud.2.0.4-RELEASE
微服务及数据库的关系 :
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2
服务注册中心:dtx/discover-server

4.3.3. 创建数据库

创建hmily数据库,用于存储hmily框架记录的数据。
CREATE DATABASE hmily CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;创建bank1库,并导入以下表结构和数据(包含张三账户)
CREATE DATABASE bank1 CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;创建bank2库,并导入以下表结构和数据(包含李四账户)
CREATE DATABASE bank2 CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
DROP TABLE IF EXISTS account_info; CREATE TABLE account_info (id bigint(20) NOT NULL AUTO_INCREMENT,account_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户 主姓名’,account_no varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行 卡号’,account_password varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码’,account_balance double NULL DEFAULT NULL COMMENT ‘帐户余额’,
PRIMARY KEY (id) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO account_info VALUES (2, ‘张三的账户’, ‘1’, ‘’, 10000);
每个数据库都创建try、confirm、cancel三张日志表:

CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

4.3.5 工程dtx-tcc-demo

(1)引入maven依赖

<dependency> <groupId>org.dromara</groupId> <artifactId>hmily‐springcloud</artifactId> <version>2.0.4‐RELEASE</version> </dependency>

(2)配置hmily
application.yml :

org: dromara: hmily : serializer : kryo recoverDelayTime : 128 retryMax : 30 scheduledDelay : 128 scheduledThreadMax : 10 repositorySupport : db started: true hmilyDbConfig : driverClassName : com.mysql.jdbc.Driver url : jdbc:mysql://localhost:3306/bank?useUnicode=true username : root password : root

新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:

@Bean public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer")); hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap; }

启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:

@SpringBootApplication @EnableDiscoveryClient @EnableHystrix @EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"}) @ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"}) public class Bank1HmilyServer { public static void main(String[] args) { SpringApplication.run(Bank1HmilyServer.class, args); } }

4.3.7 dtx-tcc-demo-bank1

dtx-tcc-demo-bank1实现try和cancel方法,如下 :

try: try幂等校验 try悬挂处理 检查余额是够扣减金额 扣减金额 confirm: 空 cancel: cancel幂等校验 cancel空回滚处理 增加可用余额

  1. Dao

@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ") int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo); @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo); /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); }

2)try和cancel方法

@Slf4j @Service public class AccountInfoServiceImpl implements AccountInfoService { @Autowired private AccountInfoDao accountInfoDao; @Autowired private Bank2Client bank2Client; /** * 只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字 * * @param accountNo * @param amount */ @Hmily(confirmMethod = "commit", cancelMethod = "rollback") @Transactional(rollbackFor = Exception.class) @Override public void updateAccountBalance(String accountNo, Double amount) { // 事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank1 Service begin try ..." + transId); int existTry = accountInfoDao.isExistTry(transId); // 幂等判断 判断local_try_log表中是否有try日志记录,如果有不再执行 // try幂等校验 if (existTry > 0) { http://log.info("Bank1 Service 已经执行try,无需重复执行,事务id :{}", transId); return; } // try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行 if (accountInfoDao.isExistCancel(transId) > 0 || accountInfoDao.isExistConfirm(transId) > 0) { http://log.info("Bank1 Service 已经执行confirm或cancel,悬挂处理,事务id :{}", transId); return; } // 从账户扣减 if (accountInfoDao.subtractAccountBalance(accountNo, amount) <= 0) { // 扣减失败 throw new HmilyRuntimeException("bank1 exception, 扣减失败,事务id :{}" + transId); } // 增加本地事务try成功记录,用于幂等性控制标识 accountInfoDao.addTry(transId); // 远程调用bank2 if (bank2Client.transfer(amount)) { throw new HmilyRuntimeException("bank2Client exception,事务id:{}"+transId); } // 异常一定要抛在Hmily里面 if (amount == 10) { throw new RuntimeException("Bank2 make exception 10"); } http://log.info("Bank2 Service end try .." + transId); } @Transactional(rollbackFor = Exception.class) public void commit(String accountNo, double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank1 Service begin commit .." + transId); } @Transactional(rollbackFor = Exception.class) public void rollback(String accountNo, double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank1 Service begin rollback..." + transId); // 空回滚处理,try阶段没有执行什么也不用做。 if (accountInfoDao.isExistTry(transId) == 0) { http://log.info("Bank1 try 阶段失败 。。无需rollback " + transId); return; } // 幂等性校验,已经执行过了,什么也不用做 if (accountInfoDao.isExistCancel(transId) > 0) { http://log.info("Bank1 已经执行过rollback 。。无需再次rollback " + transId); return; } // 再将金额加回账户 accountInfoDao.addAccountBalance(accountNo, amount); // 添加cancel日志,用于幂等性控制标识 accountInfoDao.addCancel(transId); http://log.info("Bank1 Service end rollback ... " + transId); } }

3)feignClient

@FeignClient(value = "seata-demo-bank2", fallback = Bank2Client.class) public interface Bank2Client { @GetMapping("/bank2/transfer") @Hmily Boolean transfer(@RequestParam("amount") Double amount); }

  1. Controller

@RestController public class Bank1Controller { @Autowired private AccountInfoService accountInfoService; @RequestMapping("/transfer") public String test(@RequestParam("amount") Double amount) { accountInfoService.updateAccountBalance("1", amount); return "bank1" + amount; } }

4.3.8 dtx-tcc-demo-bank2

dtx-tcc-demo-bank2实现如下功能 :

try: 空 confirm: confirm幂等校验 正式增加金额 cancel: 空

1)Dao

@Component @Mapper public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo); @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo); /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); }

2)实现confirm方法

@Slf4j @Service public class AccountInfoServiceImpl implements AccountInfoService { @Autowired private AccountInfoDao accountInfoDao; @Transactional(rollbackFor = Exception.class) @Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod") @Override public void updateAccountBalance(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank2 Service Begin try ... " + transId); } @Transactional(rollbackFor = Exception.class) public void confirmMethod(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank2 Service commit ..." + transId); // 幂等性校验,已经执行过了,什么也不用做 if (accountInfoDao.isExistConfirm(transId) > 0) { http://log.info("Bank2 已经执行过confirm 。。无需再次confirm " + transId); return; } // 正式增加金额 accountInfoDao.addAccountBalance(accountNo, amount); // 添加confirm日志 accountInfoDao.addConfirm(transId); } @Transactional(rollbackFor = Exception.class) public void cancelMethod(String accountNo, Double amount) { String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); http://log.info("Bank2 Service begin cancel ... " + transId); } }

3)Controller

@RestController public class Bank2Controller { @Autowired private AccountInfoService accountInfoService; @RequestMapping("/transfer") public Boolean test2(@RequestParam("amount") Double amount) { accountInfoService.updateAccountBalance("2", amount); return true; } }

3.3.9 测试场景

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四分支事务回滚成功。
  • 分支事务超时测试。

4.4. 小结

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP