前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hmily(3)

Hmily(3)

作者头像
全栈程序员站长
发布2022-08-25 21:38:56
1890
发布2022-08-25 21:38:56
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

5. 提供端的方法也需要Hmily注解,当然也会有确认取消方法,执行切面方法DubboHmilyTransactionInterceptor#interceptor这个时候的context不会为空,转成对象HmilyTransactionContext,HmilyTransactionAspectServiceImpl#invoke找出合适的处理类HmilyTransactionFactoryServiceImpl#factoryOf即ParticipantHmilyTransactionHandler

代码语言:javascript
复制
public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
        HmilyTransaction hmilyTransaction = null;
        HmilyTransaction currentTransaction;
        switch (HmilyActionEnum.acquireByCode(context.getAction())) {
            case TRYING:
                try {
                    hmilyTransaction = hmilyTransactionExecutor.preTryParticipant(context, point);
                    final Object proceed = point.proceed();
                    hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                    //update log status to try
                    hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                    return proceed;
                } catch (Throwable throwable) {
                    //if exception ,delete log.
                    hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                    throw throwable;
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            case CONFIRMING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.confirm(currentTransaction);
                break;
            case CANCELING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.cancel(currentTransaction);
                break;
            default:
                break;
        }
        Method method = ((MethodSignature) (point.getSignature())).getMethod();
        return DefaultValueUtils.getDefaultValue(method.getReturnType());
    }

刚开始时TRYING,参与者执行tey方法,构建HmilyTransaction,并且保存在Guava内存缓存中。然后发布保存事件保存在本服务所在的数据库中。最后保存上下文到ThreadLocal中返回。执行本地的业务方法,最后更新事务状态,清除ThreadLocal返回。

代码语言:javascript
复制
 public HmilyTransaction preTryParticipant(final HmilyTransactionContext context, final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, "participant hmily transaction start..:{}", context::toString);
        final HmilyTransaction hmilyTransaction = buildHmilyTransaction(point, HmilyRoleEnum.PROVIDER.getCode(), context.getTransId());
        //cache by guava
        HmilyTransactionGuavaCacheManager.getInstance().cacheHmilyTransaction(hmilyTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(hmilyTransaction, EventTypeEnum.SAVE.getCode());
        //Nested transaction support
        context.setRole(HmilyRoleEnum.LOCAL.getCode());
        HmilyTransactionContextLocal.getInstance().set(context);
        return hmilyTransaction;
    }

6. 继续回到消费方的StarterHmilyTransactionHandler,因为远程的rpc已经调用完毕。也就是returnValue = point.proceed();执行完毕,更新本地事务状态为TRYING(1, “try阶段完成”),开始执行确认操作,如果发生异常则执行取消操作,两者类似,只是操作不一样。都是通过线程池来异步执行,HmilyTransactionExecutor#confirm,更新本地事务状态,因为在每次在执行dubbo的filter的时候都会把参与方的信息记录下来,即updateParticipant,所以这块就用反射操作调用该确认方法,这个也是个RPC调用,之前的流程也会再来一遍。请求到达提供端,首先从HmilyTransactionGuavaCacheManager中获取事务信息,如果没有的话就会从数据库中查询。最后执行相应的操作。

代码语言:javascript
复制
 public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
        LogUtil.debug(LOGGER, () -> "hmily transaction confirm .......!start");
        if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
            return;
        }
        currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
        updateStatus(currentTransaction);
        final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
        List<HmilyParticipant> failList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
        boolean success = true;
        if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
            for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
                try {
                    HmilyTransactionContext context = new HmilyTransactionContext();
                    context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                    context.setRole(HmilyRoleEnum.START.getCode());
                    context.setTransId(hmilyParticipant.getTransId());
                    HmilyTransactionContextLocal.getInstance().set(context);
                    executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
                } catch (Exception e) {
                    LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                    success = false;
                    failList.add(hmilyParticipant);
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            }
            executeHandler(success, currentTransaction, failList);
        }
    }

