Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >seata TM源码分析

seata TM源码分析

作者头像
luoxn28
发布于 2021-01-28 07:56:39
发布于 2021-01-28 07:56:39
1.3K00
代码可运行
举报
文章被收录于专栏:TopCoderTopCoder
运行总次数:0
代码可运行

seata 定义 3 个组件来协调分布式事务的处理过程:

  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

下面就一起来看下TM模块的实现原理,TM模块是seata中全局事务发起者和掌控者,其核心逻辑有:业务逻辑切面代理:对全局事务注册/提交操作。启动netty客户端:会启动TM/RM客户端与TC通信。数据源切面代理:SQL解析、分支事务注册/提交、undolog保存、分支事务状态上报。Rpc代理:在RPC流程中传递seata上下文(xid等,非本文分析重点)。

TM侧的大致执行流程如下所示,下面就按照上述的几个核心逻辑依次进行分析:

业务逻辑代理

TM中业务逻辑一般都是从注解 @GlobalTransactional 开始,比如seata-samples示例中BusinessService业务逻辑就是从该注解开始:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);
    //throw new RuntimeException("xxx");
}

既然从注解@GlobalTransactional开始,肯定是在spring容器启动过程中针对该注解修饰的方法进行切面代理。

seata中有一个自动配置类SeataAutoConfiguration,其内部会创建bean对象GlobalTransactionScannerSeataAutoDataSourceProxyCreator,二者都继承了类AbstractAutoProxyCreator(spring中类,该类通过BeanPostProcessor扩展的方式,使得bean在创建过程中完成被代理,回调方法wrapIfNecessary),前者就是针对业务逻辑代理,后者是针对sql操作代理。

spring启动流程中会回调GlobalTransactionScanner的方法wrapIfNecessary,该方法会对注解 @GlobalTransactional 和 @GlobalLock 修饰的方法做代理操作,对应的代理类为GlobalTransactionalInterceptor,源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    interceptor = null;
    //check TCC proxy
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
    } else {
        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

        // 只代理@GlobalTransactional 和 @GlobalLock修饰的逻辑
        if (!existsAnnotation(new Class[]{serviceInterface})
            && !existsAnnotation(interfacesIfJdk)) {
            return bean;
        }

        if (interceptor == null) {
            if (globalTransactionalInterceptor == null) {
                // AT模式的业务逻辑代理类 GlobalTransactionalInterceptor
                globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                ConfigurationCache.addConfigListener(
                    ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
            }
            interceptor = globalTransactionalInterceptor;
        }
    }
    // ...
}

AT模式的业务逻辑代理类 GlobalTransactionalInterceptor,其核心业务代理逻辑为(GlobalTransactional代理类型):初始化分布式事务所需资源向TC发起开启全局分布式事务请求开始执行业务逻辑成功处理后进行全局事务的提交异常时进行全局事务回滚,核心业务逻辑如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// GlobalTransactionalInterceptor#invoke -> TransactionalTemplate
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.createNew();

    // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
    //    开启全局事务
    beginTransaction(txInfo, tx);

    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. 事务回滚
        completeTransactionAfterThrowing(txInfo, tx, ex);
        throw ex;
    }

    // 4. 事务提交
    commitTransaction(tx);
    return rs;

既然知道了业务代理流程是如何实现的,接下来看下netty启动流程。

启动netty客户端

GlobalTransactionScanner实现了InitializingBean,其afterPropertiesSet方法中会执行netty客户端初始化工作,逻辑如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void initClient() {
    //初始化TM
    TMClient.init(applicationId, txServiceGroup);
    ...
    //初始化RM
    RMClient.init(applicationId, txServiceGroup);
    ... 
    // 注册Spring shutdown的回调,用来释放资源
    registerSpringShutdownHook();
 }

注意,TM侧为什么还会初始化RM呢,简单来讲可以理解TM是业务代理逻辑,主要实现了开启/提交全局分布式事务逻辑;TM是资源层代理逻辑,主要实现sql解析/分支事务注册上报等逻辑。一个服务中可能存在既开启了业务代理,也有对应的DB操作,因此是需要初始化RM的。

