前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elsa V3学习之Flowchart详解(下)

Elsa V3学习之Flowchart详解(下)

作者头像
饭勺oO
发布2024-08-20 08:56:55
740
发布2024-08-20 08:56:55
举报
文章被收录于专栏:饭勺oO的技术博客

接上文,我们介绍了Flowchart的部分逻辑,下来来讲解flowchart剩下的逻辑。

OnChildCompletedAsync

看下OnChildCompletedAsync的代码。

代码语言:javascript
复制
    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
    {
        var logger = context.GetRequiredService<ILogger<Flowchart>>();
        var flowchartContext = context.TargetContext;
        var completedActivityContext = context.ChildContext;
        var completedActivity = completedActivityContext.Activity;
        var result = context.Result;

        // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
        var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
        var outcomeNames = result is Outcomes outcomes
            ? outcomes.Names
            : [null!, "Done"];

        // Only query the outbound connections if the completed activity wasn't already completed.
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
        var children = outboundConnections.Select(x => x.Target.Activity).ToList();
        var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());

        scope.RegisterActivityExecution(completedActivity);

        // If the complete activity is a terminal node, complete the flowchart immediately.
        if (completedActivity is ITerminalNode)
        {
            await flowchartContext.CompleteActivityAsync();
        }
        else if (scheduleChildren)
        {
            if (children.Any())
            {
                // Schedule each child, but only if all of its left inbound activities have already executed.
                foreach (var activity in children)
                {
                    var existingActivity = scope.ContainsActivity(activity);
                    scope.AddActivity(activity);

                    var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

                    // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                    if (!inboundActivities.Contains(completedActivity))
                    {
                        await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        continue;
                    }

                    // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join. 
                    if (activity is not IJoinNode)
                    {
                        var executionCount = scope.GetExecutionCount(activity);
                        var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);

                        if (haveInboundActivitiesExecuted) 
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                    }
                    else
                    {
                        // Select an existing activity execution context for this activity, if any.
                        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                            x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                        var scheduleWorkOptions = new ScheduleWorkOptions
                        {
                            CompletionCallback = OnChildCompletedAsync,
                            ExistingActivityExecutionContext = joinContext,
                            PreventDuplicateScheduling = true
                        };

                        if (joinContext != null)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                        else if (!existingActivity)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                        else
                        {
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                            continue;
                        }

                        await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                    }
                }
            }

            if (!children.Any())
            {
                await CompleteIfNoPendingWorkAsync(flowchartContext);
            }
        }

        flowchartContext.SetProperty(ScopeProperty, scope);
    }

从第11行开始,这里先做了下判断,判断执行结束的节点状态是否是已完成。如果不是已完成,则不会再往下执行,节点执行结束并不表示节点执行完成,这个节点可能处于异常,或者暂停状态。

第17-18行的代码表示,获取已完成的节点的后续所有节点。如果这个节点的出口连接了多个节点,那么这里的children会有多个节点。

第19行和21行则是获取当前工作流的上下文的一个Scope状态。并将已执行过的节点做记录。

再接下来有个判断,判断执行完成的节点是否是ITerminalNode类型,如果是则直接完成整个工作流。继承ITerminalNode的节点都将是流程的终结节点。

然后才是判断scheduleChildren的结果,如果不是已完成的状态,则保存一下scope状态,不继续执行后续流程。 如果节点已完成,则继续判断children是否有值,如果没有,说明后续没有连线的节点,那么就会继续判断节点是否处于挂起状态或者异常状态,如果都没有,则结束工作流程。

代码语言:javascript
复制
private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
{
    var hasPendingWork = HasPendingWork(context);

    if (!hasPendingWork)
    {
        var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);

        if (!hasFaultedActivities)
        {
            await context.CompleteActivityAsync();
        }
    }
}

如果children有值。那么将遍历children,并执行该Activity。 执行Children Activity时,首先在scope记录该节点。 然后判断已完成的活动在不在左侧入口的路线中,如果不在,则执行Activity。

代码语言:javascript
复制
var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

// If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
if (!inboundActivities.Contains(completedActivity))
{
    await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
    continue;
}

如果在,那么再继续判断当前Activity是否属于IJoinNode类型,如果是IJoinNode类型,那么需要等待其左侧所有连接的节点执行结束后再继续执行。 如果不是,那么继续判断其左侧节点的执行次数是否大于当前节点,满足条件则继续执行。

这里可以发现,每一步的ScheduleActivityAsync都会继续把OnChildCompletedAsync传递下去,使用递归的方式执行我们的工作流,知道工作流程结束。

到这我们就基本理清楚我们的flowchart的执行逻辑了。

Signal

然后我们回过头看flowchart的构造函数。

代码语言:javascript
复制
public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
{
    OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
    OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
    OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
}

可以看到有几个信号接收的订阅。这里的Signal用于在工作流执行的过程中接收到的外部信号,并对应作出处理,这里最简单的是CancelSignal,当flowchart的执行过程中,如果收到这个信号,那么将立即完成执行工作流。

代码语言:javascript
复制
    private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
    {
        await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
    }

详细的Signal的执行逻辑,我们将在后续文章中继续介绍。flowchart介绍先到此结束

结语

通过两篇文章,我们基本理清楚了我们编排后的工作流的运行逻辑。希望对小伙伴们有所帮助。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-08-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • OnChildCompletedAsync
  • Signal
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档