感谢大佬
https://blog.csdn.net/bjweimengshu/article/details/79607522
https://www.cnblogs.com/savorboard/p/distributed-system-transaction-consistency.html
https://www.cnblogs.com/monkeyblog/p/10449363.html
当然这里绝对不是打广告!
而且,我看的可不止这么多,这个是我看的比较好的… 为了方便我后面自己忘记了可以回顾的。举个生活中的例子:
任一个活动失败,事务将撤销所有已成功的活动。
A(Atomic):原子性
C(Consistency):一致性
I(Isolation):隔离性
D(Durability):持久性
Local Transaction
本地事务
**单体应用
转变 为**分布式应用
**不同的服务还会有不同的库
因此需要服务与服务之间远程协作才能完成事务操作由不同的服务之间通过网络远程协作,在不同的数据库之间,完成事务
**称之为分布式事务服务内操作涉及到对多个数据库资源的访问。
这时的事务就需要跨越多个服务了
**好牛啊!
保证每个事务的数据一致性。
一致性
、可用性
、分区容忍性
C (一致性)
读取到最新的数据状态
** 实时更新!A (可用性)
最终一致性
这是事务必须的!
非故障的节点在合理的时间内返回合理的响应 (不是错误和超时的响应)所以分布式系统理论上不可能选择 CA 架构, 只能选择 CP 或者 AP 架构。
P (分区容错性)
网络分区
不可避免的会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这叫**分区容忍性
分区容忍性分是布式系统具备的基本能力。
AP:
CP:
CA:
关系型数据就满足了CA
“柔性事务”
2pc传统方案
**
2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议Open Group
定义了 分布式事务处理模型 DTP** Distributed Transaction Processing Reference Model
DTP模型定义如下角色:
两阶段提交协议 Two Phase Commitment Protocol
涉及到两种角色
事务协调者(coordinator)
** TM事务管理器
负责协调多个参与者进行事务投票及提交(回滚)事务参与者(participants)
** RM资源管理器
即本地事务执行者总共处理步骤有两个
(voting phase)
参与者操作
协调者将通知,事务参与者 准备提交或取消事务,然后进入表决过程投票阶段
参与者将告知协调者自己的决策**
true: 事务参与者,本地事务执行成功,但未提交
flase: 本地事务执行故障(commit phase)
协调者操作
收到参与者的通知后,协调者再向参与者发出通知,根据反馈**投票
**情况决定,各参与者是否要提交还是回滚
多个参与者,只要有一个false , 就表示事务执行失败,通知所有的参与者未提交的事务进行回滚!**缺点:
投票阶段
**,RM执行实际的业务操作,但不提交事务,**资源锁定
3PC阶段解此问题!
事务协调者是整个XA模型的核心,
一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知。参与者会一直处于中间状态无法完成事务。Simple Extensible Autonomous Transaction Architecture
一套一站式分布式事务解决方案。解决分布式事务问题,有两个设计初衷
@Configuration
**本地放心提交事务,可以依靠undo_log进行回滚处理..
**seata中有两种分布式事务实现方案:**AT
** 及**TCC
**
Seata AT模式是基于 XA事务演进而来的一个分布式事务中间件
Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手 在传统2PC的基础上演进,并解决 2PC方案面临的问题, 第二阶段资源占用!
与 传统2PC 的模型类似,Seata定义了3个组件来协议分布式事务的处理过程:
决议。
**协调并驱动
**全局事务的提交或回滚。第一阶段
数据库行锁
:确保多线程情况下,改记录只能由一个线程操作!确保后面执行...
第二阶段
提交
极大提高了第二阶段的执行性能!
回滚
事务执行后的数据是否被更改过!
如果两份数据完全一致就说明没有脏写,可以还原业务数据
如果不一致就说明有脏写,出现脏写就需要转人工处理。
架构层次方面
两阶段提交方面
事务性资源的锁都要保持到
第二阶段完成才释放。将提交前的数据信息,保存在undo_log表中...
,这样就可以省去阶段二持锁的时间,整体提高效率。目前会用即可!
**util微服里了!
** <dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
或
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
因为:
spring-cloud-starter-alibaba-seata
这个依赖中只依赖了spring-cloud-alibaba-seata
所以在项目中添加spring-cloud-starter-alibaba-seata
和spring-cloud-alibaba-seata
是一样的
util微服模块的, resourece资源文件目录下:
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file" #默认 file文件类型
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
}
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
lock {
## the lock store mode: local、remote
mode = "remote"
local {
## store locks in user's database
}
remote {
## store locks in the seata's server
}
}
recovery {
committing-retry-delay = 30
asyn-committing-retry-delay = 30
rollbacking-retry-delay = 30
timeout-retry-delay = 30
}
transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
}
## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}
my_test_tx_group
.yml
spring:
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
一般直接放在公共的util 模块中
io.seata.rm.datasource.DataSourceProxy
org.apache.ibatis.session.SqlSessionFactory
DataSourceProxyConfig.Java
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
//JPA 不需要注入sqlSessionFactoryBean
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
return sqlSessionFactoryBean.getObject();
}
}
用于保存需要回滚的数据
** 每个参与分布式事务都要加这个库!给每个数据库加入一个
undo_log
**日志表!**undo_log.sql
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8
sh ./bin/seata-server.sh
@GlobalTransactional开启全局事务
** 就是你业务执行的总方法!全局事务注解
3PC三阶段,提交是在二阶段提交上的改进版本,主要是加入了**超时机制
**。同时在 协调者和参与者中都引入超时机制
需要开发者介入!
三阶段将二阶段 准备阶段
**拆分为2个阶段**
导致参与者无法知晓是否提交或回滚的不确定状态所引起的延时问题。
两种可能
**情况 1:阶段 1 所有参与者均反馈 yes,参与者预执行事务
情况 2:阶段 1 任何一个参与者反馈 no,或者等待超时后协调者尚无法收到所有参与者的反馈,即中断事务
阶段一返回一个 no 中断所有事务
参与者 未接收到 协调者消息 中断本身事务!
解决了协调者突然挂了的情况!也可以分为以下两种情况。
情况一 阶段 2 所有参与者均反馈 ack 响应,执行真正的事务提交
情况二 阶段 2 任何一个参与者反馈 no,或者 等待超时后 协调者尚无法收到所有参与者的反馈
**,即中断事务**
继续执行事务提交。
**即:阶段三如果 协调者出现故障,仍会导致数据事务不一致!!需要开发者介入!
尝试
**、Confirm**确认
**、Cancel**撤销
**三个词语的缩写
Try操作做业务检查及资源预留
Confirm做业务确认操作
Cancel实现一个与Try相反的操作 即回滚操作
其实从思想上看和 2PC 差不多,都是先试探性的执行,如果都可以那就真正的执行,如果不行就回滚。
流程还是很简单的
Try - Confirm - Cancel
因此 TCC 对业务的侵入较大和业务紧耦合 需要根据特定的场景和业务逻辑来设计相应的操作。
很多时候需要手动补偿代码! 幂等: 无论程序执行n次,保证最终执行结果唯一!
2PC 和 3PC 都是数据库层面的,而 TCC 是**业务层面的分布式事务
**
补偿代码
发送短信等…最终一致性
这个方式避免了像XA协议那样的性能问题。
方案简介:
核心思路是将分布式事务拆分成
一个个,本地事务进行处理。事务执行消息表
开启事务
,并记录消息在 事务消息表中
定时查看,事务消息表的数据发送事务消息
进行处理!
优点:
业务处理成功 + 事务消息发送失败
或 业务处理失败 + 事务消息发送成功
多个系统事务的数据 最终
一致性缺点:
随便编的业务别杠
消息表
写入“添加选课消息” task_his.sql:任务信息
mq 交换机/队列
version版本...
这里的 操作是本地事务执行 要么全部失败,要么全部成功! 且
如果这里,就事务执行失败了,直接回滚 事务失败!
定时框架
定时扫描 task_his.sql 表信息,向MQ中发送消息
避免了如果在发送消息时候,网络动荡消息发送失败!**定时发送...
**Confirm 消息确认
100%发送消息
确保消息一定会发送到MQ 上!
对于这里,因为定时框架会每隔一段时间,就扫描 消息表
循环向mq 上发消息!
mq 发送成功就成功! 删除,消息表中的任务...
mq 发送失败, 也不会立刻,重新发送**等待下次定时任务...
当然如果直接重新发送也行
不同场景不同处理!
反正无论如何都会发的MQ 上!**同时获取自己的结果消息
事务成功/失败
根据MQ的 消息确认接收机制(ACK)
消息一旦被消费者接收返回 ack,队列中的消息就会被删除。
使用手动ACK: 消息接收后,不会立刻发送ACK , 学习模块事务执行完毕才返回 ack
确保了消息的消费!
学习模块的事务无论执行成功/失败,都会在像MQ发送一条消息! 注意 这里还是要做 幂等的操作! 无论程序执行n次,对于相同的请求保证结果唯一!
方式很多,可以是根据任务请求内容,获取订单id 判断是否执行过…
消息表的消息!
就避免了消息在次 定时发送!**ack 保证消息一定被消费
Confirm 保证消息一定发送成功
回调方法!
任务
表:task_his.sql
DROP TABLE IF EXISTS `task_his`;
CREATE TABLE `task_his` (
`id` varchar(32) NOT NULL COMMENT '任务id',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`delete_time` datetime DEFAULT NULL,
`task_type` varchar(32) DEFAULT NULL COMMENT '任务类型',
`mq_exchange` varchar(64) DEFAULT NULL COMMENT '交换机名称',
`mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
`request_body` varchar(512) DEFAULT NULL COMMENT '任务请求的内容',
`version` int(10) DEFAULT '0' COMMENT '乐观锁版本号',
`status` varchar(32) DEFAULT NULL COMMENT '任务状态',
`errormsg` varchar(512) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
要操作MQ 交换机 队列信息!
MessageTaskJob.Java
@Component
public class MessageTaskJob {
@Autowired
private TbTaskService taskService; //消息表业务对象!
@Scheduled(cron = "0/30 * * * * ?") //定时每三十秒扫描一次 消息表!
public void showTask() {
System.out.println("查询任务数据!");
try {
//获取一分钟前消息表数据!
List<TbTask> list = taskService.getBeforTaskList();
//循环变量发送MQ消息...
for (TbTask tbTask : list) {
//使用乐观锁解决高并发下的信息发送...后面解释!
if (taskService.updateVersionLock(tbTask.getId(), tbTask.getVersion()) > 0) {
//发送MQ...
taskService.publishTaskMessage(tbTask);
//发送之后修改,当前 订单消息表的记录时间..当前时间即可!
//因为定时会循环,如果消息发送失败..则直接下次在轮询重新发送即可!
taskService.updateTaskUpdateTime(tbTask.getId());
System.out.println("发送消息并,修改时间!");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
定时操作执行在次发送...
** 确保最终一致性!Mybatis sql参考
<!-- 获取一分钟前消息表的所有数据! -->
<select id="getBeforTaskList" resultType="com.zb.entity.TbTask">
select
id as id,
create_time as createTime,
update_time as updateTime,
delete_time as deleteTime,
task_type as taskType,getBeforTaskList
mq_exchange as mqExchange,
mq_routingkey as mqRoutingkey,
request_body as requestBody,
status as status,
errormsg as errormsg,
version as version
from tb_task
WHERE TIMESTAMPDIFF(MINUTE,update_time, NOW())>1
</select>
<!-- 发送消息成功更改时间... -->
<update id="updateNowTime" >
update tb_task set update_time =now() where id=#{id}
</update>
<!-- 乐观锁:多线程情况下防止,统一消息发送多次处理...
根据 id version版本 来进行修改..
-->
<update id="updateVersioLock">
update tb_task set version =#{version}+1 where id=#{id} and version=#{version}
</update>
定时任务
内重复执行,这里使用乐观锁
A B 线程执行任务都查到了 消息集合
A 发送了任务1 B 也发送了任务1 重复操作!消息集合
A 先执行并带着版本号+id去给版本+1 B因为查的 消息集合
并不是最新的id+版本号去修改影响行数小于0 不执行发送任务!但如果:A线程执行特别快修改了版本,但是B执行慢才查到,**获取了最新的 消息集合
**