前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Java学习笔记-微服务(9)-终-分布式事务Seata

Java学习笔记-微服务(9)-终-分布式事务Seata

原创
作者头像
咸鱼程序员
修改2025-03-09 11:06:29
修改2025-03-09 11:06:29
180
举报

Seata

在微服务中,一定存在多个数据库,那么多个数据库间如何处理分布式事务?对于分布式事务问题,有哪些解决方案?

一次业务操作需要跨多个数据源或系统进行远程调用时,就会产生分布式事务问题。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就需要通过其他技术手段来解决事务问题。

单体应用被拆分为微服务应用,原来的多个模块被拆分为多个应用,就有多个数据源。此时每个服务内部的数据一致性由本地事务来保证,全局的数据一致性需要通过其他方案解决。

Seata(incubating) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

学习网址:Apache Seata

本地事务解决采用 @Transactional ,全局事务采用 @GlobalTransational。

扩展问题:@Transactional 失效的场景

Seata 工作流程

Seata 1+3 事务控制:

XID:XID 是全局事务的唯一标识,它可以在服务的调用链路中传递,绑定到服务的事务上下文中。

TC Transaction Coordinator:事务协调者(Seata 本体,负责维护全局事务和分支事务的状态,驱动全局事务提交或回滚)

TM Transaction Manager:事务管理者(标注了 @GlobalTransational 注解的微服务模块,事务的发起者,负责定义全局服务的范围,并根据 TC 维护的全局事务和分支事务的状态做出操作决议)

RM Resource Maganer:资源管理器(数据库本体,可以是多个 RM,负责管理分支事务上的资源,向 TC 注册分支事务,汇报事务状态,驱动分支事务的提交和回滚)

TC 以服务器形式独立部署,TM 和 RM 以 Seata Client 的形式集成在微服务中。

运行流程:

TM 向 TC 申请开启一个全局事务,全局事务创建成功后生成一个全局唯一的 XID。

XID 在微服务调用链路的上下文中传播。

RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。

TM 向 TC 发起针对 XID 的全局提交或回滚决议。

TC 调度 XID 下管辖的全部分支事务,完成提交或回滚请求。

安装 Seata

官网下载

在数据库中新建一个库,名为 seata。

在 seata github 官网,script-server-db 下存放着各数据库的建表脚本,本次采用 MySQL 建表脚本。在 seata 库中运行 mysql.sql 中的初始化脚本。

备份 seata 文件夹 conf 文件夹内的 application.yml 文件。

参考 application.example.yml 模板配置中的样例配置,调整 application.yml 文件。

调整配置时务必参考最新 example 配置。

调整后文件内容如下

代码语言:yml
复制
#  Copyright 1999-2019 Seata.io Group.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${log.home:${user.home}/logs/seata}
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
      username:
      password:
      context-path:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key:
      #secret-key:
      data-id: seataServer.properties
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace:
      cluster: default
      username:
      password:
      context-path:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key:
      #secret-key:
  store:
    # support: file 、 db 、 redis 、 raft
    mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
      user: root
      password: 000000
      min-conn: 10
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 1000
      max-wait: 5000
  #  server:
  #    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**

配置完成后保存,启动 Nacos,启动 Seata。Seata 的启动方式是 bin 文件夹内的 seata-server.bat。

访问端口 127.0.0.1:7091,通过配置的用户名和密码,可以正确登录。在 Nacos 中也能看到 Seata 服务。

Seata 使用

以官网所展示的用户购买商品的 demo 用例为基础进行拓展,我们创建三个服务:订单、库存、账户。

当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户的余额,再在订单服务中修改订单状态为已完成。该操作跨域三个数据库,有两次远程调用,所以一定会存在分布式事务问题。

以下示例均为 Seata 的 AT 模式

在数据库中,创建三个服务对应的数据库

代码语言:sql
复制
create database seata_order;
create database seata_storage;
create database seata_account;

在三个库中,分别建立 client 对应的日志回滚表(来源:github script/client/at/db/mysql.sql)、分别添加对应的业务表:

代码语言:sql
复制
-- seata_order、seata_account、seata_storage

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

-- seata_order.t_order definition

CREATE TABLE `t_order` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL,
  `product_id` bigint(11) DEFAULT NULL,
  `count` int(11) DEFAULT NULL,
  `money` decimal(10,0) DEFAULT NULL,
  `status` int(1) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- seata_account.t_account definition

CREATE TABLE `t_account` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL,
  `total` decimal(10,0) DEFAULT NULL,
  `used` decimal(10,0) DEFAULT NULL,
  `residue` decimal(10,0) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO seata_account.t_account
