上一篇文章我们使用Atomikos实现了分布式事务:
但是里面有个问题:可能会对同一个表生成两份mapper相关的内容。
于是,我们继续探索其他分布式事务组件。
常见分布式事务组件:Seata 、Atomikos、bitronix、Narayana等
下面是它们的优缺点和适用场景:
优点:
缺点:
适用场景:
优点:
缺点:
适用场景:
优点:
缺点:
适用场景:
还有Narayana也能实现分布式事务,一种开源的Java事务管理器,支持XA协议和非XA事务管理,也支持TCC和Saga事务管理。
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
1、典型的场景就是微服务架构 微服务之间通过远程调用完成事务操作。比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。简言之:跨JVM进程产生分布式事务。
2、单体系统访问多个数据库实例 当单体系统需要访问多个数据库(实例)时就会产生分布式事务。比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。简言之:跨数据库实例产生分布式事务。
3、多服务访问同一个数据库实例 比如:订单微服务和库存微服务即使访问同一个数据库也会产生分布式事务,原因就是跨JVM进程,两个微服务持有了不同的数据库链接进行数据库操作,此时产生分布式事务。
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,其内部版本在阿里系内部一直扮演着应用架构层数据一致性的中间件角色,帮助经济体平稳的度过历年的双11,对上层业务进行了有力的技术支撑。经过多年沉淀与积累,其商业化产品先后在阿里云、金融云上售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,未来 Seata 将以社区共建的形式帮助用户快速落地分布式事务解决方案。 Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 目前已支持Dubbo、Spring Cloud、Sofa-RPC、Motan 和 gRPC 等RPC框架,其他框架持续集成中 --------来自官网
在分布式系统中要实现分布式事务,常见的解决方案有两段提交(2PC)、三段提交(3PC)、事务补偿(TCC)、本地消息表(异步确保)、MQ事务方案(可靠消息事务)、最大努力通知和Saga事务。
本案例使用Nacos作为服务注册中心和分布式配置中心,请先准备好Nacos环境。 seata-server为release版本1.4.2,demo采用本地单机部署 本文采用seata的默认模式:AT模式
Seata下载地址:https://github.com/seata/seata/releases
下载到本地后解压,接着修改相关配置:
第一步:conf/registry.conf
registry {
type = "nacos"
nacos {
serverAddr = "127.0.0.1"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1"
namespace = ""
cluster = "default"
}
}
不想相关的都删掉。
第二步:在conf目录仙剑文件:config.txt,内容:
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#service.vgroupMapping.order-service-group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
log.exceptionRate=100
store.mode=file
store.lock.mode=file
store.session.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000#
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
注意把这个配置文件汇总的mysql相关配置改成你的mysql相关的配置。
这里的seata数据库包含三张表:
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`status` tinyint NOT NULL,
`application_id` varchar(64) DEFAULT NULL,
`transaction_service_group` varchar(64) DEFAULT NULL,
`transaction_name` varchar(64) DEFAULT NULL,
`timeout` int DEFAULT NULL,
`begin_time` bigint DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(32) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
第四步:修改conf配置文件,file.conf
修改:mode = "db"
再把数据库相关配置项修改为自己数据库的配置信息。
第五步:在Nacos上添加配置:
Data Id=seataServer.properties Group=SEATA_GROUP
配置内容:
store.publicKey=123456
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://localhost:3306/seata?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
store.db.user=root
store.db.password=123456
记得选properties文件类型。
第六步:项目中配置resources目录下创建registry.conf
文件。
文件内容:
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
注意:这里面的group、dataId对应Nacos中的分布式配置。
第七步:在application.properties
配置文件中。
# Nacos 注册中心地址
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# seata 服务分组,要与服务端nacos-config.txt中service.vgroup_mapping的后缀对应
seata.tx-service-group=stock-service-group
seata.service.vgroup-mapping.stock-service-group=default
logging.level.io.seata=debug
第八步,启动seata
。双击seata-server-1.4.2/bin
下面的seata-server.bat
。
用户下单:
stock-service 服务
service
@Transactional(rollbackFor = Exception.class)
public void deduct(String commodityCode, int count) {
if (commodityCode.equals("product-2")) {
throw new RuntimeException("异常:模拟业务异常:stock branch exception");
}
QueryWrapper<Stock> wrapper = new QueryWrapper<>();
wrapper.setEntity(new Stock().setCommodityCode(commodityCode));
Stock stock = stockDAO.selectOne(wrapper);
stock.setCount(stock.getCount() - count);
stockDAO.updateById(stock);
}
controller
@RestController
@RequestMapping("stock")
public class StockController {
@Resource
private StockService stockService;
/**
* 减库存
*
* @param commodityCode 商品代码
* @param count 数量
* @return
*/
@RequestMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count) {
stockService.deduct(commodityCode, count);
return true;
}
}
order-service服务:
@FeignClient(name = "stock-service")
public interface StockFeignClient {
@GetMapping("stock/deduct")
Boolean deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}
@Service
public class OrderService {
@Resource
private StockFeignClient stockFeignClient;
@Resource
private OrderDAO orderDAO;
/**
* 下单:创建订单、减库存,涉及到两个服务
*
* @param userId
* @param commodityCode
* @param count
*/
@GlobalTransactional
@Transactional(rollbackFor = Exception.class)
public void placeOrder(String userId, String commodityCode, Integer count) {
BigDecimal orderMoney = new BigDecimal(count).multiply(new BigDecimal(5));
Order order = new Order().setUserId(userId).setCommodityCode(commodityCode).setCount(count).setMoney(
orderMoney);
orderDAO.insert(order);
stockFeignClient.deduct(commodityCode, count);
}
}
@GlobalTransactional
我们在业务代码部分,只需要关注这个注解就行,在你需要实现分布式事务的地方加上。
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@Resource
private OrderService orderService;
@Resource
private StockFeignClient stockFeignClient;
/**
* 下单:插入订单表、扣减库存,模拟回滚
*/
@RequestMapping("/placeOrder/commit")
public Boolean placeOrderCommit() {
orderService.placeOrder("1", "product-1", 1);
return true;
}
/**
* 下单:插入订单表、扣减库存,模拟回滚
*/
@RequestMapping("/placeOrder/rollback")
public Boolean placeOrderRollback() {
// product-2 扣库存时模拟了一个业务异常,
try {
orderService.placeOrder("1", "product-2", 1);
}catch (Exception ex){
log.error("----------",ex);
}
return true;
}
}
每个数据库都必须见一张表:
CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
其他的两个表:
CREATE TABLE `stock_tbl` (
`id` int NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
CREATE TABLE `order_tbl` (
`id` int NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int DEFAULT '0',
`money` int DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
库存:
然后我们启动项目,测试;
访问正常:
GET http://localhost:9091/order/placeOrder/commit
订单库数据库订单表插入成功:
库存扣除成功:
再来掉一个模仿失败的场景,然数据库回滚;
GET http://localhost:9091/order/placeOrder/rollback
再去查订单表和库存表完全没有变化。
到此,咱们使用seata实现的分布式事务就这么“轻松的”完成了。