TM客户端类TmNettyRemotingClient,首先注册一些处理类,主要是针对TC返回结果的处理和心跳处理,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void registerProcessor() {
    // 1.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

TmNettyRemotingClient的init主要是初始化一个定时任务,然后就启动netty client:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void init() {
    // 定时重连任务
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

    // 请求future定时器
    super.init();
    // 标准的netty client初始化
    clientBootstrap.start();
}

启动netty client之后,TM和TC建立的连接channel会被添加到netty cilent管理中(Linux下底层基于epoll),当接收到TC响应结果或TC主动发送结果后,就会触发对应的处理器逻辑,也就是在方法registerProcessor中注册的各种处理器。

数据源切面代理

seata的数据源切面代理对应SeataAutoDataSourceProxyCreator类,其会初始化sql代理处理器为SeataAutoDataSourceProxyAdvice,如果是AT模式,会设置DataSourceProxy代理:

和DataSourceProxy相关的有多种类型的代理类,如下:

下面就以常见的update流程来分析下具体的sql代理执行流程,需要从类PreparedStatementProxy的方法execute开始,其最后会调用方法io.seata.rm.datasource.exec.ExecuteTemplate#execute()

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
            break;
        case UPDATE: // 更新操作
            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<>(statementProxy, statementCallback);
            break;
    }

    return executor.execute(args);
}

UpdateExecutor的处理逻辑,最后会走到方法io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue中,这里首先进行获取前镜像、执行业务sql、获取后镜像,构建undolog数据,然后进行分支事务注册,最后进行写入undolog和事务提交操作,最后上报分支事务状态。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            // 前处理
            T result = executeAutoCommitFalse(args);
            // 后处理
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

protected T executeAutoCommitFalse(Object[] args) throws Exception {
       // 获取前镜像、执行业务sql、获取后镜像
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

// connectionProxy.commit() --> 最后的处理逻辑
private void processGlobalTransactionCommit() throws SQLException {
    register(); // 分支事务注册

    try {
        // undo log 写入 & 事务提交
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        report(false); // 上报事务状态
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true); // 上报事务状态
    }
    context.reset();
}

注意:分支事务的注册涉及到携带lockKeys,在TC会针对lockKeys进行全局加锁操作,这些锁资源在全局事务提交或者回滚时候才会清除,这样可以在当前全局事务还未执行完成时阻塞另一个分布式事务针对同样的锁资源的加锁操作。针对update操作,使用的是后镜像涉及到的所有记录的主键id信息,lockKeys的构建在方法prepareUndoLog中完成。至此sql代理的核心流程已分析完毕。