7. 如果出现非一致性异常的话,需要保证事务的事务的最后一致性,通过HmilyTransactionSelfRecoveryScheduled定时程序来实现,获取延迟多长时间后的事务信息,只要为了防止并发的时候,刚新增的数据被执行.判断事务信息的角色,如果是提供者并且状态是try刚开始的话,说明本地事务都执行失败等,也不会影响消费方,直接删除日志即可,判断重试次数是否达到上限,判断分布式事务模式为TCC还是CC,如果事务角色是提供者,重试只能由消费执行。最后更新重试次数,继续执行确认或者取消方法。HmilyTransactionRecoveryService具体方法和HmilyTransactionExecutor类似

代码语言:javascript
复制
public void onApplicationEvent(final ContextRefreshedEvent event) {
        hmilyCoordinatorRepository = SpringBeanUtils.getInstance().getBean(HmilyCoordinatorRepository.class);
        this.scheduledExecutorService =
                new ScheduledThreadPoolExecutor(1,
                        HmilyThreadFactory.create("hmily-transaction-self-recovery", true));
        hmilyTransactionRecoveryService = new HmilyTransactionRecoveryService(hmilyCoordinatorRepository);
        selfRecovery();
    }

 /**
     * if have some exception by schedule execute hmily transaction log.
     */
    private void selfRecovery() {
        scheduledExecutorService
                .scheduleWithFixedDelay(() -> {
                    LogUtil.info(LOGGER, "self recovery execute delayTime:{}", () -> hmilyConfig.getScheduledDelay());
                    try {
                        final List<HmilyTransaction> hmilyTransactions = hmilyCoordinatorRepository.listAllByDelay(acquireData());
                        if (CollectionUtils.isEmpty(hmilyTransactions)) {
                            return;
                        }
                        for (HmilyTransaction hmilyTransaction : hmilyTransactions) {
                            // if the try is not completed, no compensation will be provided (to prevent various exceptions in the try phase)
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()) {
                                hmilyCoordinatorRepository.remove(hmilyTransaction.getTransId());
                                continue;
                            }
                            if (hmilyTransaction.getRetriedCount() > hmilyConfig.getRetryMax()) {
                                LogUtil.error(LOGGER, "This transaction exceeds the maximum number of retries and no retries will occur:{}", () -> hmilyTransaction);
                                continue;
                            }
                            if (Objects.equals(hmilyTransaction.getPattern(), PatternEnum.CC.getCode())
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()) {
                                continue;
                            }
                            // if the transaction role is the provider, and the number of retries in the scope class cannot be executed, only by the initiator
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && (hmilyTransaction.getCreateTime().getTime()
                                    + hmilyConfig.getRecoverDelayTime() * hmilyConfig.getLoadFactor() * 1000
                                    > System.currentTimeMillis())) {
                                continue;
                            }
                            try {
                                hmilyTransaction.setRetriedCount(hmilyTransaction.getRetriedCount() + 1);
                                final int rows = hmilyCoordinatorRepository.update(hmilyTransaction);
                                // determine that rows>0 is executed to prevent concurrency when the business side is in cluster mode
                                if (rows > 0) {
                                    if (hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.CANCELING.getCode()) {
                                        hmilyTransactionRecoveryService.cancel(hmilyTransaction);
                                    } else if (hmilyTransaction.getStatus() == HmilyActionEnum.CONFIRMING.getCode()) {
                                        hmilyTransactionRecoveryService.confirm(hmilyTransaction);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                LogUtil.error(LOGGER, "execute recover exception:{}", e::getMessage);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledDelay(), TimeUnit.SECONDS);

    }

    private Date acquireData() {
        return new Date(LocalDateTime.now().atZone(ZoneId.systemDefault())
                .toInstant().toEpochMilli() - (hmilyConfig.getRecoverDelayTime() * 1000));
    }

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/143204.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年5月1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档