前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式事务 TCC-Transaction 源码分析 —— 事务恢复

分布式事务 TCC-Transaction 源码分析 —— 事务恢复

作者头像
芋道源码
发布2018-03-27 15:12:22
1.2K0
发布2018-03-27 15:12:22
举报
文章被收录于专栏:芋道源码1024

1. 概述

本文分享 TCC 恢复。主要涉及如下二个 package 路径下的类:

  • org.mengyun.tcctransaction.recover
    • RecoverConfig,事务恢复配置接口
    • TransactionRecovery,事务恢复逻辑
  • org.mengyun.tcctransaction.spring.recover
    • DefaultRecoverConfig,默认事务恢复配置实现
    • RecoverScheduledJob,事务恢复定时任务

本文涉及到的类关系如下图( 打开大图 ):

在《TCC-Transaction 源码分析 —— 事务存储器》中,事务信息被持久化到外部的存储器中。事务存储是事务恢复的基础。通过读取外部存储器中的异常事务,定时任务会按照一定频率对事务进行重试,直到事务完成或超过最大重试次数。

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 TCC-Transaction 点赞!传送门

ps:笔者假设你已经阅读过《tcc-transaction 官方文档 —— 使用指南1.2.x》。

2. 事务重试配置

org.mengyun.tcctransaction.recover.RecoverConfig,事务恢复配置接口,实现代码如下:

代码语言:javascript
复制
public interface RecoverConfig {

    /**
     * @return 最大重试次数
     */
    int getMaxRetryCount();

    /**
     * @return 恢复间隔时间,单位:秒
     */
    int getRecoverDuration();

    /**
     * @return cron 表达式
     */
    String getCronExpression();

    /**
     * @return 延迟取消异常集合
     */
    Set<Class<? extends Exception>> getDelayCancelExceptions();

    /**
     * 设置延迟取消异常集合
     *
     * @param delayRecoverExceptions 延迟取消异常集合
     */
    void setDelayCancelExceptions(Set<Class<? extends Exception>> delayRecoverExceptions);

}
  • #getMaxRetryCount(),单个事务恢复最大重试次数。超过最大重试次数后,目前仅打出错误日志,下文会看到实现。
  • #getRecoverDuration(),单个事务恢复重试的间隔时间,单位:秒。
  • #getCronExpression(),定时任务 cron 表达式。
  • #getDelayCancelExceptions(),延迟取消异常集合。

org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig默认事务恢复配置实现,实现代码如下:

代码语言:javascript
复制
public class DefaultRecoverConfig implements RecoverConfig {

    public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();

    /**
     * 最大重试次数
     */
    private int maxRetryCount = 30;

    /**
     * 恢复间隔时间,单位:秒
     */
    private int recoverDuration = 120;

    /**
     * cron 表达式
     */
    private String cronExpression = "0 */1 * * * ?";

    /**
     * 延迟取消异常集合
     */
    private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>();

    public DefaultRecoverConfig() {
        delayCancelExceptions.add(OptimisticLockException.class);
        delayCancelExceptions.add(SocketTimeoutException.class);
    }

    @Override
    public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
        this.delayCancelExceptions.addAll(delayCancelExceptions);
    }

}
  • maxRetryCount,单个事务恢复最大重试次数 为 30。
  • recoverDuration,单个事务恢复重试的间隔时间为 120 秒。
  • cronExpression,定时任务 cron 表达式为 "0 */1 * * * ?",每分钟执行一次。如果你希望定时任务执行的更频繁,可以修改 cron 表达式,例如 0/30 * * * * ?,每 30 秒执行一次。
  • delayCancelExceptions,延迟取消异常集合。在 DefaultRecoverConfig 构造方法里,预先添加了 OptimisticLockException / SocketTimeoutException 。
    • 官方解释:事务恢复的疑问
    • 这块笔者还有一些疑问,如果有别的可能性导致这个情况,麻烦告知下笔者。谢谢。
    • 官方解释:为什么 tcc 事务切面中对乐观锁与socket超时异常不做回滚处理,只抛异常?
    • 针对 SocketTimeoutException :try 阶段,本地参与者调用远程参与者( 远程服务,例如 Dubbo,Http 服务),远程参与者 try 阶段的方法逻辑执行时间较长,超过 Socket 等待时长,发生 SocketTimeoutException,如果立刻执行事务回滚,远程参与者 try 的方法未执行完成,可能导致 cancel 的方法实际未执行( try 的方法未执行完成,数据库事务【非 TCC 事务】未提交,cancel 的方法读取数据时发现未变更,导致方法实际未执行,最终 try 的方法执行完后,提交数据库事务【非 TCC 事务】,较为极端 ),最终引起数据不一致。在事务恢复时,会对这种情况的事务进行取消回滚,如果此时远程参与者的 try 的方法还未结束,还是可能发生数据不一致。
    • 针对 OptimisticLockException :还是 SocketTimeoutException 的情况,事务恢复间隔时间小于 Socket 超时时间,此时事务恢复调用远程参与者取消回滚事务,远程参与者下次更新事务时,会因为乐观锁更新失败,抛出 OptimisticLockException。如果 CompensableTransactionInterceptor 此时立刻取消回滚,可能会和定时任务的取消回滚冲突,因此统一交给定时任务处理。

3. 事务重试定时任务

org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事务恢复定时任务,基于 Quartz 实现调度,不断不断不断执行事务恢复。实现代码如下:

代码语言:javascript
复制
public class RecoverScheduledJob {

    private TransactionRecovery transactionRecovery;

    private TransactionConfigurator transactionConfigurator;

    private Scheduler scheduler;

