前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Seata的AT模式深入理解

Seata的AT模式深入理解

作者头像
路行的亚洲
发布2024-06-17 15:43:55
660
发布2024-06-17 15:43:55
举报
文章被收录于专栏:后端技术学习后端技术学习

一、Seata的实现思路

如果要你实现一个简单的Seata的AT模式,你会如何实现?

1.需要对数据源扩展,为什么?

因为数据源是事务提交和回滚的关键,只有对它进行扩展,才有后面的增强。 2.如何增强?

一个简单的做法,也是Spring常用的手法,采用Aop,进行前后逻辑的增强操作,同时进行手动提交事务。

3.增强的过程中,如何对数据进行回滚和提交的操作?

因为我们的做法需要在事务提交或者回滚前,实现对各个分支事务的提交或者回滚,因此不可避免,这个操作,必须可逆,因此需要保留好事务提交或者回滚前的日志操作,类似mysql的undo/redo log惯用的手法。

4.如何执行统一的提交和回滚?

进行提交情况的上报,进行最终的提交和回滚,完成上面的操作,也即所有操作都成功或者失败。

5.如果提交成功,需要删除相关日志。如果失败,则需要对相关分支执行回滚操作。

6.如何保证上面的效率更加高效性?

为了seata的交互的高效性,所有的交互操作都基于Netty的事件驱动完成。

下面的操作都是基于Seata的官方demo的debug和Seata官网学习完成。

二、Seata中数据源的增强

Seata的AT模式的理解可以从Seata的demo中找到答案。seata经过SeataAutoDataSourceProxyCreator是实现Aop数据源增强的关键步骤。

可以看到经过wrapIfNecessary会进行代理的构建,也即

代码语言:javascript
复制
 SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);

创建新的数据源代理,从构造函数中,可以看到分为XA和AT模式。由于我们关注的是AT模式,下面我们来看AT模式:

代码语言:javascript
复制
 public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        // 初始化操作
        init(targetDataSource, resourceGroupId);
    }

从上面的代码,我们可以看到这里会先判断数据源是否为Seata数据源代理,如果不是,则执行数据源代理的转换。同时初始化数据源,注册数据源RM信息到Netty中,同时设置分支类型AT模式。

三、一阶段操作

代码语言:javascript
复制
 private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            // 执行提交操作
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

执行本地事务提交会判断是否处在全局事务中,如果是,则执行处理全局事务提交,如果是在全局锁中,则执行处理本地提交带全局锁。否则执行不增强的提交。由于前期,我们属于分支事务,因此我们会执行分支事务提交操作processGlobalTransactionCommit。

而在这个过程中,我们可以看到分支事务提交的几个阶段:

注册分支事务、刷新undo log日志、上报分支执行情况

代码语言:javascript
复制
 private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 此时完成本地事务的提交
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        // 进行本地事务提交情况的上报
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

而我们关心的前后镜像生成又是在哪里呢?而在生成前后镜像前,我们需要判断当前执行的sql是哪种类型的执行器,才能放心生成前后镜像执行业务系统的提交操作。

我们会先经过执行模板

代码语言:javascript
复制
   switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case DELETE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case SELECT_FOR_UPDATE:
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor =
                                        new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor =
                                        new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    case UPDATE_JOIN:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.MARIADB:
                                executor = new MariadbUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            case JdbcConstants.POLARDBX:
                                executor = new PolarDBXUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;

生成对应的数据执行器,然后执行生成镜像的操作。

代码语言:javascript
复制
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        try {
            // 这里会生成前镜像和后镜像,在执行业务操作之前
            TableRecords beforeImage = beforeImage();
            // 执行业务操作
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            TableRecords afterImage = afterImage(beforeImage);
            // 准备undolog信息
            prepareUndoLog(beforeImage, afterImage);
            return result;
        } catch (TableMetaException e) {
            LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
                e.getTableName(), e.getColumnName());
            statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
            throw e;
        }
    }

完成这个操作之后之后,会执行最终分支事务的完成操作,此时操作的是mysql的原生操作。

四、二阶段操作

完成提交/回滚操作之后,进行上报动作。完成上报之后,执行二阶段提交/回滚操作。也即branchCommit或者branchRollback操作。也即二阶段的关键在branchCommit或者branchRollback上。

二阶段提交操作:思路是快速响应成功,然后异步删除日志。

二阶段回滚操作:则根据对应的日志,进行恢复sql执行操作。

我理解:生成sql和回滚的这个思路和mysql的操作是类似的。

参考资料:

https://github.com/apache/incubator-seata

https://seata.apache.org

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Seata的实现思路
  • 二、Seata中数据源的增强
  • 三、一阶段操作
  • 四、二阶段操作
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档