seata这里使用上可能存在这样的问题,比如服务A使用seata方式更新DB,另一个服务B没有通过seata方式而是直接更新DB,这种是不建议的方式,此时服务B可通过加@GlobalLock方式来进行更新操作。该问题就是seata这种在业务层实现分布式事务存在的潜在问题,直接基于DB实现的分布式事务就不存在该问题,比如XA。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
网站地图制作有什么好处?Sitemap地图如何制作?
网站地图又叫站点地图,我们可以理解为一个包含网站中所有页面链接的容器,主要是帮助搜索引擎快速抓取网站的中的所有页面。很多新人可能对地图了解还不够深入,今天小编详细讲下网站地图制作的好处,还有Sitemap地图如何制作?
很酷的站长
2023/11/20
1.5K0
网站地图制作有什么好处?Sitemap地图如何制作?
sitemap网站地图介绍及在线生成器
网站地图是一种格式化的XML文件(sitemap.xml)、HTML文件(sitemap.html)、TXT文件(sitemap.txt)或RSS文件(rss.xml),这是重要的站长工具。 它用于把一个网站的所有(或大多数)网址集中在一个按一定的标准组织的地图文件中。以便于第三方(比如搜索引擎)、新闻订阅软件使用和更新。 目前的网站地图最流行的方式是使用sitemap.xml格式,它被主流搜索引擎(比如:百度、Google、Bing等)所使用。当网站有新网页时,把新的页面的URL地址更新到网站地图有利于搜索引擎的即时抓取。 因此网站地图应该是一个随时保持更新的用于表达一个站点全部网页的文件。
梦溪
2021/08/09
10K0
网站地图有什么用
作为站长,作为以站点SEO为中心的站长,制作站点后开始排名,想尽快获得搜索引擎的排名,其中有很多重要的地方,今天就来谈谈站点地图(sitemap),为什么要制作站点地图接下来,小编将分析网站地图在网站SEO优化中的作用! 网站地图(sitemap)有什么作用? 网站地图又称为站点地图,它虽然就是一个页面,但是上面放置了网站里面的所有页面的链接,网站地图就是根据网站的结构、框架、内容,生成的导航网页文件。 可以说搜索引擎蜘蛛非常喜欢网站地图,现在搜索引擎都有一个提交地图的功能,这就说明了搜索引擎对sitemap的重视。
俗人啊
2021/12/17
4.9K0
网站地图有什么用
新网站 Robots 和 SiteMap 优化
robots.txt是网站管理者写给爬虫的一封信,里面描述了网站管理者不希望爬虫做的事,比如:
Parker
2020/07/22
4K0
新网站 Robots 和 SiteMap 优化
帝国插件添加网站地图(sitemap)与代码添加网站地图教程
Sitemap就是我们站长所说的网站地图,他包含网站中所以的URL链接,这样可以方便搜索引擎识别快捷的抓取和发现你网站中的链接,也就是你说所的URL,从而提高搜索引擎的抓取效率,提高你网站的收录量。在实际中我们最常见的有两种网站地图文件格式:sitemap.xml,sitemap.html,以及还有 sitemap.txt,sitemap.gz 等多中格式,还有给普通的html格式的地图。
夏末浅笑
2020/04/22
2.7K0
帝国插件添加网站地图(sitemap)与代码添加网站地图教程
如何使用Google XML Sitemaps插件生成网站Sitemap网站地图?
如果您尚未设置Sitemap站点地图,请立即进行设置。不仅因为百度搜索推送需要到Sitemap,而大部分搜索引擎都认可sitemap的标准,能够在一定程度上提升搜索引擎发现你的网站的新内容。
开心分享
2020/08/05
2.7K0
如何使用Google XML Sitemaps插件生成网站Sitemap网站地图?
网站导航设计与站内链接优化汇总
网页导航表现为网页的栏目菜单设置、辅助菜单、其他在线帮助等形式。网页导航设置是在网页栏目结构的基础上,进一步为用户浏览网页提供的提示系统,由于各个网页设计并没有统一的标准,不仅菜单设置各不相同,打开网页的方式也有区别,有些是在同一窗口打开新网页,有些在新打开一个浏览器窗口。
茹莱神兽
2022/02/01
1.5K0
网站导航设计与站内链接优化汇总
为什么大型站点要建立网站地图?
有很多大型网站不重视网站地图的建设,不少大型网站的网站地图只是敷衍了事,做一个摆设。其实网站对于大型网站是很重要的,大型网站海量的数据、复杂的网站导航结构、极快的更新频率使得搜索引擎并不能完全抓取所有的网页。
茹莱神兽
2020/09/07
1.8K0
为什么大型站点要建立网站地图?
如何更好地美化Django网站的Sitemap站点地图?
一般在Web网站开发完成之际,如果对搜索引擎优化(SEO)有一定的要求,我们都会为网站添加一个站点地图sitemap,配合robot.txt的使用,以汇总和索引网站上所有允许被搜索引擎搜索、采集和索引的网页,这样搜索引擎可以根据站点地图快速地爬取到一个网站上的所有希望被收录的网址。
州的先生
2020/03/19
1.6K0
如何更好地美化Django网站的Sitemap站点地图?
seo专项优化解决网站收录问题-所有网站通用
什么是seo,即为搜索引擎优化,目的是为了让网站做到更好的收录量,以及排名和提升流量,一个网站单有页面是不够的,必须去很好贴合搜索引擎做好规则,才能在各大搜索引擎取得很好的排名以及收录量,网站搜索引擎优化任务主要是认识与了解其它搜索引擎怎样紧抓网页、怎样索引、怎样确定搜索关键词等相关技术后,以此优化本网页内容,确保其能够与用户浏览习惯相符合,这样,你的网站获得展现量将会有着很大的提升。本人在seo领域摸爬滚打了很多年,总结了一些优化的方法,分享给你们,对你有帮助的话,记得收藏本站哦。
科技怪物君
2021/08/10
7720
seo专项优化解决网站收录问题-所有网站通用
SEO优化实战
IMWeb前端团队
2018/01/08
1.7K0
SEO优化实战
了解sitemap(站点地图)和如何判定你的网站是否需要提交站点地图
一个网站地图是你提供有关的网页,视频和网站上的其他文件,以及它们之间的关系信息的文件。像Google这样的搜索引擎会读取此文件,以更智能地抓取您的网站。站点地图会告诉Google您认为哪些页面和文件对您的网站很重要,并提供有关这些文件的有价值的信息:例如,对于页面,上次更新页面的时间,更改页面的频率以及任何其他语言版本页面。
海拥
2021/08/23
1.9K0
建站SEO优化之站点地图sitemap
站点地图通常在 robots.txt 文件中声明,具体可看之前的文章(一文搞懂SEO优化之站点robots.txt)
村头的猫
2025/06/11
1070
建站SEO优化之站点地图sitemap
Django如何使用sitemap实现网站地图
网站地图是一个网站里所有链接的集合,搜索引擎可以根据网站地图很轻松的抓取你sitemap里面记录的网址,所以把网站地图提交给搜索引擎,让其录入你的内容,是提高自己网站流量很重要的一个手段,尤其是对于新建网站,网站地图是SEO必要的手段,下面就简单介绍下Django项目如何快速生成网站地图sitemap
极简小课
2022/06/21
1.9K0
为什么要做网站地图?
网站地图,又称站点地图,它便是一个页面,上面放置了网站上一切页面的链接。大多数人在网站上找不到自己所需求的信息时,可能会将网站地图作为一种补救措施。搜索引擎蜘蛛非常喜爱网站地图。接下来经过这篇文章为你详细介绍网站地图对SEO优化有什么主要功能。
申霖
2019/12/27
8160
为什么要做网站地图?
搜索引擎排名技术,引爆网站流量,你也可以做到 第一课
对于进行关键词排名,没有固定的模式,仅仅是基于传统经验之上慢慢摸索出来的一条道路,通过网站的一些设置让搜索引擎觉得网站更友好,提升搜索引擎蜘蛛停留时间,增加收录。
做全栈攻城狮
2018/12/20
1.3K0
跨境电商网站做Google SEO的5个要点!
跨境电商网站的结构优化是非常重要的,一定要易于Google蜘蛛抓取,又便于用户浏览,这是为访客提供优质的用户体验不可或缺的内容,一个跨境电商网站,肯定会有大量的页面,那么一个有规则条理的链接结构,会让你的Google SEO优化更顺利,也能让你的业务走的更远。
一尘SEO
2020/07/31
1.4K0
跨境电商网站做Google SEO的5个要点!
wordpress站点到底要不要做sitemap网站地图,为什么?
一般来说,几乎所有的网站都需要网站地图的,这个能一方面来说方便搜索引擎的收录和抓取,一方面一些用户可能也会看站点地图了解网站内容和结构的,所以一般来说的话站点地图还是有一定的必要性的。
wordpress建站吧
2019/06/19
1.2K0
SEO杂谈(2)
RP道貌不岸然
2017/11/23
6090
wordpress站点到底要不要做sitemap网站地图,为什么?
一般来说,几乎所有的网站都需要网站地图的,这个能一方面来说方便搜索引擎的收录和抓取,一方面一些用户可能也会看站点地图了解网站内容和结构的,所以一般来说的话站点地图还是有一定的必要性的。
wordpress建站吧
2019/06/19
9280
推荐阅读
相关推荐
网站地图制作有什么好处?Sitemap地图如何制作?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验