    public void init() {
        try {
            // Quartz JobDetail
            MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
            jobDetail.setTargetObject(transactionRecovery);
            jobDetail.setTargetMethod("startRecover");
            jobDetail.setName("transactionRecoveryJob");
            jobDetail.setConcurrent(false); // 禁止并发
            jobDetail.afterPropertiesSet();
            // Quartz CronTriggerFactoryBean
            CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
            cronTrigger.setBeanName("transactionRecoveryCronTrigger");
            cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
            cronTrigger.setJobDetail(jobDetail.getObject());
            cronTrigger.afterPropertiesSet();
            // 启动任务调度
            scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
            // 启动 Quartz Scheduler
            scheduler.start();
        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
}
  • 调用 MethodInvokingJobDetailFactoryBean#setConcurrent(false) 方法,禁用任务并发执行。
  • 调用 MethodInvokingJobDetailFactoryBean#setTargetObject(...) + MethodInvokingJobDetailFactoryBean#setTargetMethod(...) 方法,设置任务调用 TransactionRecovery#startRecover(...) 方法执行。

如果应用集群部署,会不会相同事务被多个定时任务同时重试

答案是不会,事务在重试时会乐观锁更新,同时只有一个应用节点能更新成功。

官方解释:多机部署下,所有机器都宕机,从异常中恢复时,所有的机器岂不是都可以查询到所有的需要恢复的服务?

当然极端情况下,Socket 调用超时时间大于事务重试间隔,第一个节点在重试某个事务,一直未执行完成,第二个节点已经可以重试。

ps:建议,Socket 调用超时时间小于事务重试间隔。

是否定时任务和应用服务器解耦

蚂蚁金服的分布式事务服务 DTS 采用 client-server 模式:

  • xts-client :负责事务的创建、提交、回滚、记录。
  • xts-server :负责异常事务的恢复。

FROM 《蚂蚁金融云 DTS 文档》 分布式事务服务 (Distributed Transaction Service, DTS) 是一个分布式事务框架,用来保障在大规模分布式环境下事务的最终一致性。DTS 从架构上分为 xts-client 和 xts-server 两部分,前者是一个嵌入客户端应用的 JAR 包,主要负责事务数据的写入和处理;后者是一个独立的系统,主要负责异常事务的恢复。

4. 异常事务恢复

org.mengyun.tcctransaction.recover.TransactionRecovery,异常事务恢复,实现主体代码如下:

```Java public class TransactionRecovery {

代码语言:javascript
复制
/**
 * 启动恢复事务逻辑
 */
public void startRecover() {
    // 加载异常事务集合
    List<Transaction> transactions = loadErrorTransactions();
    // 恢复异常事务集合
    recoverErrorTransactions(transactions);
}

} ```

4.1 加载异常事务集合

调用 #loadErrorTransactions() 方法,加载异常事务集合。实现代码如下:

代码语言:javascript
复制
private List<Transaction> loadErrorTransactions() {
   TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
   long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
   RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
   return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
  • 异常事务的定义:当前时间超过 - 事务变更时间( 最后执行时间 ) >= 事务恢复间隔( RecoverConfig#getRecoverDuration() )。这里有一点要注意,已完成的事务会从事务存储器删除。

4.2 恢复异常事务集合

调用 #recoverErrorTransactions(...) 方法,恢复异常事务集合。实现代码如下:

代码语言:javascript
复制
private void recoverErrorTransactions(List<Transaction> transactions) {
   for (Transaction transaction : transactions) {
       // 超过最大重试次数
       if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
           logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
           continue;
       }
       // 分支事务超过最大可重试时间
       if (transaction.getTransactionType().equals(TransactionType.BRANCH)
               && (transaction.getCreateTime().getTime() +
               transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
                       transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
               > System.currentTimeMillis())) {
           continue;
       }
       // Confirm / Cancel
       try {
           // 增加重试次数
           transaction.addRetriedCount();
           // Confirm
           if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
               transaction.changeStatus(TransactionStatus.CONFIRMING);
               transactionConfigurator.getTransactionRepository().update(transaction);
               transaction.commit();
               transactionConfigurator.getTransactionRepository().delete(transaction);
           // Cancel
           } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
                   || transaction.getTransactionType().equals(TransactionType.ROOT)) { // 处理延迟取消的情况
               transaction.changeStatus(TransactionStatus.CANCELLING);
               transactionConfigurator.getTransactionRepository().update(transaction);
               transaction.rollback();
               transactionConfigurator.getTransactionRepository().delete(transaction);
           }
       } catch (Throwable throwable) {
           if (throwable instanceof OptimisticLockException
                   || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
               logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
           } else {
               logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
           }
       }
   }
}
  • 当单个事务超过最大重试次数时,不再重试,只打印异常,此时需要人工介入解决。可以接入 ELK 收集日志监控报警。
  • 分支事务超过最大可重试时间时,不再重试。可能有同学和我一开始理解的是相同的,实际分支事务对应的应用服务器也可以重试分支事务,不是必须根事务发起重试,从而一起重试分支事务。这点要注意下。
  • 当事务处于 TransactionStatus.CONFIRMING 状态时,提交事务,逻辑和 TransactionManager#commit() 类似。
  • 当事务处于 TransactionStatus.CONFIRMING 状态,或者事务类型为根事务,回滚事务,逻辑和 TransactionManager#rollback() 类似。这里加判断的事务类型为根事务,用于处理延迟回滚异常的事务的回滚。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 事务重试配置
  • 3. 事务重试定时任务
  • 4. 异常事务恢复
    • 4.1 加载异常事务集合
      • 4.2 恢复异常事务集合
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档