在微服务中,一定存在多个数据库,那么多个数据库间如何处理分布式事务?对于分布式事务问题,有哪些解决方案?
一次业务操作需要跨多个数据源或系统进行远程调用时,就会产生分布式事务问题。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就需要通过其他技术手段来解决事务问题。
单体应用被拆分为微服务应用,原来的多个模块被拆分为多个应用,就有多个数据源。此时每个服务内部的数据一致性由本地事务来保证,全局的数据一致性需要通过其他方案解决。
Seata(incubating) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
学习网址:Apache Seata
本地事务解决采用 @Transactional ,全局事务采用 @GlobalTransational。
扩展问题:@Transactional 失效的场景
Seata 1+3 事务控制:
XID:XID 是全局事务的唯一标识,它可以在服务的调用链路中传递,绑定到服务的事务上下文中。
TC Transaction Coordinator:事务协调者(Seata 本体,负责维护全局事务和分支事务的状态,驱动全局事务提交或回滚)
TM Transaction Manager:事务管理者(标注了 @GlobalTransational 注解的微服务模块,事务的发起者,负责定义全局服务的范围,并根据 TC 维护的全局事务和分支事务的状态做出操作决议)
RM Resource Maganer:资源管理器(数据库本体,可以是多个 RM,负责管理分支事务上的资源,向 TC 注册分支事务,汇报事务状态,驱动分支事务的提交和回滚)
TC 以服务器形式独立部署,TM 和 RM 以 Seata Client 的形式集成在微服务中。
运行流程:
TM 向 TC 申请开启一个全局事务,全局事务创建成功后生成一个全局唯一的 XID。
XID 在微服务调用链路的上下文中传播。
RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
TM 向 TC 发起针对 XID 的全局提交或回滚决议。
TC 调度 XID 下管辖的全部分支事务,完成提交或回滚请求。
官网下载
在数据库中新建一个库,名为 seata。
在 seata github 官网,script-server-db 下存放着各数据库的建表脚本,本次采用 MySQL 建表脚本。在 seata 库中运行 mysql.sql 中的初始化脚本。
备份 seata 文件夹 conf 文件夹内的 application.yml 文件。
参考 application.example.yml 模板配置中的样例配置,调整 application.yml 文件。
调整配置时务必参考最新 example 配置。
调整后文件内容如下
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
username:
password:
context-path:
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key:
#secret-key:
data-id: seataServer.properties
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
cluster: default
username:
password:
context-path:
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key:
#secret-key:
store:
# support: file 、 db 、 redis 、 raft
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
user: root
password: 000000
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**
配置完成后保存,启动 Nacos,启动 Seata。Seata 的启动方式是 bin 文件夹内的 seata-server.bat。
访问端口 127.0.0.1:7091,通过配置的用户名和密码,可以正确登录。在 Nacos 中也能看到 Seata 服务。
以官网所展示的用户购买商品的 demo 用例为基础进行拓展,我们创建三个服务:订单、库存、账户。
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户的余额,再在订单服务中修改订单状态为已完成。该操作跨域三个数据库,有两次远程调用,所以一定会存在分布式事务问题。
以下示例均为 Seata 的 AT 模式
在数据库中,创建三个服务对应的数据库
create database seata_order;
create database seata_storage;
create database seata_account;
在三个库中,分别建立 client 对应的日志回滚表(来源:github script/client/at/db/mysql.sql)、分别添加对应的业务表:
-- seata_order、seata_account、seata_storage
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
-- seata_order.t_order definition
CREATE TABLE `t_order` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`user_id` bigint(11) DEFAULT NULL,
`product_id` bigint(11) DEFAULT NULL,
`count` int(11) DEFAULT NULL,
`money` decimal(10,0) DEFAULT NULL,
`status` int(1) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- seata_account.t_account definition
CREATE TABLE `t_account` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`user_id` bigint(11) DEFAULT NULL,
`total` decimal(10,0) DEFAULT NULL,
`used` decimal(10,0) DEFAULT NULL,
`residue` decimal(10,0) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO seata_account.t_account
(id, user_id, total, used, residue)
VALUES(1, 1, 1000, 0, 1000);
-- seata_storage.t_storage definition
CREATE TABLE `t_storage` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`product_id` bigint(11) DEFAULT NULL,
`total` int(11) DEFAULT NULL,
`used` int(11) DEFAULT NULL,
`residue` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO seata_storage.t_storage
(id, product_id, total, used, residue)
VALUES(1, 1, 100, 0, 100);
在 api 模块,新增扣减账户余额 Api 和 扣减库存 Api:
@Component
@FeignClient(value = "seata-account-service")
public interface AccountFeignApi {
/**
* 扣减账户余额
*/
@PostMapping("account/decrease")
ResultVO decrease(@RequestParam("userId") Long userId, @RequestParam("money") Integer money);
}
@Component
@FeignClient(value = "seata-storage-service")
public interface StorageFeignApi {
/**
* 扣减库存
* @param productId
* @param count
* @return
*/
@PostMapping("storage/decrease")
ResultVO decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
新增订单模块,添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example.cloud</groupId>
<artifactId>SpringCloud2024</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>seata-order-service2001</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--alibaba-seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--loadbalancer-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<!--cloud-api-->
<dependency>
<groupId>org.atguigu.cloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--web+actuator-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<!--mybasisplus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<!--mysql8-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
</dependency>
<!--mapper4-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
</dependency>
</dependencies>
</project>
新建 application.yml
server:
port: 2001
spring:
application:
name: seata-order-service
cloud:
nacos:
discovery:
server-addr: localhost:8848 #Nacos服务注册中心地址
# ==========applicationName + druid-mysql8 driver===================
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
username: root
password: 000000
# mybatis-plus
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
configuration:
map-underscore-to-camel-case: true
# ========================seata===================
seata:
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: ""
group: SEATA_GROUP
application: seata-server
tx-service-group: default_tx_group # 事务组,由它获得TC服务的集群名称
service:
vgroup-mapping: # 点击源码分析
default_tx_group: default # 事务组与TC服务集群的映射关系
data-source-proxy-mode: AT
logging:
level:
io:
seata: info
新建业务类
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.atguigu.cloud.api.AccountFeignApi;
import org.atguigu.cloud.api.StorageFeignApi;
import org.atguigu.cloud.entities.TOrder;
import org.atguigu.cloud.mapper.TOrderMapper;
import org.atguigu.cloud.service.TOrderService;
import org.springframework.stereotype.Service;
/**
* @author salt fish
* @description 针对表【t_order】的数据库操作Service实现
*/
@Service
@Slf4j
public class TOrderServiceImpl extends ServiceImpl<TOrderMapper, TOrder>
implements TOrderService{
@Resource
private StorageFeignApi storageFeignApi;
@Resource
private AccountFeignApi accountFeignApi;
@Resource
private TOrderMapper orderMapper;
@Override
public void create(TOrder order) {
//xid全局事务id的检查,重要
String xid = RootContext.getXID();
//1 新建订单
log.info("---------------开始新建订单: "+"\t"+"xid: "+xid);
//订单新建时默认初始订单状态是零
order.setStatus(0);
int result = orderMapper.insert(order);
// int result = orderMapper.insertSelective(order);
// 插入订单成功后获得插入mysql的实体对象
TOrder orderFromDB = null;
if(result > 0)
{
// 从mysql里面查出刚插入的记录
orderFromDB = orderMapper.selectOne(new QueryWrapper<TOrder>(order));
// orderFromDB = orderMapper.selectOne(order);
log.info("-----> 新建订单成功,orderFromDB info: "+orderFromDB);
System.out.println();
//2 扣减库存
log.info("-------> 订单微服务开始调用Storage库存,做扣减count");
storageFeignApi.decrease(orderFromDB.getProductId(),orderFromDB.getCount());
log.info("-------> 订单微服务结束调用Storage库存,做扣减完成");
System.out.println();
//3 扣减账户余额
log.info("-------> 订单微服务开始调用Account账号,做扣减money");
accountFeignApi.decrease(orderFromDB.getUserId(),orderFromDB.getMoney());
log.info("-------> 订单微服务结束调用Account账号,做扣减完成");
System.out.println();
//4 修改订单状态
//将订单状态从零修改为1,表示已经完成
log.info("-------> 修改订单状态");
orderFromDB.setStatus(1);
// mybatis 条件拼接
// Example whereCondition = new Example(TOrder.class);
// Example.Criteria criteria = whereCondition.createCriteria();
// criteria.andEqualTo("userId",orderFromDB.getUserId());
// criteria.andEqualTo("status",0);
// 使用mybatisplus更新订单状态
int updateResult = orderMapper.updateById(orderFromDB);
// int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);
log.info("-------> 修改订单状态完成"+"\t"+updateResult);
log.info("-------> orderFromDB info: "+orderFromDB);
}
System.out.println();
log.info("---------------结束新建订单: "+"\t"+"xid: "+xid);
}
}
Account 与 Storage :
与前文 Api 模块中添加的接口和主业务实现保持一致,完成对表数据的增删改查(可以自行修改,代码仅作展示)。
注意,此时是未添加全局事务的场景。
启动三个模块,对主业务 2001 模块进行测试,此时有可能会报错,提示请求不通过。此时,我们检查 Springboot 与 SpringCloud 的版本,查看 Seata 是否支持。若版本不匹配,则降低 boot 与 cloud 的版本直到与 Seata 匹配(前文遇到过的问题,OpenFeign 集成 Sentinel)。
再次进行测试,结果发现三个模块能够正常运行,数据能够正常更新。此时,我们手动的向 2002 与 2003 模块(Account 与 Storage)在返回前添加等待时间,时长为请求的 TIme Out 时长,此时可以发现,数据库数据发生变化,但主业务 2001 模块的数据在报错行之后的增删改未生效。
此时就出现了分布式事务问题,当用户下单时,金额和库存的扣减已生效,由于 Time Out 导致主业务应用未能收到正确返回,导致更新订单状态的操作未能正确执行。
我们在主业务 2001 模块(order)的 create 方法上添加 @GlobalTransational 注解,标识开启分布式事务
@GlobalTransational(name = "order-create", rollbackFor = Exception.class) // AT模式
重启 2001 模块,再次进行访问,当 2002、2003 模块中出现错误时,发现 2001 模块被正确回滚,数据库中不存在多余的错误数据。
此时,我们在 2001 模块中打入一个断点,断点位置在 2001 模块插入数据之后,请求其余模块之前。再次发起请求,当代码行至断点位置时,我们观察 2001 模块所在数据库的 undo_log 表,可以发现表中被插入了一条数据。当我们将断点位置移至远程调用的 2003 模块,我们可以发现 2003 模块数据库存在一条数据,该数据的插入即为 seata 全局事务的控制方式。
在这种断点进入但未通过的请款该我们查看 seata 页面,页面中能够观察到该次事务的 XID、transationId 事务全局 id、服务名、所在组、当前状态、请求时间等参数信息。当断点放开,请求通过后,再次查询发现事务成功,信息消失。
XA 模式:强一致性的两阶段提交协议,需要数据库支持XA接口,牺牲了一定的可用性,无业务侵入。
AT 模式:最终一致性的两阶段提交协议,通过自动补偿机制实现数据回滚,无业务侵入,也是 Seata 的默认模式。
TCC 模式:最终一致性的两阶段提交协议,需要业务实现 Try、Confirm 和 Cancel 三个操作,有业务侵入,灵活度高。
SAGA 模式:长事务模式,通过状态机编排或者注解方式实现业务逻辑,需要业务实现正向和反向两个操作,有业务侵入。
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
Seata 会拦截业务 SQL,并解析语义。在业务数据更新前将其保存为 "Before Image"。在业务数据更新后,将其更新为 "After Image",生成行锁。
二阶段:提交异步化,回滚通过一阶段日志进行反向补偿。
正常情况时,请求顺利结束,Seata 只需要将一阶段保存的快照数据和行锁删除,完成数据清理。
若发生异常情况,Seata 需要还原一阶段执行的业务 SQL,方式是使用 "Before Image"。还原前校验脏写,对比数据库当前业务数据与 "After Image" 是否一致,一致直接还原,不一致则表示存在脏写,需要人工处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。