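5. The provider-side method also needs the Hmily annotation and, as on the consumer side, has its own confirm and cancel methods. The aspect method DubboHmilyTransactionInterceptor#interceptor runs; this time the context is not null, so it is converted into a HmilyTransactionContext object. HmilyTransactionAspectServiceImpl#invoke then looks up the appropriate handler class through HmilyTransactionFactoryServiceImpl#factoryOf, which here is ParticipantHmilyTransactionHandler.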
public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
    HmilyTransaction hmilyTransaction = null;
    HmilyTransaction currentTransaction;
    switch (HmilyActionEnum.acquireByCode(context.getAction())) {
        case TRYING:
            try {
                hmilyTransaction = hmilyTransactionExecutor.preTryParticipant(context, point);
                final Object proceed = point.proceed();
                hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                //update log status to try
                hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                return proceed;
            } catch (Throwable throwable) {
                //if exception ,delete log.
                hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                throw throwable;
            } finally {
                HmilyTransactionContextLocal.getInstance().remove();
            }
        case CONFIRMING:
            currentTransaction = HmilyTransactionGuavaCacheManager
                    .getInstance().getHmilyTransaction(context.getTransId());
            hmilyTransactionExecutor.confirm(currentTransaction);
            break;
        case CANCELING:
            currentTransaction = HmilyTransactionGuavaCacheManager
                    .getInstance().getHmilyTransaction(context.getTransId());
            hmilyTransactionExecutor.cancel(currentTransaction);
            break;
        default:
            break;
    }
    Method method = ((MethodSignature) (point.getSignature())).getMethod();
    return DefaultValueUtils.getDefaultValue(method.getReturnType());
}
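The dispatch in this handler can be reduced to a small, self-contained sketch. Everything in it is hypothetical and only mirrors the control flow: ParticipantDispatchSketch, the Action enum, the in-memory map that stands in for HmilyTransactionGuavaCacheManager, and the string log are illustrative names, not Hmily's API. The try phase runs the business call and returns its result, while confirm and cancel look the transaction up by id and return a placeholder value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the participant-side handler; not Hmily's real API.
class ParticipantDispatchSketch {

    enum Action { TRYING, CONFIRMING, CANCELING }

    // Plays the role of the in-memory cache: transId -> current status.
    static final Map<String, String> CACHE = new ConcurrentHashMap<>();
    // Records what happened so the flow can be inspected.
    static final List<String> LOG = new ArrayList<>();

    static Object handle(Action action, String transId, Supplier<Object> businessCall) {
        switch (action) {
            case TRYING:
                try {
                    CACHE.put(transId, "PRE_TRY");        // analogous to preTryParticipant
                    Object result = businessCall.get();   // analogous to point.proceed()
                    CACHE.put(transId, "TRYING");         // try phase finished
                    LOG.add("try:" + transId);
                    return result;
                } catch (RuntimeException e) {
                    CACHE.remove(transId);                // drop the log when try fails
                    LOG.add("try-failed:" + transId);
                    throw e;
                }
            case CONFIRMING:
                if (CACHE.containsKey(transId)) {
                    CACHE.put(transId, "CONFIRMING");
                    LOG.add("confirm:" + transId);
                }
                break;
            case CANCELING:
                if (CACHE.containsKey(transId)) {
                    CACHE.put(transId, "CANCELING");
                    LOG.add("cancel:" + transId);
                }
                break;
            default:
                break;
        }
        // Confirm and cancel carry no business result, so a placeholder is returned.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(handle(Action.TRYING, "tx-1", () -> "paid"));
        handle(Action.CONFIRMING, "tx-1", () -> null);
        System.out.println(LOG);
    }
}
```

Returning a placeholder after confirm or cancel matches the last two lines of the original handler, which build a default value from the method's return type instead of calling the business method again.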
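At first the action is TRYING, and the participant executes the try method: it builds a HmilyTransaction and stores it in the in-memory Guava cache, then publishes a save event so that the transaction is persisted to the database of the local service, and finally stores the context in a ThreadLocal and returns. The local business method is then executed, after which the transaction status is updated, the ThreadLocal is cleared, and the result is returned.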
public HmilyTransaction preTryParticipant(final HmilyTransactionContext context, final ProceedingJoinPoint point) {
    LogUtil.debug(LOGGER, "participant hmily transaction start..:{}", context::toString);
    final HmilyTransaction hmilyTransaction = buildHmilyTransaction(point, HmilyRoleEnum.PROVIDER.getCode(), context.getTransId());
    //cache by guava
    HmilyTransactionGuavaCacheManager.getInstance().cacheHmilyTransaction(hmilyTransaction);
    //publishEvent
    hmilyTransactionEventPublisher.publishEvent(hmilyTransaction, EventTypeEnum.SAVE.getCode());
    //Nested transaction support
    context.setRole(HmilyRoleEnum.LOCAL.getCode());
    HmilyTransactionContextLocal.getInstance().set(context);
    return hmilyTransaction;
}
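The ThreadLocal handling around the try phase can be illustrated with a minimal sketch. It assumes nothing about Hmily's internals: PreTrySketch, Context, CURRENT, CACHE and SAVE_EVENTS are hypothetical names, a map stands in for the Guava cache, and a list stands in for the event publisher. What it shows is the pattern itself: the context is switched to the local role and bound to the thread before the business call, and always removed afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of the "pre-try" bookkeeping; names are illustrative only.
class PreTrySketch {

    static final class Context {
        final String transId;
        String role;

        Context(String transId, String role) {
            this.transId = transId;
            this.role = role;
        }
    }

    static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();
    static final Map<String, String> CACHE = new ConcurrentHashMap<>();
    static final List<String> SAVE_EVENTS = new ArrayList<>();

    static void preTry(Context context) {
        CACHE.put(context.transId, "PRE_TRY");   // cache the new transaction
        SAVE_EVENTS.add(context.transId);        // stands in for publishing a SAVE event
        context.role = "LOCAL";                  // nested calls on this thread see a local role
        CURRENT.set(context);                    // bind the context to the current thread
    }

    static String runTry(Context context, Supplier<String> business) {
        try {
            preTry(context);
            String result = business.get();      // the local business method
            CACHE.put(context.transId, "TRYING");
            return result;
        } finally {
            CURRENT.remove();                    // never leak the context to pooled threads
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context("tx-1", "PROVIDER");
        System.out.println(runTry(ctx, () -> "role during try = " + CURRENT.get().role));
        System.out.println("context after try = " + CURRENT.get());
    }
}
```

Clearing the ThreadLocal in a finally block matters because RPC worker threads are typically pooled, so a stale context could otherwise be picked up by an unrelated request.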
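6. Back in the consumer-side StarterHmilyTransactionHandler: the remote RPC call has now finished, that is, returnValue = point.proceed(); has completed. The local transaction status is updated to TRYING(1, "try阶段完成") ("try phase completed") and the confirm operation starts; if an exception occurred, the cancel operation runs instead. The two are similar and differ only in the operation performed, and both are executed asynchronously through a thread pool. HmilyTransactionExecutor#confirm first updates the local transaction status. Because the participant information is recorded every time the Dubbo filter runs (updateParticipant), the confirm method can be invoked here by reflection. This invocation is itself an RPC call, so the flow described earlier runs once more. When the request reaches the provider, the transaction is first looked up in HmilyTransactionGuavaCacheManager and, if it is not found there, loaded from the database. Finally the corresponding operation is executed.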
public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
    LogUtil.debug(LOGGER, () -> "hmily transaction confirm .......!start");
    if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
        return;
    }
    currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
    updateStatus(currentTransaction);
    final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
    List<HmilyParticipant> failList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
    boolean success = true;
    if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
        for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
            try {
                HmilyTransactionContext context = new HmilyTransactionContext();
                context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                context.setRole(HmilyRoleEnum.START.getCode());
                context.setTransId(hmilyParticipant.getTransId());
                HmilyTransactionContextLocal.getInstance().set(context);
                executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
            } catch (Exception e) {
                LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                success = false;
                failList.add(hmilyParticipant);
            } finally {
                HmilyTransactionContextLocal.getInstance().remove();
            }
        }
        executeHandler(success, currentTransaction, failList);
    }
}
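The idea of replaying recorded participants by reflection and collecting the ones that fail can be sketched as follows. This is a simplified illustration under stated assumptions, not the body of executeParticipantMethod: ConfirmLoopSketch, Invocation and InventoryService are hypothetical, and the sketch keeps a direct reference to the target object, whereas a real framework would more likely resolve the target from the recorded class.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: invoke each recorded confirm method and keep the failures.
class ConfirmLoopSketch {

    // What was recorded for a participant during the try phase (illustrative shape).
    static final class Invocation {
        final Object target;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        Invocation(Object target, String methodName, Class<?>[] parameterTypes, Object[] args) {
            this.target = target;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.args = args;
        }
    }

    // Sample participant with a confirm method that can fail.
    static class InventoryService {
        int confirmed;

        public void confirm(Integer count) {
            if (count < 0) {
                throw new IllegalArgumentException("negative count");
            }
            confirmed += count;
        }
    }

    // Returns the participants whose confirm call threw, so they can be retried later.
    static List<Invocation> confirmAll(List<Invocation> participants) {
        List<Invocation> failed = new ArrayList<>();
        for (Invocation inv : participants) {
            try {
                Method method = inv.target.getClass().getMethod(inv.methodName, inv.parameterTypes);
                method.invoke(inv.target, inv.args);
            } catch (Exception e) {
                // One failing participant must not stop the remaining ones.
                failed.add(inv);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        InventoryService service = new InventoryService();
        List<Invocation> participants = new ArrayList<>();
        participants.add(new Invocation(service, "confirm", new Class<?>[] {Integer.class}, new Object[] {3}));
        participants.add(new Invocation(service, "confirm", new Class<?>[] {Integer.class}, new Object[] {-1}));
        System.out.println("failed participants: " + confirmAll(participants).size());
    }
}
```

Catching the exception per participant, as the original loop does, lets every participant get its confirm attempt while the failed ones are handed on for later handling.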
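7. If an exception leaves the transaction in an inconsistent state, eventual consistency still has to be guaranteed. This is done by the scheduled task HmilyTransactionSelfRecoveryScheduled. It loads only the transaction records that are older than a configured delay, mainly so that records which have just been inserted and are still being processed concurrently are not picked up. It then checks the role of each record: if the role is provider and the status is still at the very start of try (PRE_TRY), the local transaction itself failed, which does not affect the consumer, so the log is simply deleted. Next it checks whether the retry count has exceeded the limit and whether the distributed transaction pattern is TCC or CC. If the role is provider, retries are left to the consumer: in the code below a provider record is skipped until recoverDelayTime * loadFactor seconds have passed since its creation. Finally the retry count is updated and the confirm or cancel method is executed again. The methods of HmilyTransactionRecoveryService are similar to those of HmilyTransactionExecutor.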
public void onApplicationEvent(final ContextRefreshedEvent event) {
    hmilyCoordinatorRepository = SpringBeanUtils.getInstance().getBean(HmilyCoordinatorRepository.class);
    this.scheduledExecutorService =
            new ScheduledThreadPoolExecutor(1,
                    HmilyThreadFactory.create("hmily-transaction-self-recovery", true));
    hmilyTransactionRecoveryService = new HmilyTransactionRecoveryService(hmilyCoordinatorRepository);
    selfRecovery();
}

/**
 * if have some exception by schedule execute hmily transaction log.
 */
private void selfRecovery() {
    scheduledExecutorService
            .scheduleWithFixedDelay(() -> {
                LogUtil.info(LOGGER, "self recovery execute delayTime:{}", () -> hmilyConfig.getScheduledDelay());
                try {
                    final List<HmilyTransaction> hmilyTransactions = hmilyCoordinatorRepository.listAllByDelay(acquireData());
                    if (CollectionUtils.isEmpty(hmilyTransactions)) {
                        return;
                    }
                    for (HmilyTransaction hmilyTransaction : hmilyTransactions) {
                        // if the try is not completed, no compensation will be provided (to prevent various exceptions in the try phase)
                        if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                && hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()) {
                            hmilyCoordinatorRepository.remove(hmilyTransaction.getTransId());
                            continue;
                        }
                        if (hmilyTransaction.getRetriedCount() > hmilyConfig.getRetryMax()) {
                            LogUtil.error(LOGGER, "This transaction exceeds the maximum number of retries and no retries will occur:{}", () -> hmilyTransaction);
                            continue;
                        }
                        if (Objects.equals(hmilyTransaction.getPattern(), PatternEnum.CC.getCode())
                                && hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()) {
                            continue;
                        }
                        // if the transaction role is the provider, and the number of retries in the scope class cannot be executed, only by the initiator
                        if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                && (hmilyTransaction.getCreateTime().getTime()
                                + hmilyConfig.getRecoverDelayTime() * hmilyConfig.getLoadFactor() * 1000
                                > System.currentTimeMillis())) {
                            continue;
                        }
                        try {
                            hmilyTransaction.setRetriedCount(hmilyTransaction.getRetriedCount() + 1);
                            final int rows = hmilyCoordinatorRepository.update(hmilyTransaction);
                            // determine that rows>0 is executed to prevent concurrency when the business side is in cluster mode
                            if (rows > 0) {
                                if (hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()
                                        || hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()
                                        || hmilyTransaction.getStatus() == HmilyActionEnum.CANCELING.getCode()) {
                                    hmilyTransactionRecoveryService.cancel(hmilyTransaction);
                                } else if (hmilyTransaction.getStatus() == HmilyActionEnum.CONFIRMING.getCode()) {
                                    hmilyTransactionRecoveryService.confirm(hmilyTransaction);
                                }
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            LogUtil.error(LOGGER, "execute recover exception:{}", e::getMessage);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledDelay(), TimeUnit.SECONDS);
}

private Date acquireData() {
    return new Date(LocalDateTime.now().atZone(ZoneId.systemDefault())
            .toInstant().toEpochMilli() - (hmilyConfig.getRecoverDelayTime() * 1000));
}
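The chain of checks inside the recovery loop is easier to follow when written as a pure decision function. The sketch below restates those checks in the same order; RecoveryDecisionSketch and its enums are hypothetical simplifications (the real code compares integer codes), and the rows > 0 guard on the retry-count update is left out because it depends on the repository.

```java
// Hypothetical restatement of the recovery checks as a side-effect-free function.
class RecoveryDecisionSketch {

    enum Role { START, PROVIDER }
    enum Status { PRE_TRY, TRYING, CONFIRMING, CANCELING }
    enum Pattern { TCC, CC }
    enum Decision { DELETE_LOG, SKIP, CANCEL, CONFIRM }

    static Decision decide(Role role, Status status, Pattern pattern,
                           int retriedCount, int retryMax,
                           long createTimeMillis, long nowMillis,
                           long recoverDelaySeconds, long loadFactor) {
        // A provider that never got past the start of try has nothing to compensate.
        if (role == Role.PROVIDER && status == Status.PRE_TRY) {
            return Decision.DELETE_LOG;
        }
        // Give up once the retry budget is exhausted.
        if (retriedCount > retryMax) {
            return Decision.SKIP;
        }
        // In the CC pattern a record still in TRYING is not recovered.
        if (pattern == Pattern.CC && status == Status.TRYING) {
            return Decision.SKIP;
        }
        // A provider record waits longer, leaving recovery to the initiator first.
        if (role == Role.PROVIDER
                && createTimeMillis + recoverDelaySeconds * loadFactor * 1000 > nowMillis) {
            return Decision.SKIP;
        }
        // Only a record that was already confirming is confirmed again.
        if (status == Status.CONFIRMING) {
            return Decision.CONFIRM;
        }
        // TRYING, PRE_TRY and CANCELING all end in cancel.
        return Decision.CANCEL;
    }

    public static void main(String[] args) {
        System.out.println(decide(Role.PROVIDER, Status.PRE_TRY, Pattern.TCC, 0, 3, 0L, 10_000L, 60L, 2L));
        System.out.println(decide(Role.START, Status.CONFIRMING, Pattern.TCC, 1, 3, 0L, 10_000L, 60L, 2L));
        System.out.println(decide(Role.START, Status.TRYING, Pattern.TCC, 1, 3, 0L, 10_000L, 60L, 2L));
    }
}
```

Separating the decision from the repository calls in this way also makes each branch easy to unit test, which is harder when the checks live inside a scheduled lambda.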
Publisher: 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/143204.html
Original link: https://javaforall.cn