Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。对于Seata不太了解的朋友,可以看下我之前写的文章: 微服务开发的最大痛点-分布式事务SEATA入门简介。
AT模式下,每个数据库被当做是一个Resource,Seata 里称为 DataSource Resource。业务通过 JDBC 标准接口访问数据库资源时,Seata 框架会对所有请求进行拦截,做一些操作。
每个本地事务提交时,Seata RM(Resource Manager,资源管理器) 都会向 TC(Transaction Coordinator,事务协调器) 注册一个分支事务。当请求链路调用完成后,发起方通知 TC 提交或回滚分布式事务,进入二阶段调用流程。此时,TC 会根据之前注册的分支事务回调到对应参与者去执行对应资源的第二阶段。
TC 是怎么找到分支事务与资源的对应关系呢?每个资源都有一个全局唯一的资源 ID,并且在初始化时用该 ID 向 TC 注册资源。在运行时,每个分支事务的注册都会带上其资源 ID。这样 TC 就能在二阶段调用时正确找到对应的资源。这就是我们的 AT 模式。简单总结一下,就是把每个数据库当做一个 Resource,在本地事务提交时会去注册一个分支事务。
AT模式是一种无侵入的分布式事务解决方案。在AT模式下,用户只需关注自己的"业务SQL",用户的"业务SQL"作为第一阶段,Seata框架会自动生成事务的二阶段提交和回滚操作。
在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务 SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。
当然官网对AT模式也进行了细致的讲解, 大家可以看下Seata官网的Seata AT模式。
docker-compose.yaml:
version: '3'
services:
zookeeper:
image: zookeeper
ports:
- 2181:2181
admin:
image: apache/dubbo-admin
depends_on:
- zookeeper
ports:
- 8080:8080
environment:
- admin.registry.address=zookeeper://zookeeper:2181
- admin.config-center=zookeeper://zookeeper:2181
- admin.metadata-report.address=zookeeper://zookeeper:2181
docker-compose.yaml:
version: "3"
services:
seata-server:
image: seataio/seata-server
hostname: seata-server
ports:
- "8091:8091"
environment:
- SEATA_PORT=8091
- STORE_MODE=file
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
下面我通过Storage模块来描述Dubbo + Seata的接入,其他模块,例如account
, order
模块的接入都是相同的。
.
├── java
│ └── cn
│ └── mushuwei
│ └── storage
│ ├── SeataStorageApplication.java #应用SpringBoot启动类
│ ├── api
│ │ ├── StorageApi.java #库存调用Dubbo接口
│ │ └── dto
│ │ └── CommodityDTO.java #库存数据传输类
│ ├── config
│ │ └── SeataAutoConfig.java #Seata配置类
│ ├── dao
│ │ └── StorageDao.java #库存持久化类
│ ├── entity
│ │ └── StorageDO.java #库存持久化实体
│ ├── provider
│ │ └── StorageApiImpl.java #库存调用Dubbo接口实现类
│ └── service
│ ├── StorageService.java #库存业务操作逻辑类
│ └── impl
│ └── StorageServiceImpl.java #库存业务操作逻辑实现类
└── resources
├── application.yml #应用配置文件
├── mybatis
│ └── storage.xml #mybatis xml文件
└── sql
└── storage.sql #数据库表结构和初始化数据
15 directories, 12 files
<!-- 日志相关 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<!-- web服务相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mysql数据库连接 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- dubbo微服务框架 -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
</dependency>
<!-- 使用 Zookeeper 作为注册中心 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<!-- seata 相关依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</dependency>
# dubbo配置项,对应DubboConfigurationProperties 配置类
dubbo:
application:
name: ${spring.application.name} #应用名
registry:
address: zookeeper://127.0.0.1:2181 #注册中心地址
timeout: 1000 # 指定注册到zk上超时时间,ms
protocol:
port: 20881 # 协议端口。使用 -1表示随机端口
name: dubbo # 使用 `dubbo://` 协议。更多协议,可见 http://dubbo.apache.org/zh-cn/docs/user/references/protocol/introduction.html 文档
scan:
base-packages: cn.mushuwei.storage # 指定实现服务的包
server:
port: 8081
#数据源配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/storage?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
application:
name: seata-action-storage #应用名
# seata相关配置
seata:
service:
grouplist:
default: 127.0.0.1:8091
vgroup-mapping:
service_tx_group: default
enable-degrade: false
disable-global-transaction: false
enabled: true
application-id: ${spring.application.name}
tx-service-group: service_tx_group
client:
tm:
commit-retry-count: 3
rollback-retry-count: 3
enable-auto-data-source-proxy: false
rm:
report-success-enable: true
table-meta-check-enable: true
report-retry-count: 5
async-commit-buffer-limit: 1000
transport: # Netty相关配置start
type: TCP
server: NIO
heartbeat: true
serialization: seata
compressor: none
enable-client-batch-send-request: true #客户端事务消息请求是否批量合并发送(默认true)
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
server-executor-thread-prefix: NettyServerBizHandler
share-boss-worker: false
client-selector-thread-prefix: NettyClientSelector
client-selector-thread-size: 1
client-worker-thread-prefix: NettyClientWorkerThread
#数据库sql操作打印日志
logging:
level:
cn.mushuwei.storage.dao: debug
# 创建商品库存表
create table if not exists storage.storage
(
id bigint auto_increment
primary key,
commodity_code varchar(50) null comment '商品编码',
name varchar(255) null comment '商品名称',
count int null comment '商品库存数'
);
INSERT INTO storage.storage (id, commodity_code, name, count) VALUES (1, 'cola', '可口可乐', 2000);
# 新建undo_log表
create table if not exists storage.undo_log
(
id bigint auto_increment
primary key,
branch_id bigint not null,
xid varchar(100) not null,
context varchar(128) not null,
rollback_info longblob not null,
log_status int not null,
log_created datetime not null,
log_modified datetime not null,
ext varchar(100) null,
constraint ux_undo_log
unique (xid, branch_id)
)
charset=utf8;
将上面的sql文件导入到新建的storage
数据库中。这个文件地址在resources/sql
下。
package cn.mushuwei.storage.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
/**
* @author jamesmsw
* @date 2020/12/1 11:06 上午
*/
@Configuration
public class SeataAutoConfig {
/**
* autowired datasource config
*/
@Autowired
private DataSourceProperties dataSourceProperties;
/**
* init durid datasource
*
* @Return: druidDataSource datasource instance
*/
@Bean
@Primary
public DruidDataSource druidDataSource(){
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
}
/**
* init datasource proxy
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
return new DataSourceProxy(druidDataSource);
}
/**
* init mybatis sqlSessionFactory
* @Param: dataSourceProxy datasource proxy
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mybatis/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
}
}
package cn.mushuwei.storage.dao;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
/**
* @author jamesmsw
* @date 2020/11/30 7:46 下午
*/
@Repository("storageDao")
public interface StorageDao {
/**
* 扣减商品库存
*
* @param commodityCode 商品code
* @param count 扣减数量
* @return
*/
int decreaseStorage(@Param("commodityCode") String commodityCode, @Param("count") Integer count);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.mushuwei.storage.dao.StorageDao">
<update id="decreaseStorage">
update storage set count = count - #{count} where commodity_code = #{commodityCode}
</update>
</mapper>
到此为止,商品库存操作逻辑,就大致介绍完毕了,其他Account模块是扣减用户余额的操作,Order模块是新建订单数据的,具体配置和上述描述的差不懂。
package cn.mushuwei.business.controller;
import cn.mushuwei.business.dto.BusinessDTO;
import cn.mushuwei.business.service.BusinessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author jamesmsw
* @date 2020/12/1 9:48 下午
*/
@RestController
@RequestMapping("/business")
@Slf4j
public class BusinessController {
@Resource(name = "businessService")
private BusinessService businessService;
@PostMapping("/buy")
public String handleBusiness(@RequestBody BusinessDTO businessDTO){
log.info("请求参数:{}",businessDTO.toString());
Boolean result = businessService.handleBusiness(businessDTO);
if (result) {
return "ok";
}
return "fail";
}
}
在business
模块中,我们对外暴露接口/business/buy,用于给用户进行下单操作。
业务逻辑处理
package cn.mushuwei.business.service.impl;
import cn.mushuwei.business.dto.BusinessDTO;
import cn.mushuwei.business.service.BusinessService;
import cn.mushuwei.order.api.OrderApi;
import cn.mushuwei.order.api.dto.OrderDTO;
import cn.mushuwei.storage.api.StorageApi;
import cn.mushuwei.storage.api.dto.CommodityDTO;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.stereotype.Service;
/**
* @author jamesmsw
* @date 2020/12/1 9:37 下午
*/
@Slf4j
@Service("businessService")
public class BusinessServiceImpl implements BusinessService {
@DubboReference
private StorageApi storageApi;
@DubboReference
private OrderApi orderApi;
private boolean flag;
@Override
@GlobalTransactional(timeoutMills = 300000, name = "seata-demo-business")
public Boolean handleBusiness(BusinessDTO businessDTO) {
flag = true;
log.info("开始全局事务,XID = " + RootContext.getXID());
CommodityDTO commodityDTO = new CommodityDTO();
commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
commodityDTO.setCount(businessDTO.getCount());
boolean storageResult = storageApi.decreaseStorage(commodityDTO);
OrderDTO orderDTO = new OrderDTO();
orderDTO.setUserId(businessDTO.getUserId());
orderDTO.setCommodityCode(businessDTO.getCommodityCode());
orderDTO.setOrderCount(businessDTO.getCount());
orderDTO.setOrderAmount(businessDTO.getAmount());
boolean orderResult = orderApi.createOrder(orderDTO);
//打开注释测试事务发生异常后,全局回滚功能
// if (!flag) {
// throw new RuntimeException("测试抛异常后,分布式事务回滚!");
// }
if (!storageResult || !orderResult) {
throw new RuntimeException("失败");
}
return true;
}
}
@DubboReference
分布调用storageApi
和orderApi
, 用于处理库存扣减和订单数据逻辑的操作。@GlobalTransactional()
在发起业务类中是必须要加的,用于全局锁等逻辑操作。第一阶段:在正常的下单流程中,storage、order、account和business应用分别注册到Seata这个事务协调器上,当用户进行下单时,数据更新前后的日志将会别记录到每个数据库下的undo_log
表中,并形成一个全局的锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
第二阶段: 二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
第一阶段:在一阶段下单流程中,storage、order、account和business应用分别注册到Seata这个事务协调器上,当用户进行下单时,数据更新前后的日志将会别记录到每个数据库下的undo_log
表中,并形成一个全局的锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
第二阶段: 当下单出现异常时,Seata将会对数据进行回滚,回滚的逻辑是按照一阶段的日志。
mushuwei@mushuweideMacBook-Pro-2 seata % docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0c9c325a039c mysql:latest "docker-entrypoint.s…" 2 weeks ago Up 7 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp mysql5.7
b8031fa865cd seataio/seata-server "java -Djava.securit…" 2 weeks ago Up 20 seconds 0.0.0.0:8091->8091/tcp seata_seata-server_1
2af927368a15 apache/dubbo-admin "java -XX:+UnlockExp…" 2 weeks ago Up 2 hours 0.0.0.0:8080->8080/tcp dubbo_admin_1
7afec07234c9 zookeeper "/docker-entrypoint.…" 2 weeks ago Up 2 hours 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp dubbo_zookeeper_1
mysql> use storage;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from storage;
+----+----------------+------+-------+
| id | commodity_code | name | count |
+----+----------------+------+-------+
| 1 | cola | ???? | 2000 |
+----+----------------+------+-------+
1 row in set (0.00 sec)
mysql> use account;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from account;
+----+---------+---------+
| id | user_id | amount |
+----+---------+---------+
| 1 | user123 | 1250.00 |
+----+---------+---------+
1 row in set (0.00 sec)
mysql> use order;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from order;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'order' at line 1
mysql> select * from `order`;
Empty set (0.00 sec)
business
模块test/java目录下的business.http文件,对接口发起请求。Content-Type: application/json
{
"userId" : "user123",
"commodityCode" : "cola",
"count" : 2,
"amount" : 5.0
}
以上代码,我已经上传到GitHub中了,大家详见: https://github.com/sanshengshui/seata-dubbo-action,AT模式在master分支上。
下一章将给大家介绍基于Dubbo + Seata的分布式事务 --- TCC模式的实战案例,敬请期待!