(id, user_id, total, used, residue)
VALUES(1, 1, 1000, 0, 1000);

-- seata_storage.t_storage definition

CREATE TABLE `t_storage` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL,
  `total` int(11) DEFAULT NULL,
  `used` int(11) DEFAULT NULL,
  `residue` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO seata_storage.t_storage
(id, product_id, total, used, residue)
VALUES(1, 1, 100, 0, 100);

在 api 模块,新增扣减账户余额 Api 和 扣减库存 Api:

代码语言:java
复制
@Component
@FeignClient(value = "seata-account-service")
public interface AccountFeignApi {

    /**
     * 扣减账户余额
     */
    @PostMapping("account/decrease")
    ResultVO decrease(@RequestParam("userId") Long userId, @RequestParam("money") Integer money);
}
代码语言:java
复制
@Component
@FeignClient(value = "seata-storage-service")
public interface StorageFeignApi {

    /**
     * 扣减库存
     * @param productId
     * @param count
     * @return
     */
    @PostMapping("storage/decrease")
    ResultVO decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

新增订单模块,添加依赖

代码语言:xml
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.example.cloud</groupId>
        <artifactId>SpringCloud2024</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>seata-order-service2001</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!--nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--alibaba-seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        <!--openfeign-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--loadbalancer-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
        <!--cloud-api-->
        <dependency>
            <groupId>org.atguigu.cloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!--web+actuator-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <!--mybasisplus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <!--mysql8-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--persistence-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
        </dependency>
        <!--mapper4-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
        </dependency>
    </dependencies>
</project>

新建 application.yml

代码语言:yml
复制
server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848         #Nacos服务注册中心地址
  # ==========applicationName + druid-mysql8 driver===================
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
    username: root
    password: 000000
# mybatis-plus
mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    map-underscore-to-camel-case: true
# ========================seata===================
seata:
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: ""
      group: SEATA_GROUP
      application: seata-server
  tx-service-group: default_tx_group # 事务组,由它获得TC服务的集群名称
  service:
    vgroup-mapping: # 点击源码分析
      default_tx_group: default # 事务组与TC服务集群的映射关系
  data-source-proxy-mode: AT

logging:
  level:
    io:
      seata: info

新建业务类

代码语言:java
复制
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.atguigu.cloud.api.AccountFeignApi;
import org.atguigu.cloud.api.StorageFeignApi;
import org.atguigu.cloud.entities.TOrder;
import org.atguigu.cloud.mapper.TOrderMapper;
import org.atguigu.cloud.service.TOrderService;
import org.springframework.stereotype.Service;


/**
* @author salt fish
* @description 针对表【t_order】的数据库操作Service实现
*/
@Service
@Slf4j
public class TOrderServiceImpl extends ServiceImpl<TOrderMapper, TOrder>
    implements TOrderService{
    @Resource
    private StorageFeignApi storageFeignApi;
    @Resource
    private AccountFeignApi accountFeignApi;
    @Resource
    private TOrderMapper orderMapper;

    @Override
    public void create(TOrder order) {
        //xid全局事务id的检查,重要
        String xid = RootContext.getXID();
        //1 新建订单
        log.info("---------------开始新建订单: "+"\t"+"xid: "+xid);
        //订单新建时默认初始订单状态是零
        order.setStatus(0);
        int result = orderMapper.insert(order);
        // int result = orderMapper.insertSelective(order);
        // 插入订单成功后获得插入mysql的实体对象
        TOrder orderFromDB = null;
        if(result > 0)
        {
            // 从mysql里面查出刚插入的记录
            orderFromDB = orderMapper.selectOne(new QueryWrapper<TOrder>(order));
            // orderFromDB = orderMapper.selectOne(order);
            log.info("-----> 新建订单成功,orderFromDB info: "+orderFromDB);
            System.out.println();
            //2 扣减库存
            log.info("-------> 订单微服务开始调用Storage库存,做扣减count");
            storageFeignApi.decrease(orderFromDB.getProductId(),orderFromDB.getCount());
            log.info("-------> 订单微服务结束调用Storage库存,做扣减完成");
            System.out.println();

            //3 扣减账户余额
            log.info("-------> 订单微服务开始调用Account账号,做扣减money");
            accountFeignApi.decrease(orderFromDB.getUserId(),orderFromDB.getMoney());
            log.info("-------> 订单微服务结束调用Account账号,做扣减完成");
            System.out.println();

            //4 修改订单状态
            //将订单状态从零修改为1,表示已经完成
            log.info("-------> 修改订单状态");
            orderFromDB.setStatus(1);
            // mybatis 条件拼接
//            Example whereCondition = new Example(TOrder.class);
//            Example.Criteria criteria = whereCondition.createCriteria();
//            criteria.andEqualTo("userId",orderFromDB.getUserId());
//            criteria.andEqualTo("status",0);
            // 使用mybatisplus更新订单状态
            int updateResult = orderMapper.updateById(orderFromDB);
            // int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);
            log.info("-------> 修改订单状态完成"+"\t"+updateResult);
            log.info("-------> orderFromDB info: "+orderFromDB);
        }
        System.out.println();
        log.info("---------------结束新建订单: "+"\t"+"xid: "+xid);
    }
}

Account 与 Storage :

与前文 Api 模块中添加的接口和主业务实现保持一致,完成对表数据的增删改查(可以自行修改,代码仅作展示)。

注意,此时是未添加全局事务的场景。

启动三个模块,对主业务 2001 模块进行测试,此时有可能会报错,提示请求不通过。此时,我们检查 Springboot 与 SpringCloud 的版本,查看 Seata 是否支持。若版本不匹配,则降低 boot 与 cloud 的版本直到与 Seata 匹配(前文遇到过的问题,OpenFeign 集成 Sentinel)。

再次进行测试,结果发现三个模块能够正常运行,数据能够正常更新。此时,我们手动的向 2002 与 2003 模块(Account 与 Storage)在返回前添加等待时间,时长为请求的 TIme Out 时长,此时可以发现,数据库数据发生变化,但主业务 2001 模块的数据在报错行之后的增删改未生效。

此时就出现了分布式事务问题,当用户下单时,金额和库存的扣减已生效,由于 Time Out 导致主业务应用未能收到正确返回,导致更新订单状态的操作未能正确执行。

我们在主业务 2001 模块(order)的 create 方法上添加 @GlobalTransational 注解,标识开启分布式事务

代码语言:java
复制
@GlobalTransational(name = "order-create", rollbackFor = Exception.class) // AT模式

重启 2001 模块,再次进行访问,当 2002、2003 模块中出现错误时,发现 2001 模块被正确回滚,数据库中不存在多余的错误数据。

此时,我们在 2001 模块中打入一个断点,断点位置在 2001 模块插入数据之后,请求其余模块之前。再次发起请求,当代码行至断点位置时,我们观察 2001 模块所在数据库的 undo_log 表,可以发现表中被插入了一条数据。当我们将断点位置移至远程调用的 2003 模块,我们可以发现 2003 模块数据库存在一条数据,该数据的插入即为 seata 全局事务的控制方式。

在这种断点进入但未通过的请款该我们查看 seata 页面,页面中能够观察到该次事务的 XID、transationId 事务全局 id、服务名、所在组、当前状态、请求时间等参数信息。当断点放开,请求通过后,再次查询发现事务成功,信息消失。

Seata 总结

Seata 的各事务模式

XA 模式:强一致性的两阶段提交协议,需要数据库支持XA接口,牺牲了一定的可用性,无业务侵入。

AT 模式:最终一致性的两阶段提交协议,通过自动补偿机制实现数据回滚,无业务侵入,也是 Seata 的默认模式。

TCC 模式:最终一致性的两阶段提交协议,需要业务实现 Try、Confirm 和 Cancel 三个操作,有业务侵入,灵活度高。

SAGA 模式:长事务模式,通过状态机编排或者注解方式实现业务逻辑,需要业务实现正向和反向两个操作,有业务侵入。

AT 模式如何做到对业务的无侵入

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

Seata 会拦截业务 SQL,并解析语义。在业务数据更新前将其保存为 "Before Image"。在业务数据更新后,将其更新为 "After Image",生成行锁。

二阶段:提交异步化,回滚通过一阶段日志进行反向补偿。

正常情况时,请求顺利结束,Seata 只需要将一阶段保存的快照数据和行锁删除,完成数据清理。

若发生异常情况,Seata 需要还原一阶段执行的业务 SQL,方式是使用 "Before Image"。还原前校验脏写,对比数据库当前业务数据与 "After Image" 是否一致,一致直接还原,不一致则表示存在脏写,需要人工处理。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Seata
    • Seata 工作流程
    • 安装 Seata
    • Seata 使用
    • Seata 总结
      • Seata 的各事务模式
      • AT 模式如何做到对业务的无侵入
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档