

在微服务架构日益普及的今天,分布式事务已成为后端开发中不可回避的技术挑战。当业务逻辑跨越多个服务、多个数据库时,如何保证数据的一致性和事务的完整性,成为了架构师和开发者必须深入理解和掌握的核心技能。
本文将从理论基础到实战应用,全面解析分布式事务的核心概念、实现原理和最佳实践。我们将深入探讨2PC、3PC、TCC、Saga等经典算法,结合Spring Cloud Alibaba Seata、Apache ShardingSphere等主流框架,通过丰富的代码示例,帮助读者构建完整的分布式事务知识体系。
无论你是初入分布式系统的新手,还是希望深化理解的资深开发者,这篇文章都将为你提供有价值的技术洞察和实践指导。

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单来说,就是一次大的操作由不同的小操作组成,这些小操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
// 典型的分布式事务场景:电商下单
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private AccountService accountService;
/**
* 下单操作涉及多个服务的数据变更
* 1. 扣减库存 (inventory-service)
* 2. 扣减账户余额 (account-service)
* 3. 创建支付记录 (payment-service)
* 4. 创建订单记录 (order-service)
*/
@GlobalTransactional // Seata全局事务注解
public void createOrder(OrderRequest request) {
// 1. 扣减库存
inventoryService.deductStock(request.getProductId(), request.getQuantity());
// 2. 扣减账户余额
accountService.deductBalance(request.getUserId(), request.getAmount());
// 3. 创建支付记录
paymentService.createPayment(request.getUserId(), request.getAmount());
// 4. 创建订单
createOrderRecord(request);
// 如果任何一步失败,整个事务都会回滚
}
private void createOrderRecord(OrderRequest request) {
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
}
}// 单机事务的ACID特性示例
@Service
@Transactional
public class BankTransferService {
/**
* 传统的银行转账操作
* ACID特性在单机环境下很容易保证
*/
public void transfer(String fromAccount, String toAccount, BigDecimal amount) {
// Atomicity: 原子性 - 要么全部成功,要么全部失败
Account from = accountRepository.findByAccountNo(fromAccount);
Account to = accountRepository.findByAccountNo(toAccount);
// Consistency: 一致性 - 转账前后总金额不变
if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientBalanceException("余额不足");
}
// Isolation: 隔离性 - 并发事务不会相互影响
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
// Durability: 持久性 - 事务提交后数据持久化
}
}在分布式环境下,传统的ACID特性面临以下挑战:
// 分布式环境下的转账操作
@Service
public class DistributedBankTransferService {
@Autowired
private AccountServiceA accountServiceA; // 银行A的账户服务
@Autowired
private AccountServiceB accountServiceB; // 银行B的账户服务
/**
* 跨银行转账 - 分布式事务场景
* 挑战:如何保证两个不同银行系统的数据一致性
*/
public void crossBankTransfer(String fromAccount, String toAccount, BigDecimal amount) {
try {
// 步骤1:从银行A扣款
accountServiceA.debit(fromAccount, amount);
// 网络可能在这里出现问题...
// 步骤2:向银行B转账
accountServiceB.credit(toAccount, amount);
// 问题:如果步骤2失败,步骤1已经执行,如何回滚?
// 问题:如何保证两个操作的原子性?
// 问题:如何处理网络分区和节点故障?
} catch (Exception e) {
// 分布式环境下的异常处理变得复杂
// 需要考虑补偿操作
handleDistributedTransactionFailure(fromAccount, toAccount, amount, e);
}
}
private void handleDistributedTransactionFailure(String fromAccount, String toAccount,
BigDecimal amount, Exception e) {
// 实现补偿逻辑
// 这里需要复杂的状态管理和重试机制
}
}CAP定理指出,在分布式系统中,一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)三者不可兼得,最多只能同时保证其中两个。
/**
* CAP定理在分布式事务中的体现
*/
public class CAPTheoryExample {
/**
* CP系统:保证一致性和分区容错性,牺牲可用性
* 例如:传统的2PC协议
*/
public class CPSystem {
public void processTransaction() {
// 在网络分区时,系统会阻塞等待
// 保证数据一致性,但牺牲了可用性
waitForAllNodesResponse(); // 可能长时间阻塞
}
}
/**
* AP系统:保证可用性和分区容错性,牺牲强一致性
* 例如:最终一致性系统
*/
public class APSystem {
public void processTransaction() {
// 即使在网络分区时也能继续服务
// 但可能出现数据不一致的情况
processWithEventualConsistency();
}
private void processWithEventualConsistency() {
// 立即响应用户请求
// 通过异步方式最终达到一致性
}
}
}BASE理论是对CAP定理的延伸,提出了基本可用(Basically Available)、软状态(Soft state)和最终一致性(Eventually consistent)的概念。
/**
* BASE理论的实现示例
*/
@Service
public class BaseTheoryExample {
@Autowired
private MessageQueue messageQueue;
/**
* 基于BASE理论的订单处理
* BA: 基本可用 - 系统在出现故障时仍能提供基本功能
* S: 软状态 - 系统状态可以有一段时间的不一致
* E: 最终一致性 - 系统最终会达到一致状态
*/
public OrderResult processOrder(OrderRequest request) {
try {
// 1. 立即创建订单(基本可用)
Order order = createOrderImmediately(request);
// 2. 异步处理其他操作(软状态)
sendAsyncMessage(new InventoryDeductionMessage(request));
sendAsyncMessage(new PaymentProcessMessage(request));
sendAsyncMessage(new NotificationMessage(request));
// 3. 返回处理中状态(软状态)
return OrderResult.builder()
.orderId(order.getId())
.status(OrderStatus.PROCESSING)
.message("订单创建成功,正在处理中...")
.build();
} catch (Exception e) {
// 即使部分功能失败,仍提供基本服务
return OrderResult.builder()
.status(OrderStatus.FAILED)
.message("订单创建失败,请稍后重试")
.build();
}
}
/**
* 异步消息处理器 - 实现最终一致性
*/
@EventListener
public void handleInventoryDeduction(InventoryDeductionMessage message) {
try {
inventoryService.deductStock(message.getProductId(), message.getQuantity());
// 成功后发送确认消息
messageQueue.send(new InventoryDeductionConfirmMessage(message.getOrderId()));
} catch (Exception e) {
// 失败后发送补偿消息
messageQueue.send(new OrderCancellationMessage(message.getOrderId()));
}
}
private Order createOrderImmediately(OrderRequest request) {
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setStatus(OrderStatus.PROCESSING);
return orderRepository.save(order);
}
private void sendAsyncMessage(Object message) {
messageQueue.send(message);
}
}/**
* 刚性事务:严格遵循ACID特性
* 适用场景:对数据一致性要求极高的场景,如金融交易
*/
@Service
public class RigidTransactionExample {
/**
* 使用2PC协议实现的刚性事务
*/
@GlobalTransactional(rollbackFor = Exception.class)
public void rigidTransaction(TransferRequest request) {
// 阶段1:准备阶段
// 所有参与者都必须准备好才能提交
// 阶段2:提交阶段
// 要么全部提交,要么全部回滚
accountService.debit(request.getFromAccount(), request.getAmount());
accountService.credit(request.getToAccount(), request.getAmount());
// 如果任何操作失败,整个事务回滚
// 保证强一致性,但可能影响性能和可用性
}
}
/**
* 柔性事务:放宽一致性要求,追求最终一致性
* 适用场景:对性能和可用性要求较高的场景
*/
@Service
public class FlexibleTransactionExample {
/**
* 使用Saga模式实现的柔性事务
*/
public void flexibleTransaction(OrderRequest request) {
// 将大事务拆分为多个小事务
// 每个小事务都有对应的补偿操作
SagaTransaction saga = SagaTransaction.builder()
.addStep(new DeductInventoryStep(request), new CompensateInventoryStep(request))
.addStep(new DeductAccountStep(request), new CompensateAccountStep(request))
.addStep(new CreateOrderStep(request), new CancelOrderStep(request))
.build();
saga.execute();
// 如果某个步骤失败,执行补偿操作
// 最终达到一致性,但过程中可能存在不一致状态
}
}两阶段提交协议是最经典的分布式事务解决方案,分为准备阶段和提交阶段。
/**
* 2PC协议的协调者实现
*/
@Component
public class TwoPhaseCommitCoordinator {
private List<TransactionParticipant> participants;
private TransactionLog transactionLog;
/**
* 执行两阶段提交
*/
public boolean executeTransaction(String transactionId, TransactionContext context) {
// 阶段1:准备阶段(Prepare Phase)
boolean prepareResult = preparePhase(transactionId, context);
if (prepareResult) {
// 阶段2:提交阶段(Commit Phase)
return commitPhase(transactionId);
} else {
// 如果准备阶段失败,执行回滚
return abortPhase(transactionId);
}
}
/**
* 准备阶段:询问所有参与者是否可以提交
*/
private boolean preparePhase(String transactionId, TransactionContext context) {
log.info("开始准备阶段,事务ID: {}", transactionId);
// 记录事务开始日志
transactionLog.logTransactionStart(transactionId, participants);
List<CompletableFuture<Boolean>> prepareFutures = new ArrayList<>();
// 并行向所有参与者发送准备请求
for (TransactionParticipant participant : participants) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
// 发送准备请求
PrepareRequest request = new PrepareRequest(transactionId, context);
PrepareResponse response = participant.prepare(request);
if (response.isSuccess()) {
log.info("参与者 {} 准备成功", participant.getId());
return true;
} else {
log.warn("参与者 {} 准备失败: {}", participant.getId(), response.getErrorMessage());
return false;
}
} catch (Exception e) {
log.error("参与者 {} 准备阶段异常", participant.getId(), e);
return false;
}
});
prepareFutures.add(future);
}
// 等待所有参与者响应
try {
List<Boolean> results = prepareFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 只有所有参与者都准备成功才能进入提交阶段
boolean allPrepared = results.stream().allMatch(Boolean::booleanValue);
if (allPrepared) {
transactionLog.logPrepareSuccess(transactionId);
log.info("所有参与者准备成功,事务ID: {}", transactionId);
} else {
transactionLog.logPrepareFailure(transactionId);
log.warn("部分参与者准备失败,事务ID: {}", transactionId);
}
return allPrepared;
} catch (Exception e) {
log.error("准备阶段异常,事务ID: {}", transactionId, e);
transactionLog.logPrepareFailure(transactionId);
return false;
}
}
/**
* 提交阶段:通知所有参与者提交事务
*/
private boolean commitPhase(String transactionId) {
log.info("开始提交阶段,事务ID: {}", transactionId);
transactionLog.logCommitStart(transactionId);
List<CompletableFuture<Boolean>> commitFutures = new ArrayList<>();
// 并行向所有参与者发送提交请求
for (TransactionParticipant participant : participants) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
CommitRequest request = new CommitRequest(transactionId);
CommitResponse response = participant.commit(request);
if (response.isSuccess()) {
log.info("参与者 {} 提交成功", participant.getId());
return true;
} else {
log.error("参与者 {} 提交失败: {}", participant.getId(), response.getErrorMessage());
// 注意:在2PC中,如果准备阶段成功但提交阶段失败,
// 这是一个严重问题,需要人工干预
return false;
}
} catch (Exception e) {
log.error("参与者 {} 提交阶段异常", participant.getId(), e);
return false;
}
});
commitFutures.add(future);
}
// 等待所有参与者提交完成
try {
List<Boolean> results = commitFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
boolean allCommitted = results.stream().allMatch(Boolean::booleanValue);
if (allCommitted) {
transactionLog.logCommitSuccess(transactionId);
log.info("所有参与者提交成功,事务ID: {}", transactionId);
} else {
transactionLog.logCommitFailure(transactionId);
log.error("部分参与者提交失败,事务ID: {}", transactionId);
// 这种情况下需要人工干预或重试机制
}
return allCommitted;
} catch (Exception e) {
log.error("提交阶段异常,事务ID: {}", transactionId, e);
transactionLog.logCommitFailure(transactionId);
return false;
}
}
/**
* 回滚阶段:通知所有参与者回滚事务
*/
private boolean abortPhase(String transactionId) {
log.info("开始回滚阶段,事务ID: {}", transactionId);
transactionLog.logAbortStart(transactionId);
// 向所有参与者发送回滚请求
for (TransactionParticipant participant : participants) {
try {
AbortRequest request = new AbortRequest(transactionId);
participant.abort(request);
log.info("参与者 {} 回滚成功", participant.getId());
} catch (Exception e) {
log.error("参与者 {} 回滚失败", participant.getId(), e);
// 回滚失败也需要记录,可能需要人工处理
}
}
transactionLog.logAbortComplete(transactionId);
return true;
}
}/**
* 2PC协议的参与者实现
*/
@Component
public class TwoPhaseCommitParticipant implements TransactionParticipant {
private String participantId;
private DataSource dataSource;
private Map<String, Connection> transactionConnections = new ConcurrentHashMap<>();
@Override
public PrepareResponse prepare(PrepareRequest request) {
String transactionId = request.getTransactionId();
try {
log.info("参与者 {} 收到准备请求,事务ID: {}", participantId, transactionId);
// 1. 开启本地事务
Connection connection = dataSource.getConnection();
connection.setAutoCommit(false);
// 2. 执行业务逻辑
boolean businessResult = executeBusinessLogic(connection, request.getContext());
if (businessResult) {
// 3. 业务逻辑执行成功,保存连接等待提交指令
transactionConnections.put(transactionId, connection);
log.info("参与者 {} 准备成功,事务ID: {}", participantId, transactionId);
return PrepareResponse.success();
} else {
// 4. 业务逻辑执行失败,回滚并关闭连接
connection.rollback();
connection.close();
log.warn("参与者 {} 准备失败,业务逻辑执行失败,事务ID: {}", participantId, transactionId);
return PrepareResponse.failure("业务逻辑执行失败");
}
} catch (Exception e) {
log.error("参与者 {} 准备阶段异常,事务ID: {}", participantId, transactionId, e);
return PrepareResponse.failure("准备阶段异常: " + e.getMessage());
}
}
@Override
public CommitResponse commit(CommitRequest request) {
String transactionId = request.getTransactionId();
Connection connection = transactionConnections.get(transactionId);
if (connection == null) {
log.error("参与者 {} 未找到事务连接,事务ID: {}", participantId, transactionId);
return CommitResponse.failure("未找到事务连接");
}
try {
log.info("参与者 {} 开始提交,事务ID: {}", participantId, transactionId);
// 提交本地事务
connection.commit();
connection.close();
// 清理事务连接
transactionConnections.remove(transactionId);
log.info("参与者 {} 提交成功,事务ID: {}", participantId, transactionId);
return CommitResponse.success();
} catch (Exception e) {
log.error("参与者 {} 提交失败,事务ID: {}", participantId, transactionId, e);
try {
connection.rollback();
connection.close();
} catch (SQLException rollbackException) {
log.error("回滚失败", rollbackException);
}
transactionConnections.remove(transactionId);
return CommitResponse.failure("提交失败: " + e.getMessage());
}
}
@Override
public void abort(AbortRequest request) {
String transactionId = request.getTransactionId();
Connection connection = transactionConnections.get(transactionId);
if (connection != null) {
try {
log.info("参与者 {} 开始回滚,事务ID: {}", participantId, transactionId);
connection.rollback();
connection.close();
transactionConnections.remove(transactionId);
log.info("参与者 {} 回滚成功,事务ID: {}", participantId, transactionId);
} catch (Exception e) {
log.error("参与者 {} 回滚失败,事务ID: {}", participantId, transactionId, e);
}
}
}
/**
* 执行具体的业务逻辑
*/
private boolean executeBusinessLogic(Connection connection, TransactionContext context) {
try {
// 这里实现具体的业务逻辑
// 例如:扣减库存、扣减余额等
if (context.getOperation().equals("DEDUCT_INVENTORY")) {
return deductInventory(connection, context);
} else if (context.getOperation().equals("DEDUCT_BALANCE")) {
return deductBalance(connection, context);
}
return true;
} catch (Exception e) {
log.error("业务逻辑执行异常", e);
return false;
}
}
private boolean deductInventory(Connection connection, TransactionContext context) throws SQLException {
String sql = "UPDATE inventory SET quantity = quantity - ? WHERE product_id = ? AND quantity >= ?";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setInt(1, context.getQuantity());
stmt.setString(2, context.getProductId());
stmt.setInt(3, context.getQuantity());
int affectedRows = stmt.executeUpdate();
return affectedRows > 0;
}
}
private boolean deductBalance(Connection connection, TransactionContext context) throws SQLException {
String sql = "UPDATE account SET balance = balance - ? WHERE user_id = ? AND balance >= ?";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setBigDecimal(1, context.getAmount());
stmt.setString(2, context.getUserId());
stmt.setBigDecimal(3, context.getAmount());
int affectedRows = stmt.executeUpdate();
return affectedRows > 0;
}
}
}优点:
缺点:

三阶段提交协议是对2PC的改进,增加了CanCommit阶段,并引入了超时机制。
/**
* 3PC协议的协调者实现
*/
@Component
public class ThreePhaseCommitCoordinator {
private List<TransactionParticipant> participants;
private TransactionLog transactionLog;
private final int TIMEOUT_SECONDS = 30;
/**
* 执行三阶段提交
*/
public boolean executeTransaction(String transactionId, TransactionContext context) {
// 阶段1:CanCommit阶段
boolean canCommitResult = canCommitPhase(transactionId, context);
if (!canCommitResult) {
return false;
}
// 阶段2:PreCommit阶段
boolean preCommitResult = preCommitPhase(transactionId, context);
if (preCommitResult) {
// 阶段3:DoCommit阶段
return doCommitPhase(transactionId);
} else {
// 如果PreCommit失败,执行回滚
return abortPhase(transactionId);
}
}
/**
* 阶段1:CanCommit - 询问参与者是否可以执行事务
*/
private boolean canCommitPhase(String transactionId, TransactionContext context) {
log.info("开始CanCommit阶段,事务ID: {}", transactionId);
List<CompletableFuture<Boolean>> canCommitFutures = new ArrayList<>();
for (TransactionParticipant participant : participants) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
CanCommitRequest request = new CanCommitRequest(transactionId, context);
CanCommitResponse response = participant.canCommit(request);
return response.isCanCommit();
} catch (Exception e) {
log.error("参与者 {} CanCommit阶段异常", participant.getId(), e);
return false;
}
});
canCommitFutures.add(future);
}
try {
// 设置超时时间
List<Boolean> results = canCommitFutures.stream()
.map(future -> {
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.warn("CanCommit阶段超时");
return false;
} catch (Exception e) {
log.error("CanCommit阶段异常", e);
return false;
}
})
.collect(Collectors.toList());
boolean allCanCommit = results.stream().allMatch(Boolean::booleanValue);
if (allCanCommit) {
log.info("所有参与者CanCommit成功,事务ID: {}", transactionId);
} else {
log.warn("部分参与者CanCommit失败,事务ID: {}", transactionId);
}
return allCanCommit;
} catch (Exception e) {
log.error("CanCommit阶段异常,事务ID: {}", transactionId, e);
return false;
}
}
/**
* 阶段2:PreCommit - 执行事务但不提交
*/
private boolean preCommitPhase(String transactionId, TransactionContext context) {
log.info("开始PreCommit阶段,事务ID: {}", transactionId);
List<CompletableFuture<Boolean>> preCommitFutures = new ArrayList<>();
for (TransactionParticipant participant : participants) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
PreCommitRequest request = new PreCommitRequest(transactionId, context);
PreCommitResponse response = participant.preCommit(request);
return response.isSuccess();
} catch (Exception e) {
log.error("参与者 {} PreCommit阶段异常", participant.getId(), e);
return false;
}
});
preCommitFutures.add(future);
}
try {
List<Boolean> results = preCommitFutures.stream()
.map(future -> {
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.warn("PreCommit阶段超时");
return false;
} catch (Exception e) {
log.error("PreCommit阶段异常", e);
return false;
}
})
.collect(Collectors.toList());
boolean allPreCommitted = results.stream().allMatch(Boolean::booleanValue);
if (allPreCommitted) {
transactionLog.logPreCommitSuccess(transactionId);
log.info("所有参与者PreCommit成功,事务ID: {}", transactionId);
} else {
transactionLog.logPreCommitFailure(transactionId);
log.warn("部分参与者PreCommit失败,事务ID: {}", transactionId);
}
return allPreCommitted;
} catch (Exception e) {
log.error("PreCommit阶段异常,事务ID: {}", transactionId, e);
return false;
}
}
/**
* 阶段3:DoCommit - 正式提交事务
*/
private boolean doCommitPhase(String transactionId) {
log.info("开始DoCommit阶段,事务ID: {}", transactionId);
// 3PC的改进:即使协调者故障,参与者也会在超时后自动提交
// 因为能到达DoCommit阶段说明所有参与者都已经PreCommit成功
for (TransactionParticipant participant : participants) {
// 异步发送DoCommit请求,不等待响应
CompletableFuture.runAsync(() -> {
try {
DoCommitRequest request = new DoCommitRequest(transactionId);
participant.doCommit(request);
log.info("参与者 {} DoCommit成功", participant.getId());
} catch (Exception e) {
log.error("参与者 {} DoCommit失败", participant.getId(), e);
// 在3PC中,DoCommit失败的影响较小,因为参与者会自动提交
}
});
}
transactionLog.logDoCommitComplete(transactionId);
return true;
}
private boolean abortPhase(String transactionId) {
log.info("开始Abort阶段,事务ID: {}", transactionId);
for (TransactionParticipant participant : participants) {
try {
AbortRequest request = new AbortRequest(transactionId);
participant.abort(request);
log.info("参与者 {} Abort成功", participant.getId());
} catch (Exception e) {
log.error("参与者 {} Abort失败", participant.getId(), e);
}
}
return true;
}
}TCC是一种柔性事务解决方案,将业务逻辑分为Try、Confirm、Cancel三个阶段。
/**
* TCC模式的业务接口定义
*/
public interface TccTransactionService {
/**
* Try阶段:尝试执行业务逻辑,预留资源
* @param context 事务上下文
* @return 是否成功
*/
boolean tryExecute(TccTransactionContext context);
/**
* Confirm阶段:确认执行,提交业务逻辑
* @param context 事务上下文
* @return 是否成功
*/
boolean confirmExecute(TccTransactionContext context);
/**
* Cancel阶段:取消执行,回滚业务逻辑
* @param context 事务上下文
* @return 是否成功
*/
boolean cancelExecute(TccTransactionContext context);
}
/**
* 库存服务的TCC实现
*/
@Service
public class InventoryTccService implements TccTransactionService {
@Autowired
private InventoryRepository inventoryRepository;
@Autowired
private InventoryFreezeRepository freezeRepository;
/**
* Try阶段:冻结库存
*/
@Override
@Transactional
public boolean tryExecute(TccTransactionContext context) {
String productId = context.getProductId();
int quantity = context.getQuantity();
String transactionId = context.getTransactionId();
try {
// 1. 检查库存是否充足
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory == null || inventory.getAvailableQuantity() < quantity) {
log.warn("库存不足,产品ID: {}, 需要数量: {}, 可用数量: {}",
productId, quantity, inventory != null ? inventory.getAvailableQuantity() : 0);
return false;
}
// 2. 冻结库存
inventory.setAvailableQuantity(inventory.getAvailableQuantity() - quantity);
inventory.setFrozenQuantity(inventory.getFrozenQuantity() + quantity);
inventoryRepository.save(inventory);
// 3. 记录冻结记录
InventoryFreeze freeze = new InventoryFreeze();
freeze.setTransactionId(transactionId);
freeze.setProductId(productId);
freeze.setQuantity(quantity);
freeze.setStatus(FreezeStatus.FROZEN);
freeze.setCreateTime(LocalDateTime.now());
freezeRepository.save(freeze);
log.info("库存冻结成功,事务ID: {}, 产品ID: {}, 数量: {}", transactionId, productId, quantity);
return true;
} catch (Exception e) {
log.error("库存冻结失败,事务ID: {}", transactionId, e);
return false;
}
}
/**
* Confirm阶段:确认扣减库存
*/
@Override
@Transactional
public boolean confirmExecute(TccTransactionContext context) {
String transactionId = context.getTransactionId();
String productId = context.getProductId();
int quantity = context.getQuantity();
try {
// 1. 查找冻结记录
InventoryFreeze freeze = freezeRepository.findByTransactionId(transactionId);
if (freeze == null) {
log.warn("未找到冻结记录,事务ID: {}", transactionId);
return false;
}
if (freeze.getStatus() == FreezeStatus.CONFIRMED) {
log.info("库存已确认扣减,事务ID: {}", transactionId);
return true; // 幂等性处理
}
// 2. 确认扣减库存
Inventory inventory = inventoryRepository.findByProductId(productId);
inventory.setFrozenQuantity(inventory.getFrozenQuantity() - quantity);
// 注意:这里不需要再减少totalQuantity,因为在Try阶段已经从available转移到frozen
inventoryRepository.save(inventory);
// 3. 更新冻结记录状态
freeze.setStatus(FreezeStatus.CONFIRMED);
freeze.setUpdateTime(LocalDateTime.now());
freezeRepository.save(freeze);
log.info("库存确认扣减成功,事务ID: {}, 产品ID: {}, 数量: {}", transactionId, productId, quantity);
return true;
} catch (Exception e) {
log.error("库存确认扣减失败,事务ID: {}", transactionId, e);
return false;
}
}
/**
* Cancel阶段:取消冻结,恢复库存
*/
@Override
@Transactional
public boolean cancelExecute(TccTransactionContext context) {
String transactionId = context.getTransactionId();
String productId = context.getProductId();
int quantity = context.getQuantity();
try {
// 1. 查找冻结记录
InventoryFreeze freeze = freezeRepository.findByTransactionId(transactionId);
if (freeze == null) {
log.info("未找到冻结记录,可能已经取消,事务ID: {}", transactionId);
return true; // 幂等性处理
}
if (freeze.getStatus() == FreezeStatus.CANCELLED) {
log.info("库存已取消冻结,事务ID: {}", transactionId);
return true; // 幂等性处理
}
// 2. 恢复库存
Inventory inventory = inventoryRepository.findByProductId(productId);
inventory.setAvailableQuantity(inventory.getAvailableQuantity() + quantity);
inventory.setFrozenQuantity(inventory.getFrozenQuantity() - quantity);
inventoryRepository.save(inventory);
// 3. 更新冻结记录状态
freeze.setStatus(FreezeStatus.CANCELLED);
freeze.setUpdateTime(LocalDateTime.now());
freezeRepository.save(freeze);
log.info("库存取消冻结成功,事务ID: {}, 产品ID: {}, 数量: {}", transactionId, productId, quantity);
return true;
} catch (Exception e) {
log.error("库存取消冻结失败,事务ID: {}", transactionId, e);
return false;
}
}
}/**
* TCC事务管理器
*/
@Component
public class TccTransactionManager {
private Map<String, List<TccTransactionService>> transactionServices = new ConcurrentHashMap<>();
private TccTransactionLog transactionLog;
/**
* 执行TCC事务
*/
public boolean executeTransaction(String transactionId, List<TccTransactionContext> contexts) {
log.info("开始执行TCC事务,事务ID: {}", transactionId);
List<TccTransactionService> services = new ArrayList<>();
// Try阶段:尝试执行所有业务逻辑
boolean tryResult = tryPhase(transactionId, contexts, services);
if (tryResult) {
// Confirm阶段:确认执行
return confirmPhase(transactionId, contexts, services);
} else {
// Cancel阶段:取消执行
return cancelPhase(transactionId, contexts, services);
}
}
/**
* Try阶段
*/
private boolean tryPhase(String transactionId, List<TccTransactionContext> contexts,
List<TccTransactionService> services) {
log.info("开始Try阶段,事务ID: {}", transactionId);
transactionLog.logTryStart(transactionId);
for (TccTransactionContext context : contexts) {
try {
TccTransactionService service = getServiceByType(context.getServiceType());
services.add(service);
boolean result = service.tryExecute(context);
if (!result) {
log.warn("Try阶段失败,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId);
transactionLog.logTryFailure(transactionId, context.getServiceType());
return false;
}
log.info("Try阶段成功,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId);
} catch (Exception e) {
log.error("Try阶段异常,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId, e);
transactionLog.logTryFailure(transactionId, context.getServiceType());
return false;
}
}
transactionLog.logTrySuccess(transactionId);
transactionServices.put(transactionId, services);
return true;
}
/**
* Confirm阶段
*/
private boolean confirmPhase(String transactionId, List<TccTransactionContext> contexts,
List<TccTransactionService> services) {
log.info("开始Confirm阶段,事务ID: {}", transactionId);
transactionLog.logConfirmStart(transactionId);
boolean allConfirmed = true;
for (int i = 0; i < contexts.size(); i++) {
TccTransactionContext context = contexts.get(i);
TccTransactionService service = services.get(i);
try {
boolean result = service.confirmExecute(context);
if (!result) {
log.error("Confirm阶段失败,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId);
allConfirmed = false;
// 注意:即使某个服务Confirm失败,也要继续尝试其他服务
// 因为Try阶段已经成功,需要尽力保证最终一致性
}
} catch (Exception e) {
log.error("Confirm阶段异常,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId, e);
allConfirmed = false;
}
}
if (allConfirmed) {
transactionLog.logConfirmSuccess(transactionId);
log.info("Confirm阶段全部成功,事务ID: {}", transactionId);
} else {
transactionLog.logConfirmFailure(transactionId);
log.error("Confirm阶段部分失败,事务ID: {}", transactionId);
// 启动重试机制
scheduleRetryConfirm(transactionId, contexts, services);
}
return allConfirmed;
}
/**
* Cancel阶段
*/
private boolean cancelPhase(String transactionId, List<TccTransactionContext> contexts,
List<TccTransactionService> services) {
log.info("开始Cancel阶段,事务ID: {}", transactionId);
transactionLog.logCancelStart(transactionId);
// 逆序执行Cancel操作
Collections.reverse(contexts);
Collections.reverse(services);
boolean allCancelled = true;
for (int i = 0; i < contexts.size(); i++) {
TccTransactionContext context = contexts.get(i);
TccTransactionService service = services.get(i);
try {
boolean result = service.cancelExecute(context);
if (!result) {
log.error("Cancel阶段失败,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId);
allCancelled = false;
}
} catch (Exception e) {
log.error("Cancel阶段异常,服务类型: {}, 事务ID: {}", context.getServiceType(), transactionId, e);
allCancelled = false;
}
}
if (allCancelled) {
transactionLog.logCancelSuccess(transactionId);
log.info("Cancel阶段全部成功,事务ID: {}", transactionId);
} else {
transactionLog.logCancelFailure(transactionId);
log.error("Cancel阶段部分失败,事务ID: {}", transactionId);
// 启动重试机制
scheduleRetryCancel(transactionId, contexts, services);
}
return allCancelled;
}
/**
* 重试Confirm操作
*/
private void scheduleRetryConfirm(String transactionId, List<TccTransactionContext> contexts,
List<TccTransactionService> services) {
// 实现重试逻辑,可以使用定时任务或消息队列
CompletableFuture.runAsync(() -> {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
Thread.sleep(5000 * (retryCount + 1)); // 指数退避
boolean result = confirmPhase(transactionId, contexts, services);
if (result) {
log.info("重试Confirm成功,事务ID: {}, 重试次数: {}", transactionId, retryCount + 1);
return;
}
} catch (Exception e) {
log.error("重试Confirm异常,事务ID: {}, 重试次数: {}", transactionId, retryCount + 1, e);
}
retryCount++;
}
log.error("重试Confirm最终失败,事务ID: {}, 需要人工干预", transactionId);
// 发送告警或记录到死信队列
});
}
private void scheduleRetryCancel(String transactionId, List<TccTransactionContext> contexts,
List<TccTransactionService> services) {
// 类似的重试Cancel逻辑
// ...
}
private TccTransactionService getServiceByType(String serviceType) {
// 根据服务类型获取对应的TCC服务实现
// 可以使用Spring的ApplicationContext或服务注册表
return null;
}
}本地消息表是一种常用的分布式事务解决方案,通过将业务操作和消息发送放在同一个本地事务中来保证一致性。
/**
* 本地消息表实现
*/
@Service
@Transactional
public class LocalMessageTableService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private LocalMessageRepository messageRepository;
@Autowired
private MessagePublisher messagePublisher;
/**
* 创建订单并发送消息
* 使用本地事务保证订单创建和消息记录的一致性
*/
public void createOrderWithMessage(OrderRequest request) {
String transactionId = UUID.randomUUID().toString();
try {
// 1. 创建订单
Order order = createOrder(request, transactionId);
// 2. 在同一个事务中记录待发送的消息
recordLocalMessage(transactionId, "INVENTORY_DEDUCTION",
createInventoryDeductionMessage(order));
recordLocalMessage(transactionId, "PAYMENT_PROCESS",
createPaymentMessage(order));
recordLocalMessage(transactionId, "NOTIFICATION_SEND",
createNotificationMessage(order));
log.info("订单创建成功,事务ID: {}, 订单ID: {}", transactionId, order.getId());
} catch (Exception e) {
log.error("订单创建失败,事务ID: {}", transactionId, e);
throw e;
}
}
private Order createOrder(OrderRequest request, String transactionId) {
Order order = new Order();
order.setTransactionId(transactionId);
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(LocalDateTime.now());
return orderRepository.save(order);
}
private void recordLocalMessage(String transactionId, String messageType, Object messageContent) {
LocalMessage message = new LocalMessage();
message.setTransactionId(transactionId);
message.setMessageType(messageType);
message.setMessageContent(JSON.toJSONString(messageContent));
message.setStatus(MessageStatus.PENDING);
message.setRetryCount(0);
message.setCreateTime(LocalDateTime.now());
message.setNextRetryTime(LocalDateTime.now().plusSeconds(5));
messageRepository.save(message);
}
private InventoryDeductionMessage createInventoryDeductionMessage(Order order) {
return InventoryDeductionMessage.builder()
.transactionId(order.getTransactionId())
.orderId(order.getId())
.productId(order.getProductId())
.quantity(order.getQuantity())
.build();
}
private PaymentMessage createPaymentMessage(Order order) {
return PaymentMessage.builder()
.transactionId(order.getTransactionId())
.orderId(order.getId())
.userId(order.getUserId())
.amount(order.getAmount())
.build();
}
private NotificationMessage createNotificationMessage(Order order) {
return NotificationMessage.builder()
.transactionId(order.getTransactionId())
.orderId(order.getId())
.userId(order.getUserId())
.messageType("ORDER_CREATED")
.build();
}
}
/**
* 消息发送定时任务
*/
@Component
public class MessagePublishScheduler {
@Autowired
private LocalMessageRepository messageRepository;
@Autowired
private MessagePublisher messagePublisher;
private final int MAX_RETRY_COUNT = 5;
/**
* 定时扫描并发送待发送的消息
*/
@Scheduled(fixedDelay = 5000) // 每5秒执行一次
public void publishPendingMessages() {
List<LocalMessage> pendingMessages = messageRepository.findPendingMessages(
LocalDateTime.now(), PageRequest.of(0, 100));
for (LocalMessage message : pendingMessages) {
try {
publishMessage(message);
} catch (Exception e) {
log.error("消息发送失败,消息ID: {}", message.getId(), e);
handlePublishFailure(message);
}
}
}
private void publishMessage(LocalMessage message) {
log.info("开始发送消息,消息ID: {}, 类型: {}", message.getId(), message.getMessageType());
// 根据消息类型发送到不同的队列
switch (message.getMessageType()) {
case "INVENTORY_DEDUCTION":
messagePublisher.publishToInventoryQueue(message.getMessageContent());
break;
case "PAYMENT_PROCESS":
messagePublisher.publishToPaymentQueue(message.getMessageContent());
break;
case "NOTIFICATION_SEND":
messagePublisher.publishToNotificationQueue(message.getMessageContent());
break;
default:
log.warn("未知的消息类型: {}", message.getMessageType());
return;
}
// 更新消息状态为已发送
message.setStatus(MessageStatus.SENT);
message.setSentTime(LocalDateTime.now());
messageRepository.save(message);
log.info("消息发送成功,消息ID: {}", message.getId());
}
private void handlePublishFailure(LocalMessage message) {
message.setRetryCount(message.getRetryCount() + 1);
if (message.getRetryCount() >= MAX_RETRY_COUNT) {
// 超过最大重试次数,标记为失败
message.setStatus(MessageStatus.FAILED);
log.error("消息发送最终失败,消息ID: {}, 重试次数: {}", message.getId(), message.getRetryCount());
// 可以发送告警或记录到死信队列
sendFailureAlert(message);
} else {
// 计算下次重试时间(指数退避)
long delaySeconds = (long) Math.pow(2, message.getRetryCount()) * 5;
message.setNextRetryTime(LocalDateTime.now().plusSeconds(delaySeconds));
log.warn("消息发送失败,将在{}秒后重试,消息ID: {}, 重试次数: {}",
delaySeconds, message.getId(), message.getRetryCount());
}
messageRepository.save(message);
}
private void sendFailureAlert(LocalMessage message) {
// 发送告警通知
AlertMessage alert = AlertMessage.builder()
.level(AlertLevel.ERROR)
.title("分布式事务消息发送失败")
.content(String.format("消息ID: %s, 事务ID: %s, 消息类型: %s",
message.getId(), message.getTransactionId(), message.getMessageType()))
.build();
// 发送告警(邮件、短信、钉钉等)
alertService.sendAlert(alert);
}
}/**
* 库存扣减消息消费者
*/
@Component
@RabbitListener(queues = "inventory.deduction.queue")
public class InventoryDeductionConsumer {
@Autowired
private InventoryService inventoryService;
@Autowired
private MessagePublisher messagePublisher;
@RabbitHandler
public void handleInventoryDeduction(String messageContent,
@Header Map<String, Object> headers,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
InventoryDeductionMessage message = JSON.parseObject(messageContent, InventoryDeductionMessage.class);
String transactionId = message.getTransactionId();
try {
log.info("开始处理库存扣减,事务ID: {}, 订单ID: {}", transactionId, message.getOrderId());
// 执行库存扣减
boolean success = inventoryService.deductStock(message.getProductId(), message.getQuantity());
if (success) {
// 库存扣减成功,发送成功消息
messagePublisher.publishInventoryDeductionResult(transactionId, true, "库存扣减成功");
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("库存扣减成功,事务ID: {}", transactionId);
} else {
// 库存不足,发送失败消息
messagePublisher.publishInventoryDeductionResult(transactionId, false, "库存不足");
// 拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
log.warn("库存不足,事务ID: {}", transactionId);
}
} catch (Exception e) {
log.error("库存扣减处理异常,事务ID: {}", transactionId, e);
try {
// 发送异常消息
messagePublisher.publishInventoryDeductionResult(transactionId, false, "系统异常: " + e.getMessage());
// 拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("消息确认失败", ioException);
}
}
}
}通过本地消息表模式,我们实现了最终一致性的分布式事务。这种模式的优点是实现简单、性能较好,缺点是需要额外的存储空间和定时任务来处理消息发送。
Seata是阿里巴巴开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata 官网

/**
* Seata配置类
*/
@Configuration
public class SeataConfiguration {
/**
* 配置数据源代理,支持分布式事务
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return new DataSourceProxy(druidDataSource);
}
/**
* 配置事务管理器
*/
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
/**
* 配置全局事务扫描器
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("order-service", "my_test_tx_group");
}
}
/**
* 订单服务 - 事务发起者
*/
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private StorageService storageService;
/**
* 创建订单的全局事务
* @GlobalTransactional 注解开启全局事务
*/
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrder(OrderRequest request) {
log.info("开始创建订单,用户ID: {}, 商品ID: {}, 数量: {}, 金额: {}",
request.getUserId(), request.getProductId(), request.getCount(), request.getMoney());
// 1. 创建订单记录
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setCount(request.getCount());
order.setMoney(request.getMoney());
order.setStatus(0); // 0: 创建中
orderMapper.create(order);
log.info("订单创建成功,订单ID: {}", order.getId());
// 2. 扣减库存 - 远程调用
log.info("开始扣减库存");
storageService.decrease(request.getProductId(), request.getCount());
log.info("库存扣减成功");
// 3. 扣减账户余额 - 远程调用
log.info("开始扣减账户余额");
accountService.decrease(request.getUserId(), request.getMoney());
log.info("账户余额扣减成功");
// 4. 更新订单状态
log.info("开始更新订单状态");
orderMapper.update(order.getId(), 1); // 1: 已完成
log.info("订单状态更新成功");
log.info("订单创建完成,订单ID: {}", order.getId());
}
}
/**
* 账户服务 - 事务参与者
*/
@Service
public class AccountService {
@Autowired
private AccountMapper accountMapper;
/**
* 扣减账户余额
* 使用 @Transactional 注解,会自动加入全局事务
*/
@Transactional(rollbackFor = Exception.class)
public void decrease(Long userId, BigDecimal money) {
log.info("开始扣减账户余额,用户ID: {}, 扣减金额: {}", userId, money);
// 检查账户余额
Account account = accountMapper.selectByUserId(userId);
if (account == null) {
throw new RuntimeException("账户不存在,用户ID: " + userId);
}
if (account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户余额不足,当前余额: " + account.getResidue() + ", 需要扣减: " + money);
}
// 扣减余额
accountMapper.decrease(userId, money);
log.info("账户余额扣减成功,用户ID: {}, 扣减金额: {}", userId, money);
// 模拟业务异常
if (money.compareTo(new BigDecimal("1000")) > 0) {
throw new RuntimeException("单笔交易金额不能超过1000元");
}
}
}
/**
* 库存服务 - 事务参与者
*/
@Service
public class StorageService {
@Autowired
private StorageMapper storageMapper;
/**
* 扣减库存
*/
@Transactional(rollbackFor = Exception.class)
public void decrease(Long productId, Integer count) {
log.info("开始扣减库存,商品ID: {}, 扣减数量: {}", productId, count);
// 检查库存
Storage storage = storageMapper.selectByProductId(productId);
if (storage == null) {
throw new RuntimeException("商品不存在,商品ID: " + productId);
}
if (storage.getResidue() < count) {
throw new RuntimeException("库存不足,当前库存: " + storage.getResidue() + ", 需要扣减: " + count);
}
// 扣减库存
storageMapper.decrease(productId, count);
log.info("库存扣减成功,商品ID: {}, 扣减数量: {}", productId, count);
}
}Seata支持四种事务模式:AT、TCC、SAGA、XA。
/**
* TCC模式实现示例
*/
@LocalTCC
public interface AccountTccService {
/**
* Try阶段:尝试执行业务
* 预留资源,但不提交
*/
@TwoPhaseBusinessAction(name = "accountTcc", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money);
/**
* Confirm阶段:确认执行业务
* 真正执行业务逻辑
*/
boolean commit(BusinessActionContext context);
/**
* Cancel阶段:取消执行业务
* 释放预留资源
*/
boolean rollback(BusinessActionContext context);
}
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountFreezeMapper freezeMapper;
@Override
@Transactional
public boolean prepare(Long userId, BigDecimal money) {
log.info("TCC Try阶段 - 冻结账户余额,用户ID: {}, 金额: {}", userId, money);
// 1. 检查账户余额
Account account = accountMapper.selectByUserId(userId);
if (account == null || account.getResidue().compareTo(money) < 0) {
throw new RuntimeException("账户余额不足");
}
// 2. 冻结金额
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setFreezeMoney(money);
freeze.setState(AccountFreeze.State.TRY);
freezeMapper.insert(freeze);
// 3. 扣减可用余额
accountMapper.deduct(userId, money);
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext context) {
Long userId = Long.valueOf(context.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
log.info("TCC Confirm阶段 - 确认扣减,用户ID: {}, 金额: {}", userId, money);
// 1. 查找冻结记录
AccountFreeze freeze = freezeMapper.selectByUserIdAndMoney(userId, money);
if (freeze == null) {
return true; // 幂等处理
}
// 2. 删除冻结记录
freezeMapper.deleteById(freeze.getId());
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext context) {
Long userId = Long.valueOf(context.getActionContext("userId").toString());
BigDecimal money = new BigDecimal(context.getActionContext("money").toString());
log.info("TCC Cancel阶段 - 回滚操作,用户ID: {}, 金额: {}", userId, money);
// 1. 查找冻结记录
AccountFreeze freeze = freezeMapper.selectByUserIdAndMoney(userId, money);
if (freeze == null) {
return true; // 幂等处理
}
// 2. 恢复可用余额
accountMapper.restore(userId, money);
// 3. 删除冻结记录
freezeMapper.deleteById(freeze.getId());
return true;
}
}/**
* ShardingSphere分布式事务配置
*/
@Configuration
@EnableTransactionManagement
public class ShardingSphereTransactionConfiguration {
/**
* 配置分片数据源
*/
@Bean
public DataSource shardingDataSource() throws SQLException {
// 配置真实数据源
Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("ds0", createDataSource("jdbc:mysql://localhost:3306/demo_ds_0"));
dataSourceMap.put("ds1", createDataSource("jdbc:mysql://localhost:3306/demo_ds_1"));
// 配置分片规则
ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
// 配置分布式事务
Properties props = new Properties();
props.setProperty("sql-show", "true");
return ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, props);
}
private DataSource createDataSource(String url) {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(url);
dataSource.setUsername("root");
dataSource.setPassword("password");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
return dataSource;
}
private TableRuleConfiguration getOrderTableRuleConfiguration() {
TableRuleConfiguration result = new TableRuleConfiguration("t_order", "ds${0..1}.t_order_${0..1}");
result.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "ds${user_id % 2}"));
result.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "t_order_${order_id % 2}"));
return result;
}
private TableRuleConfiguration getOrderItemTableRuleConfiguration() {
TableRuleConfiguration result = new TableRuleConfiguration("t_order_item", "ds${0..1}.t_order_item_${0..1}");
result.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "ds${user_id % 2}"));
result.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "t_order_item_${order_id % 2}"));
return result;
}
}
/**
* 使用ShardingSphere分布式事务的服务
*/
@Service
public class ShardingOrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private OrderItemRepository orderItemRepository;
/**
* 创建订单 - 跨分片事务
* 使用XA事务保证强一致性
*/
@ShardingTransactionType(TransactionType.XA)
@Transactional
public void createOrder(CreateOrderRequest request) {
log.info("开始创建分片订单,用户ID: {}", request.getUserId());
// 1. 创建主订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setOrderId(generateOrderId());
order.setStatus("CREATED");
order.setTotalAmount(request.getTotalAmount());
order.setCreateTime(new Date());
orderRepository.save(order);
log.info("主订单创建成功,订单ID: {}", order.getOrderId());
// 2. 创建订单明细(可能分布在不同分片)
for (OrderItemRequest itemRequest : request.getItems()) {
OrderItem item = new OrderItem();
item.setOrderId(order.getOrderId());
item.setUserId(request.getUserId());
item.setProductId(itemRequest.getProductId());
item.setQuantity(itemRequest.getQuantity());
item.setPrice(itemRequest.getPrice());
item.setCreateTime(new Date());
orderItemRepository.save(item);
log.info("订单明细创建成功,商品ID: {}", itemRequest.getProductId());
}
// 3. 模拟业务异常测试事务回滚
if (request.getTotalAmount().compareTo(new BigDecimal("10000")) > 0) {
throw new RuntimeException("订单金额超过限制");
}
log.info("分片订单创建完成,订单ID: {}", order.getOrderId());
}
/**
* 使用BASE事务处理最终一致性场景
*/
@ShardingTransactionType(TransactionType.BASE)
@Transactional
public void createOrderWithEventualConsistency(CreateOrderRequest request) {
log.info("开始创建订单(最终一致性),用户ID: {}", request.getUserId());
try {
// 业务逻辑处理
createOrderInternal(request);
} catch (Exception e) {
log.error("订单创建失败,将进行补偿处理", e);
// 在BASE模式下,异常会触发补偿逻辑
throw e;
}
}
private void createOrderInternal(CreateOrderRequest request) {
// 具体的订单创建逻辑
// ...
}
private Long generateOrderId() {
return System.currentTimeMillis();
}
}/**
* RocketMQ事务消息配置
*/
@Configuration
public class RocketMQTransactionConfiguration {
@Bean
public TransactionMQProducer transactionMQProducer() {
TransactionMQProducer producer = new TransactionMQProducer("order_transaction_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new OrderTransactionListener());
// 设置线程池
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
return producer;
}
}
/**
* 事务消息监听器
*/
@Component
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private TransactionLogService transactionLogService;
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
log.info("执行本地事务,事务ID: {}", transactionId);
try {
// 执行本地业务逻辑
OrderTransactionContext context = (OrderTransactionContext) arg;
orderService.createOrderLocal(context.getOrderRequest());
// 记录事务日志
transactionLogService.recordTransaction(transactionId, "COMMIT", context);
log.info("本地事务执行成功,事务ID: {}", transactionId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败,事务ID: {}", transactionId, e);
// 记录失败日志
transactionLogService.recordTransaction(transactionId, "ROLLBACK", arg);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 检查本地事务状态(消息回查)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
log.info("检查本地事务状态,事务ID: {}", transactionId);
// 查询事务日志
TransactionLog transactionLog = transactionLogService.getByTransactionId(transactionId);
if (transactionLog == null) {
log.warn("未找到事务日志,事务ID: {}", transactionId);
return LocalTransactionState.UNKNOW;
}
switch (transactionLog.getStatus()) {
case "COMMIT":
return LocalTransactionState.COMMIT_MESSAGE;
case "ROLLBACK":
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.UNKNOW;
}
}
}
/**
* 订单事务消息服务
*/
@Service
public class OrderTransactionMessageService {
@Autowired
private TransactionMQProducer transactionMQProducer;
/**
* 发送事务消息
*/
public void sendTransactionMessage(OrderRequest orderRequest) {
String transactionId = UUID.randomUUID().toString();
// 构造消息
Message message = new Message();
message.setTopic("ORDER_TOPIC");
message.setTags("CREATE_ORDER");
message.setKeys(orderRequest.getUserId().toString());
message.setBody(JSON.toJSONString(orderRequest).getBytes());
message.setTransactionId(transactionId);
// 构造事务上下文
OrderTransactionContext context = new OrderTransactionContext();
context.setTransactionId(transactionId);
context.setOrderRequest(orderRequest);
try {
log.info("开始发送事务消息,事务ID: {}", transactionId);
// 发送事务消息
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, context);
log.info("事务消息发送结果,事务ID: {}, 结果: {}", transactionId, sendResult.getSendStatus());
} catch (Exception e) {
log.error("事务消息发送失败,事务ID: {}", transactionId, e);
throw new RuntimeException("事务消息发送失败", e);
}
}
}
/**
* 订单消息消费者
*/
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order_consumer_group",
messageModel = MessageModel.CLUSTERING
)
public class OrderMessageConsumer implements RocketMQListener<String> {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Override
public void onMessage(String message) {
log.info("收到订单消息: {}", message);
try {
OrderRequest orderRequest = JSON.parseObject(message, OrderRequest.class);
// 处理下游业务
processDownstreamBusiness(orderRequest);
log.info("订单消息处理成功,订单ID: {}", orderRequest.getOrderId());
} catch (Exception e) {
log.error("订单消息处理失败: {}", message, e);
throw e; // 抛出异常触发重试
}
}
private void processDownstreamBusiness(OrderRequest orderRequest) {
// 1. 扣减库存
inventoryService.deductStock(orderRequest.getProductId(), orderRequest.getQuantity());
// 2. 处理支付
paymentService.processPayment(orderRequest.getUserId(), orderRequest.getAmount());
// 3. 发送通知
// notificationService.sendOrderNotification(orderRequest);
}
}/**
* 分布式事务框架选型决策器
*/
@Component
public class TransactionFrameworkSelector {
/**
* 根据业务场景选择合适的分布式事务方案
*/
public TransactionSolution selectSolution(BusinessScenario scenario) {
// 高一致性要求场景
if (scenario.getConsistencyLevel() == ConsistencyLevel.STRONG) {
if (scenario.getPerformanceRequirement() == PerformanceLevel.HIGH) {
return TransactionSolution.SEATA_AT;
} else {
return TransactionSolution.SEATA_XA;
}
}
// 最终一致性场景
if (scenario.getConsistencyLevel() == ConsistencyLevel.EVENTUAL) {
if (scenario.hasComplexCompensation()) {
return TransactionSolution.SEATA_SAGA;
} else {
return TransactionSolution.ROCKETMQ_TRANSACTION;
}
}
// 分库分表场景
if (scenario.hasSharding()) {
return TransactionSolution.SHARDING_SPHERE_XA;
}
// 默认推荐
return TransactionSolution.SEATA_AT;
}
/**
* 框架特性对比
*/
public void compareFrameworks() {
log.info("=== 分布式事务框架对比 ===");
// Seata AT模式
log.info("Seata AT模式:");
log.info("- 优点: 无侵入、性能好、使用简单");
log.info("- 缺点: 依赖数据库行锁、不支持跨数据库类型");
log.info("- 适用场景: 关系型数据库、高性能要求");
// Seata TCC模式
log.info("Seata TCC模式:");
log.info("- 优点: 性能最好、支持跨数据库");
log.info("- 缺点: 代码侵入性强、开发复杂");
log.info("- 适用场景: 对性能要求极高、业务逻辑复杂");
// RocketMQ事务消息
log.info("RocketMQ事务消息:");
log.info("- 优点: 最终一致性、高可用、解耦");
log.info("- 缺点: 不支持强一致性、消息可能重复");
log.info("- 适用场景: 异步处理、最终一致性要求");
// ShardingSphere
log.info("ShardingSphere分布式事务:");
log.info("- 优点: 与分库分表完美结合");
log.info("- 缺点: 主要针对分片场景");
log.info("- 适用场景: 分库分表环境");
}
}
/**
* 业务场景枚举
*/
public class BusinessScenario {
private ConsistencyLevel consistencyLevel;
private PerformanceLevel performanceRequirement;
private boolean hasSharding;
private boolean hasComplexCompensation;
// getters and setters...
}
enum ConsistencyLevel {
STRONG, // 强一致性
EVENTUAL // 最终一致性
}
enum PerformanceLevel {
HIGH, // 高性能要求
MEDIUM, // 中等性能要求
LOW // 低性能要求
}
enum TransactionSolution {
SEATA_AT,
SEATA_TCC,
SEATA_SAGA,
SEATA_XA,
ROCKETMQ_TRANSACTION,
SHARDING_SPHERE_XA,
LOCAL_MESSAGE_TABLE
}/**
* 分布式事务性能监控器
*/
@Component
public class TransactionPerformanceMonitor {
private final MeterRegistry meterRegistry;
private final Timer transactionTimer;
private final Counter successCounter;
private final Counter failureCounter;
public TransactionPerformanceMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.transactionTimer = Timer.builder("distributed.transaction.duration")
.description("分布式事务执行时间")
.register(meterRegistry);
this.successCounter = Counter.builder("distributed.transaction.success")
.description("分布式事务成功次数")
.register(meterRegistry);
this.failureCounter = Counter.builder("distributed.transaction.failure")
.description("分布式事务失败次数")
.register(meterRegistry);
}
/**
* 监控事务执行性能
*/
public <T> T monitorTransaction(String transactionName, Supplier<T> transactionLogic) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
log.info("开始执行分布式事务: {}", transactionName);
long startTime = System.currentTimeMillis();
T result = transactionLogic.get();
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
// 记录成功指标
successCounter.increment();
sample.stop(transactionTimer);
log.info("分布式事务执行成功: {}, 耗时: {}ms", transactionName, duration);
// 性能告警
if (duration > 5000) {
log.warn("分布式事务执行时间过长: {}, 耗时: {}ms", transactionName, duration);
sendPerformanceAlert(transactionName, duration);
}
return result;
} catch (Exception e) {
// 记录失败指标
failureCounter.increment();
sample.stop(transactionTimer);
log.error("分布式事务执行失败: {}", transactionName, e);
throw e;
}
}
/**
* 分析性能瓶颈
*/
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void analyzePerformanceBottlenecks() {
// 获取事务执行统计
Timer.Snapshot snapshot = transactionTimer.takeSnapshot();
double avgDuration = snapshot.mean(TimeUnit.MILLISECONDS);
double maxDuration = snapshot.max(TimeUnit.MILLISECONDS);
double p95Duration = snapshot.percentile(0.95, TimeUnit.MILLISECONDS);
log.info("分布式事务性能统计 - 平均耗时: {}ms, 最大耗时: {}ms, P95耗时: {}ms",
avgDuration, maxDuration, p95Duration);
// 性能瓶颈分析
if (p95Duration > 3000) {
log.warn("检测到性能瓶颈 - P95耗时超过3秒: {}ms", p95Duration);
analyzeBottleneckCauses();
}
}
private void analyzeBottleneckCauses() {
log.info("开始分析性能瓶颈原因:");
log.info("1. 检查数据库连接池配置");
log.info("2. 检查网络延迟");
log.info("3. 检查事务参与者数量");
log.info("4. 检查业务逻辑复杂度");
log.info("5. 检查锁竞争情况");
}
private void sendPerformanceAlert(String transactionName, long duration) {
// 发送性能告警
// alertService.sendAlert(...);
}
}
/**
* 分布式事务优化器
*/
@Component
public class TransactionOptimizer {
/**
* 连接池优化配置
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public HikariConfig hikariConfig() {
HikariConfig config = new HikariConfig();
// 优化连接池配置
config.setMaximumPoolSize(20); // 最大连接数
config.setMinimumIdle(5); // 最小空闲连接数
config.setConnectionTimeout(30000); // 连接超时时间
config.setIdleTimeout(600000); // 空闲超时时间
config.setMaxLifetime(1800000); // 连接最大生命周期
config.setLeakDetectionThreshold(60000); // 连接泄漏检测阈值
// 优化SQL执行
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
return config;
}
/**
* Seata客户端优化配置
*/
@Bean
public GlobalTransactionScanner optimizedGlobalTransactionScanner() {
GlobalTransactionScanner scanner = new GlobalTransactionScanner("order-service", "my_test_tx_group");
// 优化配置
System.setProperty("client.tm.commitRetryCount", "3");
System.setProperty("client.tm.rollbackRetryCount", "3");
System.setProperty("client.rm.asyncCommitBufferLimit", "10000");
System.setProperty("client.rm.reportRetryCount", "5");
System.setProperty("client.rm.tableMetaCheckEnable", "false");
return scanner;
}
/**
* 批量操作优化
*/
@Service
public static class BatchTransactionService {
@Autowired
private OrderMapper orderMapper;
/**
* 批量创建订单 - 优化版本
*/
@GlobalTransactional
public void createOrdersBatch(List<OrderRequest> requests) {
log.info("开始批量创建订单,数量: {}", requests.size());
// 1. 批量插入订单
List<Order> orders = requests.stream()
.map(this::convertToOrder)
.collect(Collectors.toList());
orderMapper.batchInsert(orders);
// 2. 批量处理库存扣减
Map<Long, Integer> productStockMap = requests.stream()
.collect(Collectors.groupingBy(
OrderRequest::getProductId,
Collectors.summingInt(OrderRequest::getCount)
));
for (Map.Entry<Long, Integer> entry : productStockMap.entrySet()) {
storageService.decrease(entry.getKey(), entry.getValue());
}
// 3. 批量处理账户扣减
Map<Long, BigDecimal> userAmountMap = requests.stream()
.collect(Collectors.groupingBy(
OrderRequest::getUserId,
Collectors.reducing(BigDecimal.ZERO, OrderRequest::getMoney, BigDecimal::add)
));
for (Map.Entry<Long, BigDecimal> entry : userAmountMap.entrySet()) {
accountService.decrease(entry.getKey(), entry.getValue());
}
log.info("批量订单创建完成,数量: {}", requests.size());
}
private Order convertToOrder(OrderRequest request) {
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setCount(request.getCount());
order.setMoney(request.getMoney());
order.setStatus(0);
return order;
}
}
}/**
* 分布式事务缓存优化
*/
@Component
public class TransactionCacheOptimizer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String ACCOUNT_CACHE_PREFIX = "account:";
private static final String INVENTORY_CACHE_PREFIX = "inventory:";
private static final int CACHE_EXPIRE_SECONDS = 300;
/**
* 账户信息缓存
*/
public Account getAccountWithCache(Long userId) {
String cacheKey = ACCOUNT_CACHE_PREFIX + userId;
// 先从缓存获取
Account account = (Account) redisTemplate.opsForValue().get(cacheKey);
if (account != null) {
log.debug("从缓存获取账户信息,用户ID: {}", userId);
return account;
}
// 缓存未命中,从数据库获取
account = accountMapper.selectByUserId(userId);
if (account != null) {
// 写入缓存
redisTemplate.opsForValue().set(cacheKey, account, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("账户信息写入缓存,用户ID: {}", userId);
}
return account;
}
/**
* 库存信息缓存
*/
public Storage getStorageWithCache(Long productId) {
String cacheKey = INVENTORY_CACHE_PREFIX + productId;
Storage storage = (Storage) redisTemplate.opsForValue().get(cacheKey);
if (storage != null) {
return storage;
}
storage = storageMapper.selectByProductId(productId);
if (storage != null) {
redisTemplate.opsForValue().set(cacheKey, storage, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
}
return storage;
}
/**
* 缓存失效策略
*/
public void invalidateAccountCache(Long userId) {
String cacheKey = ACCOUNT_CACHE_PREFIX + userId;
redisTemplate.delete(cacheKey);
log.debug("清除账户缓存,用户ID: {}", userId);
}
public void invalidateStorageCache(Long productId) {
String cacheKey = INVENTORY_CACHE_PREFIX + productId;
redisTemplate.delete(cacheKey);
log.debug("清除库存缓存,商品ID: {}", productId);
}
/**
* 预热缓存
*/
@PostConstruct
public void warmUpCache() {
log.info("开始预热缓存");
// 预热热点账户数据
List<Long> hotUserIds = getHotUserIds();
for (Long userId : hotUserIds) {
getAccountWithCache(userId);
}
// 预热热点商品库存数据
List<Long> hotProductIds = getHotProductIds();
for (Long productId : hotProductIds) {
getStorageWithCache(productId);
}
log.info("缓存预热完成");
}
private List<Long> getHotUserIds() {
// 获取热点用户ID列表
return Arrays.asList(1L, 2L, 3L, 4L, 5L);
}
private List<Long> getHotProductIds() {
// 获取热点商品ID列表
return Arrays.asList(1L, 2L, 3L, 4L, 5L);
}
}/**
* 分布式事务链路追踪
*/
@Component
public class TransactionTracing {
private final Tracer tracer;
public TransactionTracing(Tracer tracer) {
this.tracer = tracer;
}
/**
* 创建事务追踪Span
*/
public Span createTransactionSpan(String transactionName, String xid) {
Span span = tracer.nextSpan()
.name("distributed-transaction")
.tag("transaction.name", transactionName)
.tag("transaction.xid", xid)
.tag("service.name", "order-service")
.start();
return span;
}
/**
* 记录事务阶段
*/
public void recordTransactionPhase(Span span, String phase, String status) {
span.tag("transaction.phase", phase)
.tag("transaction.status", status)
.event("phase." + phase + "." + status);
}
/**
* 记录事务异常
*/
public void recordTransactionError(Span span, Throwable error) {
span.tag("error", "true")
.tag("error.message", error.getMessage())
.tag("error.class", error.getClass().getSimpleName());
}
}
/**
* 事务健康检查
*/
@Component
public class TransactionHealthIndicator implements HealthIndicator {
@Autowired
private TransactionStatisticsService statisticsService;
@Override
public Health health() {
try {
// 检查事务成功率
TransactionStatistics stats = statisticsService.getRecentStatistics(Duration.ofMinutes(5));
double successRate = stats.getSuccessRate();
long avgDuration = stats.getAverageDuration();
Health.Builder builder = new Health.Builder();
if (successRate >= 0.95 && avgDuration <= 3000) {
builder.up();
} else if (successRate >= 0.90 && avgDuration <= 5000) {
builder.status("WARNING");
} else {
builder.down();
}
return builder
.withDetail("successRate", successRate)
.withDetail("averageDuration", avgDuration + "ms")
.withDetail("totalTransactions", stats.getTotalCount())
.withDetail("failedTransactions", stats.getFailedCount())
.build();
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}
}
/**
* 事务统计服务
*/
@Service
public class TransactionStatisticsService {
@Autowired
private MeterRegistry meterRegistry;
public TransactionStatistics getRecentStatistics(Duration duration) {
// 获取指定时间段内的事务统计
Timer transactionTimer = meterRegistry.get("distributed.transaction.duration").timer();
Counter successCounter = meterRegistry.get("distributed.transaction.success").counter();
Counter failureCounter = meterRegistry.get("distributed.transaction.failure").counter();
long totalCount = (long) (successCounter.count() + failureCounter.count());
long failedCount = (long) failureCounter.count();
double successRate = totalCount > 0 ? (double) (totalCount - failedCount) / totalCount : 0;
long avgDuration = (long) transactionTimer.mean(TimeUnit.MILLISECONDS);
return TransactionStatistics.builder()
.totalCount(totalCount)
.failedCount(failedCount)
.successRate(successRate)
.averageDuration(avgDuration)
.build();
}
}/**
* 分布式事务故障诊断器
*/
@Component
public class TransactionDiagnostics {
@Autowired
private DataSource dataSource;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 诊断数据库连接状态
*/
public DiagnosticResult diagnoseDatabaseConnection() {
try (Connection connection = dataSource.getConnection()) {
boolean isValid = connection.isValid(5);
if (isValid) {
return DiagnosticResult.success("数据库连接正常");
} else {
return DiagnosticResult.failure("数据库连接异常");
}
} catch (SQLException e) {
return DiagnosticResult.failure("数据库连接失败: " + e.getMessage());
}
}
/**
* 诊断Redis连接状态
*/
public DiagnosticResult diagnoseRedisConnection() {
try {
String pong = redisTemplate.getConnectionFactory()
.getConnection()
.ping();
if ("PONG".equals(pong)) {
return DiagnosticResult.success("Redis连接正常");
} else {
return DiagnosticResult.failure("Redis连接异常");
}
} catch (Exception e) {
return DiagnosticResult.failure("Redis连接失败: " + e.getMessage());
}
}
/**
* 诊断Seata连接状态
*/
public DiagnosticResult diagnoseSeataConnection() {
try {
// 检查TM连接
boolean tmConnected = GlobalTransactionContext.getCurrentOrCreate().getXid() != null;
if (tmConnected) {
return DiagnosticResult.success("Seata连接正常");
} else {
return DiagnosticResult.failure("Seata连接异常");
}
} catch (Exception e) {
return DiagnosticResult.failure("Seata连接失败: " + e.getMessage());
}
}
/**
* 诊断事务锁状态
*/
public DiagnosticResult diagnoseTransactionLocks() {
try {
// 查询当前锁信息
String sql = "SELECT * FROM lock_table WHERE xid IS NOT NULL";
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {
int lockCount = 0;
while (resultSet.next()) {
lockCount++;
}
if (lockCount == 0) {
return DiagnosticResult.success("无事务锁");
} else if (lockCount < 100) {
return DiagnosticResult.warning("存在 " + lockCount + " 个事务锁");
} else {
return DiagnosticResult.failure("事务锁过多: " + lockCount);
}
}
} catch (SQLException e) {
return DiagnosticResult.failure("查询事务锁失败: " + e.getMessage());
}
}
/**
* 综合诊断报告
*/
public ComprehensiveDiagnosticReport generateDiagnosticReport() {
ComprehensiveDiagnosticReport report = new ComprehensiveDiagnosticReport();
report.setDatabaseResult(diagnoseDatabaseConnection());
report.setRedisResult(diagnoseRedisConnection());
report.setSeataResult(diagnoseSeataConnection());
report.setLockResult(diagnoseTransactionLocks());
report.setTimestamp(LocalDateTime.now());
// 计算整体健康状态
report.calculateOverallHealth();
return report;
}
}
/**
* 诊断结果
*/
@Data
@Builder
public class DiagnosticResult {
private String status; // SUCCESS, WARNING, FAILURE
private String message;
private LocalDateTime timestamp;
public static DiagnosticResult success(String message) {
return DiagnosticResult.builder()
.status("SUCCESS")
.message(message)
.timestamp(LocalDateTime.now())
.build();
}
public static DiagnosticResult warning(String message) {
return DiagnosticResult.builder()
.status("WARNING")
.message(message)
.timestamp(LocalDateTime.now())
.build();
}
public static DiagnosticResult failure(String message) {
return DiagnosticResult.builder()
.status("FAILURE")
.message(message)
.timestamp(LocalDateTime.now())
.build();
}
}# docker-compose.yml - 生产环境部署配置
version: '3.8'
services:
# Seata Server
seata-server:
image: seataio/seata-server:1.5.2
hostname: seata-server
ports:
- "8091:8091"
environment:
- SEATA_PORT=8091
- STORE_MODE=db
- SEATA_CONFIG_NAME=file:/root/seata-config/registry
volumes:
- ./seata-config:/root/seata-config
networks:
- seata-network
restart: always
# MySQL主库
mysql-master:
image: mysql:8.0
hostname: mysql-master
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=root123
- MYSQL_DATABASE=seata
volumes:
- mysql-master-data:/var/lib/mysql
- ./mysql-config/master.cnf:/etc/mysql/conf.d/master.cnf
networks:
- seata-network
restart: always
# MySQL从库
mysql-slave:
image: mysql:8.0
hostname: mysql-slave
ports:
- "3307:3306"
environment:
- MYSQL_ROOT_PASSWORD=root123
- MYSQL_DATABASE=seata
volumes:
- mysql-slave-data:/var/lib/mysql
- ./mysql-config/slave.cnf:/etc/mysql/conf.d/slave.cnf
networks:
- seata-network
restart: always
depends_on:
- mysql-master
# Redis集群
redis-master:
image: redis:7.0
hostname: redis-master
ports:
- "6379:6379"
command: redis-server --appendonly yes --cluster-enabled yes
volumes:
- redis-master-data:/data
networks:
- seata-network
restart: always
# 订单服务
order-service:
image: order-service:latest
hostname: order-service
ports:
- "8080:8080"
environment:
- SPRING_PROFILES_ACTIVE=prod
- SEATA_SERVER_ADDR=seata-server:8091
- MYSQL_HOST=mysql-master:3306
- REDIS_HOST=redis-master:6379
networks:
- seata-network
restart: always
depends_on:
- seata-server
- mysql-master
- redis-master
# 库存服务
inventory-service:
image: inventory-service:latest
hostname: inventory-service
ports:
- "8081:8081"
environment:
- SPRING_PROFILES_ACTIVE=prod
- SEATA_SERVER_ADDR=seata-server:8091
- MYSQL_HOST=mysql-master:3306
- REDIS_HOST=redis-master:6379
networks:
- seata-network
restart: always
depends_on:
- seata-server
- mysql-master
- redis-master
volumes:
mysql-master-data:
mysql-slave-data:
redis-master-data:
networks:
seata-network:
driver: bridge/**
* 生产环境配置
*/
@Configuration
@Profile("prod")
public class ProductionTransactionConfiguration {
/**
* 生产环境数据源配置
*/
@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
HikariConfig config = new HikariConfig();
// 生产环境优化配置
config.setMaximumPoolSize(50);
config.setMinimumIdle(10);
config.setConnectionTimeout(30000);
config.setIdleTimeout(300000);
config.setMaxLifetime(1800000);
config.setLeakDetectionThreshold(60000);
// 连接池监控
config.setRegisterMbeans(true);
return new HikariDataSource(config);
}
/**
* 只读数据源配置
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
HikariConfig config = new HikariConfig();
config.setMaximumPoolSize(30);
config.setMinimumIdle(5);
config.setReadOnly(true);
return new HikariDataSource(config);
}
/**
* 读写分离配置
*/
@Bean
public DataSource routingDataSource() {
Map<Object, Object> dataSourceMap = new HashMap<>();
dataSourceMap.put("master", masterDataSource());
dataSourceMap.put("slave", slaveDataSource());
DynamicDataSource routingDataSource = new DynamicDataSource();
routingDataSource.setTargetDataSources(dataSourceMap);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
/**
* Seata生产环境配置
*/
@Bean
public GlobalTransactionScanner productionGlobalTransactionScanner() {
GlobalTransactionScanner scanner = new GlobalTransactionScanner("order-service", "default_tx_group");
// 生产环境优化配置
System.setProperty("seata.server.addr", "seata-server:8091");
System.setProperty("client.tm.commitRetryCount", "5");
System.setProperty("client.tm.rollbackRetryCount", "5");
System.setProperty("client.rm.asyncCommitBufferLimit", "10000");
System.setProperty("client.rm.reportRetryCount", "5");
System.setProperty("client.rm.tableMetaCheckEnable", "true");
System.setProperty("client.rm.reportSuccessEnable", "false");
System.setProperty("client.rm.sagaBranchRegisterEnable", "false");
System.setProperty("client.tm.degradeCheck", "false");
System.setProperty("client.tm.degradeCheckAllowTimes", "10");
System.setProperty("client.tm.degradeCheckPeriod", "2000");
return scanner;
}
}
/**
* 动态数据源
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
/**
* 数据源上下文持有者
*/
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get();
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}/**
* 事务超时处理器
*/
@Component
public class TransactionTimeoutHandler {
private static final int DEFAULT_TIMEOUT = 30000; // 30秒
private static final int MAX_TIMEOUT = 300000; // 5分钟
/**
* 动态调整事务超时时间
*/
public int calculateTimeout(TransactionContext context) {
int participantCount = context.getParticipantCount();
int businessComplexity = context.getBusinessComplexity();
// 基础超时时间
int timeout = DEFAULT_TIMEOUT;
// 根据参与者数量调整
timeout += participantCount * 5000;
// 根据业务复杂度调整
timeout += businessComplexity * 10000;
// 限制最大超时时间
return Math.min(timeout, MAX_TIMEOUT);
}
/**
* 超时重试策略
*/
@Retryable(value = {TransactionTimeoutException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2))
public void handleTimeoutWithRetry(Runnable transactionLogic) {
try {
transactionLogic.run();
} catch (TransactionTimeoutException e) {
log.warn("事务超时,准备重试: {}", e.getMessage());
throw e;
}
}
/**
* 超时降级处理
*/
public void handleTimeoutDegradation(String transactionId) {
log.warn("事务超时,执行降级处理,事务ID: {}", transactionId);
// 1. 记录超时事务
recordTimeoutTransaction(transactionId);
// 2. 发送告警
sendTimeoutAlert(transactionId);
// 3. 执行补偿逻辑
executeCompensation(transactionId);
}
private void recordTimeoutTransaction(String transactionId) {
// 记录超时事务到数据库
}
private void sendTimeoutAlert(String transactionId) {
// 发送超时告警
}
private void executeCompensation(String transactionId) {
// 执行补偿逻辑
}
}/**
* 死锁检测器
*/
@Component
public class DeadlockDetector {
@Autowired
private DataSource dataSource;
/**
* 检测数据库死锁
*/
@Scheduled(fixedRate = 30000) // 每30秒检测一次
public void detectDeadlocks() {
try (Connection connection = dataSource.getConnection()) {
// MySQL死锁检测SQL
String sql = "SELECT * FROM INFORMATION_SCHEMA.INNODB_LOCKS";
try (PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {
List<LockInfo> locks = new ArrayList<>();
while (resultSet.next()) {
LockInfo lockInfo = new LockInfo();
lockInfo.setLockId(resultSet.getString("lock_id"));
lockInfo.setLockTrxId(resultSet.getString("lock_trx_id"));
lockInfo.setLockMode(resultSet.getString("lock_mode"));
lockInfo.setLockType(resultSet.getString("lock_type"));
lockInfo.setLockTable(resultSet.getString("lock_table"));
locks.add(lockInfo);
}
if (!locks.isEmpty()) {
analyzeDeadlock(locks);
}
}
} catch (SQLException e) {
log.error("死锁检测失败", e);
}
}
/**
* 分析死锁情况
*/
private void analyzeDeadlock(List<LockInfo> locks) {
log.warn("检测到潜在死锁,锁数量: {}", locks.size());
// 按事务ID分组
Map<String, List<LockInfo>> locksByTrx = locks.stream()
.collect(Collectors.groupingBy(LockInfo::getLockTrxId));
// 检测循环等待
for (Map.Entry<String, List<LockInfo>> entry : locksByTrx.entrySet()) {
String trxId = entry.getKey();
List<LockInfo> trxLocks = entry.getValue();
log.warn("事务 {} 持有锁: {}", trxId, trxLocks.size());
}
// 发送死锁告警
sendDeadlockAlert(locks);
}
/**
* 死锁解决策略
*/
public void resolveDeadlock(String transactionId) {
log.info("开始解决死锁,事务ID: {}", transactionId);
try {
// 1. 回滚优先级较低的事务
rollbackLowerPriorityTransaction(transactionId);
// 2. 记录死锁事件
recordDeadlockEvent(transactionId);
// 3. 优化锁获取顺序
optimizeLockOrder();
} catch (Exception e) {
log.error("死锁解决失败,事务ID: {}", transactionId, e);
}
}
private void rollbackLowerPriorityTransaction(String transactionId) {
// 回滚优先级较低的事务
}
private void recordDeadlockEvent(String transactionId) {
// 记录死锁事件
}
private void optimizeLockOrder() {
// 优化锁获取顺序
}
private void sendDeadlockAlert(List<LockInfo> locks) {
// 发送死锁告警
}
}
/**
* 锁信息
*/
@Data
public class LockInfo {
private String lockId;
private String lockTrxId;
private String lockMode;
private String lockType;
private String lockTable;
}通过前面章节的深入学习,我们全面掌握了分布式事务的核心概念和实现原理:
ACID特性在分布式环境下的挑战:
CAP理论与BASE理论:
我们深入学习了多种分布式事务解决方案,每种方案都有其适用场景:
/**
* 分布式事务解决方案总结
*/
public class TransactionSolutionSummary {
/**
* 两阶段提交(2PC)
* 优点:强一致性,实现简单
* 缺点:性能较差,存在单点故障风险
* 适用场景:对一致性要求极高的金融场景
*/
public void twoPhaseCommitSummary() {
log.info("2PC特点:");
log.info("- 强一致性保证");
log.info("- 阻塞式协议,性能较差");
log.info("- 协调者单点故障风险");
log.info("- 适用于金融、支付等强一致性场景");
}
/**
* TCC(Try-Confirm-Cancel)
* 优点:性能好,无锁设计
* 缺点:业务侵入性强,开发复杂度高
* 适用场景:高并发、对性能要求高的场景
*/
public void tccSummary() {
log.info("TCC特点:");
log.info("- 三阶段提交,性能优秀");
log.info("- 无锁设计,支持高并发");
log.info("- 业务侵入性强,需要实现三个方法");
log.info("- 适用于电商、秒杀等高并发场景");
}
/**
* 本地消息表
* 优点:实现简单,可靠性高
* 缺点:需要额外存储,有一定延迟
* 适用场景:对实时性要求不高的异步场景
*/
public void localMessageTableSummary() {
log.info("本地消息表特点:");
log.info("- 实现简单,易于理解");
log.info("- 最终一致性保证");
log.info("- 需要定时任务处理消息");
log.info("- 适用于订单处理、数据同步等场景");
}
/**
* 消息事务
* 优点:解耦性好,扩展性强
* 缺点:依赖消息中间件,复杂度较高
* 适用场景:微服务架构下的异步处理
*/
public void messageTransactionSummary() {
log.info("消息事务特点:");
log.info("- 基于消息中间件实现");
log.info("- 服务间解耦,扩展性好");
log.info("- 支持异步处理");
log.info("- 适用于微服务架构");
}
/**
* Saga模式
* 优点:长事务支持,补偿机制灵活
* 缺点:补偿逻辑复杂,一致性较弱
* 适用场景:长流程业务,如旅游预订
*/
public void sagaSummary() {
log.info("Saga模式特点:");
log.info("- 支持长事务处理");
log.info("- 灵活的补偿机制");
log.info("- 最终一致性");
log.info("- 适用于复杂业务流程");
}
}通过实际代码示例,我们掌握了主流分布式事务框架的使用:
Seata框架:
Apache ShardingSphere:
RocketMQ事务消息:
随着云原生技术的发展,分布式事务也在向云原生方向演进:
/**
* 云原生分布式事务趋势
*/
@Component
public class CloudNativeTransactionTrends {
/**
* Kubernetes环境下的分布式事务
*/
public void kubernetesTransactionPattern() {
log.info("Kubernetes环境下分布式事务特点:");
log.info("1. 服务发现与负载均衡自动化");
log.info("2. 容器化部署,弹性伸缩");
log.info("3. 配置管理云原生化");
log.info("4. 监控与日志统一收集");
// 示例:Kubernetes配置
String k8sConfig = """
apiVersion: apps/v1
kind: Deployment
metadata:
name: seata-server
spec:
replicas: 3
selector:
matchLabels:
app: seata-server
template:
metadata:
labels:
app: seata-server
spec:
containers:
- name: seata-server
image: seataio/seata-server:1.5.2
ports:
- containerPort: 8091
env:
- name: SEATA_PORT
value: "8091"
- name: STORE_MODE
value: "db"
""";
log.info("Kubernetes部署配置示例:\n{}", k8sConfig);
}
/**
* 服务网格中的分布式事务
*/
public void serviceMeshTransaction() {
log.info("服务网格(Service Mesh)中的分布式事务:");
log.info("1. Istio/Envoy代理层事务管理");
log.info("2. 分布式追踪自动化");
log.info("3. 流量管理与故障注入");
log.info("4. 安全策略统一管理");
}
/**
* Serverless环境下的事务处理
*/
public void serverlessTransaction() {
log.info("Serverless环境下的分布式事务:");
log.info("1. 函数计算的事务协调");
log.info("2. 事件驱动的事务模式");
log.info("3. 冷启动对事务性能的影响");
log.info("4. 状态管理的挑战与解决方案");
}
}分布式事务技术正在与其他新兴技术深度融合:
/**
* 新兴技术与分布式事务融合
*/
@Component
public class EmergingTechnologyIntegration {
/**
* 区块链与分布式事务
*/
public void blockchainTransaction() {
log.info("区块链技术在分布式事务中的应用:");
log.info("1. 智能合约实现自动化事务执行");
log.info("2. 不可篡改的事务记录");
log.info("3. 去中心化的事务协调");
log.info("4. 跨链事务处理");
// 示例:智能合约事务
String smartContract = """
pragma solidity ^0.8.0;
contract DistributedTransaction {
enum TransactionStatus { PENDING, COMMITTED, ABORTED }
struct Transaction {
address coordinator;
TransactionStatus status;
mapping(address => bool) participants;
}
mapping(bytes32 => Transaction) public transactions;
function createTransaction(bytes32 txId) public {
transactions[txId].coordinator = msg.sender;
transactions[txId].status = TransactionStatus.PENDING;
}
function commitTransaction(bytes32 txId) public {
require(transactions[txId].coordinator == msg.sender);
transactions[txId].status = TransactionStatus.COMMITTED;
}
}
""";
log.info("智能合约示例:\n{}", smartContract);
}
/**
* AI/ML在分布式事务中的应用
*/
public void aiMlTransaction() {
log.info("AI/ML在分布式事务中的应用:");
log.info("1. 智能事务路由与负载均衡");
log.info("2. 异常检测与自动恢复");
log.info("3. 性能预测与优化建议");
log.info("4. 智能补偿策略选择");
}
/**
* 边缘计算环境下的分布式事务
*/
public void edgeComputingTransaction() {
log.info("边缘计算环境下的分布式事务:");
log.info("1. 网络延迟与带宽限制");
log.info("2. 离线场景下的事务处理");
log.info("3. 边缘节点间的协调机制");
log.info("4. 云边协同的事务模式");
}
}为了更深入地理解分布式事务,推荐以下经典资料:
/**
* 扩展阅读资源推荐
*/
@Component
public class ExtendedReadingRecommendations {
/**
* 经典论文推荐
*/
public void recommendPapers() {
List<Paper> papers = Arrays.asList(
new Paper("Consensus in the Presence of Partial Synchrony",
"Dwork, Lynch, Stockmeyer", 1988,
"分布式共识算法的奠基性论文"),
new Paper("Impossibility of Distributed Consensus with One Faulty Process",
"Fischer, Lynch, Paterson", 1985,
"FLP不可能性定理,分布式系统理论基础"),
new Paper("The Part-Time Parliament",
"Leslie Lamport", 1998,
"Paxos算法的原始论文"),
new Paper("In Search of an Understandable Consensus Algorithm",
"Diego Ongaro, John Ousterhout", 2014,
"Raft算法论文,更易理解的共识算法"),
new Paper("Sagas",
"Hector Garcia-Molina, Kenneth Salem", 1987,
"Saga模式的原始论文")
);
papers.forEach(paper -> {
log.info("论文:{}", paper.getTitle());
log.info("作者:{}", paper.getAuthors());
log.info("年份:{}", paper.getYear());
log.info("简介:{}", paper.getDescription());
log.info("---");
});
}
/**
* 技术书籍推荐
*/
public void recommendBooks() {
List<Book> books = Arrays.asList(
new Book("设计数据密集型应用",
"Martin Kleppmann",
"深入讲解分布式系统设计原理"),
new Book("分布式系统概念与设计",
"George Coulouris等",
"分布式系统的经典教材"),
new Book("微服务架构设计模式",
"Chris Richardson",
"微服务架构下的事务处理模式"),
new Book("高性能MySQL",
"Baron Schwartz等",
"MySQL事务处理与优化"),
new Book("Redis设计与实现",
"黄健宏",
"Redis事务机制深度解析")
);
books.forEach(book -> {
log.info("书籍:{}", book.getTitle());
log.info("作者:{}", book.getAuthor());
log.info("简介:{}", book.getDescription());
log.info("---");
});
}
/**
* 在线资源推荐
*/
public void recommendOnlineResources() {
List<OnlineResource> resources = Arrays.asList(
new OnlineResource("Apache Seata官方文档",
"https://seata.io/zh-cn/docs/overview/what-is-seata.html",
"Seata框架的官方文档和最佳实践"),
new OnlineResource("分布式事务系列博客",
"https://www.cnblogs.com/savorboard/p/distributed-transaction.html",
"深入浅出的分布式事务系列文章"),
new OnlineResource("MIT 6.824分布式系统课程",
"https://pdos.csail.mit.edu/6.824/",
"MIT的分布式系统经典课程"),
new OnlineResource("Raft算法可视化",
"http://thesecretlivesofdata.com/raft/",
"Raft算法的可视化演示")
);
resources.forEach(resource -> {
log.info("资源:{}", resource.getTitle());
log.info("链接:{}", resource.getUrl());
log.info("描述:{}", resource.getDescription());
log.info("---");
});
}
}
@Data
@AllArgsConstructor
class Paper {
private String title;
private String authors;
private int year;
private String description;
}
@Data
@AllArgsConstructor
class Book {
private String title;
private String author;
private String description;
}
@Data
@AllArgsConstructor
class OnlineResource {
private String title;
private String url;
private String description;
}/**
* 开源项目推荐
*/
@Component
public class OpenSourceProjectRecommendations {
/**
* 分布式事务框架
*/
public void recommendTransactionFrameworks() {
List<OpenSourceProject> frameworks = Arrays.asList(
new OpenSourceProject("Seata",
"https://github.com/seata/seata",
"阿里巴巴开源的分布式事务解决方案",
"Java"),
new OpenSourceProject("Apache ShardingSphere",
"https://github.com/apache/shardingsphere",
"分布式数据库中间件,支持分布式事务",
"Java"),
new OpenSourceProject("TCC-Transaction",
"https://github.com/changmingxie/tcc-transaction",
"TCC型分布式事务框架",
"Java"),
new OpenSourceProject("Hmily",
"https://github.com/dromara/hmily",
"高性能异步分布式事务TCC框架",
"Java"),
new OpenSourceProject("ByteTCC",
"https://github.com/liuyangming/ByteTCC",
"基于TCC机制的分布式事务管理器",
"Java")
);
frameworks.forEach(project -> {
log.info("项目:{}", project.getName());
log.info("地址:{}", project.getUrl());
log.info("描述:{}", project.getDescription());
log.info("语言:{}", project.getLanguage());
log.info("---");
});
}
/**
* 监控与诊断工具
*/
public void recommendMonitoringTools() {
List<OpenSourceProject> tools = Arrays.asList(
new OpenSourceProject("Zipkin",
"https://github.com/openzipkin/zipkin",
"分布式追踪系统",
"Java"),
new OpenSourceProject("Jaeger",
"https://github.com/jaegertracing/jaeger",
"CNCF分布式追踪项目",
"Go"),
new OpenSourceProject("SkyWalking",
"https://github.com/apache/skywalking",
"应用性能监控系统",
"Java"),
new OpenSourceProject("Prometheus",
"https://github.com/prometheus/prometheus",
"监控和告警工具包",
"Go")
);
tools.forEach(tool -> {
log.info("工具:{}", tool.getName());
log.info("地址:{}", tool.getUrl());
log.info("描述:{}", tool.getDescription());
log.info("语言:{}", tool.getLanguage());
log.info("---");
});
}
}
@Data
@AllArgsConstructor
class OpenSourceProject {
private String name;
private String url;
private String description;
private String language;
}为了加深对分布式事务的理解,以下是一些值得深入思考的问题:
/**
* 深度思考题
*/
@Component
public class DeepThinkingQuestions {
/**
* 理论层面的思考题
*/
public void theoreticalQuestions() {
List<String> questions = Arrays.asList(
"1. 在CAP定理的约束下,如何在一致性和可用性之间找到最佳平衡点?",
"2. 为什么说FLP不可能性定理并不意味着分布式共识无法实现?实际系统是如何绕过这个限制的?",
"3. BASE理论中的'最终一致性'在实际业务中可以接受多长的不一致时间窗口?",
"4. 分布式事务的性能瓶颈主要来自哪里?网络延迟、锁竞争还是协调开销?",
"5. 在微服务架构中,是否应该完全避免分布式事务?有哪些替代方案?"
);
questions.forEach(question -> {
log.info("思考题:{}", question);
log.info("---");
});
}
/**
* 实践层面的思考题
*/
public void practicalQuestions() {
List<String> questions = Arrays.asList(
"1. 在电商系统中,订单、库存、支付服务如何设计分布式事务?考虑性能和一致性要求。",
"2. 如何设计一个支持百万级TPS的分布式事务系统?需要考虑哪些关键因素?",
"3. 在跨国部署的系统中,网络延迟很高,如何优化分布式事务的性能?",
"4. 如何设计分布式事务的监控体系?需要监控哪些关键指标?",
"5. 在容器化环境中部署分布式事务系统时,需要注意哪些问题?"
);
questions.forEach(question -> {
log.info("实践题:{}", question);
log.info("---");
});
}
/**
* 架构设计思考题
*/
public void architecturalQuestions() {
List<String> questions = Arrays.asList(
"1. 如何设计一个既支持强一致性又支持最终一致性的混合事务系统?",
"2. 在多云环境下,如何实现跨云的分布式事务?需要考虑哪些技术挑战?",
"3. 如何设计分布式事务的降级策略?在系统压力过大时如何保证核心功能?",
"4. 边缘计算场景下,如何处理网络分区时的分布式事务?",
"5. 如何设计支持动态扩缩容的分布式事务协调器?"
);
questions.forEach(question -> {
log.info("架构题:{}", question);
log.info("---");
});
}
}/**
* 实战练习建议
*/
@Component
public class PracticalExercises {
/**
* 初级练习
*/
public void beginnerExercises() {
log.info("初级练习建议:");
log.info("1. 搭建Seata环境,实现简单的订单-库存-账户事务");
log.info("2. 使用本地消息表模式实现异步事务处理");
log.info("3. 编写TCC模式的Try-Confirm-Cancel方法");
log.info("4. 实现基于RocketMQ的事务消息");
log.info("5. 配置Seata的AT模式自动代理");
}
/**
* 中级练习
*/
public void intermediateExercises() {
log.info("中级练习建议:");
log.info("1. 设计并实现一个完整的电商下单流程分布式事务");
log.info("2. 实现分布式事务的性能监控和告警系统");
log.info("3. 编写分布式事务的压力测试工具");
log.info("4. 实现事务的自动重试和补偿机制");
log.info("5. 设计分布式事务的故障注入和恢复测试");
}
/**
* 高级练习
*/
public void advancedExercises() {
log.info("高级练习建议:");
log.info("1. 设计支持多种事务模式的统一事务框架");
log.info("2. 实现分布式事务的智能路由和负载均衡");
log.info("3. 开发分布式事务的可视化管理平台");
log.info("4. 实现跨数据中心的分布式事务协调");
log.info("5. 设计基于机器学习的事务性能优化系统");
}
}通过本文的深入学习,我们全面掌握了分布式事务的理论基础、实现原理、主流框架使用以及性能优化等核心知识。分布式事务作为分布式系统中的关键技术,其重要性不言而喻。
核心收获:
技术发展展望:
分布式事务技术正在向更加智能化、云原生化的方向发展。未来我们将看到:
持续学习建议:
技术发展日新月异,建议大家:
分布式事务是一个复杂而又充满挑战的技术领域,希望本文能够为大家的学习和实践提供有价值的参考。让我们在分布式系统的道路上继续探索,共同推动技术的发展与进步!
作者寄语:
“在分布式系统的世界里,没有银弹,只有在特定场景下的最优解。理解业务需求,选择合适的技术方案,持续优化和改进,这是我们作为技术人员应该具备的核心能力。”