前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >利用规则引擎搭建任务编排引擎

利用规则引擎搭建任务编排引擎

作者头像
用户3147702
发布于 2022-06-27 08:49:52
发布于 2022-06-27 08:49:52
2.4K00
代码可运行
举报
运行总次数:0
代码可运行

1. 引言

上一篇文章中,我们介绍了规则引擎的基本算法与使用:

规则引擎从入门到实践

我们看到,规则引擎的基础算法 Rete 算法其实是基于有向无环图的一种算法。

事实上,在实际工作生活中,并不是只有我们的逻辑推理是由有向无环图构成的,复杂的任务编排执行也可以被改造为有向无环图的形式。

2. 示例 -- 文章发布

下面来举个例子,我们需要构建一个博客文章发布系统,用户在这个博客系统中发布文章的流程很简单:

  1. 首先,对文章进行预处理,抽取文章关键字;
  2. 第 1 步完成后,将文章推送给机器学习算法模型自动审核与人工审核;
  3. 如果机器学习算法模型审核通过,按照机器学习算法计算结果,更新文章内容;
  4. 如果人工审核通过,则将文章状态更改为已发布,执行发布流程。

进一步,机器学习算法可能对文章内容进行自动修改,而人工审核可能对博客投稿的频道进行修改,虽然最终是否执行发布流程依赖于人工审核结果,但由于机器学习审核与人工审核是并发进行的,所以 3、4 两步存在竞争条件,需要复杂的加锁、判断逻辑。

我们看到,仅仅是上述四个步骤,就已经让我们的业务代码中出现了难以维护的加锁、判断逻辑,如果接下来又有新的需求:

当发布流程执行完成后,需要将机器学习算法模型计算结果推送给 C 部门。

那么,我们需要在上述流程中补充以下逻辑:

  1. 步骤 3 需要判断是否已经完成人工审核,如果是,则执行推送计算结果给 C 部门操作;如果否,则执行放置缓存操作;
  2. 步骤 4 需要判断缓存中是否已经存在算法计算结果,如果是,则在执行发布流程后执行推送操作。

这个简单的需求,我们来画一个泳道图:

图中用虚线框住的部分存在竞争条件,可见,整个流程变得极为复杂,日常维护、问题排查、后续改进等等都难以进行。

那么,针对这样的复杂场景,有什么办法可以优化吗?

3. 用规则引擎简化流程

3.1 问题复杂的原因

为什么一个看似简单的文章发布系统的例子实现起来却是如此复杂呢?

原因在于我们划分整个流程各步骤的粒度过粗,导致新的逻辑加入时难以应对。

同时,并发场景下,对于竞争条件的保护代码与业务代码耦合,造成了业务代码难以维护的问题。

只要有一个流程编排引擎,让他去处理流程各节点之间的依赖问题,就可以让我们仅仅将目光集中于业务,而不用去为缓存、加锁、判断等逻辑操心了。

3.2 文章发布流程图形化

首先,我们需要绘制出上述文章发布流程中各个任务节点构成的有向无环图:

经过流程编排,我们让后一个节点严格依赖前一个节点,将上述场景的泳道图改造为上述的有向无环图,整个文章发布流程是不是就十分简化了呢?

3.3 代码编写

3.3.1 状态记录

首先,我们需要一个类实例,实现整个编排引擎执行过程中各节点状态的记录:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 package cn.techlog.testjava.main.drools.article_publish;
 
 import java.util.Map;
 
 public class PublishProcess {
     private Integer preProcess;
     private Integer pushMonitor;
     private Integer receiveManualMonitor;
     private Integer receiveMachineMonitor;
     private Integer processManualMonitor;
     private Integer processMachineMonitor;
     private Map<String, String> machineMonitorResult;
 
     public Integer getPreProcess() {
         return preProcess;
     }
 
     public void setPreProcess(Integer preProcess) {
         this.preProcess = preProcess;
     }
 
     public Integer getPushMonitor() {
         return pushMonitor;
     }
 
     public void setPushMonitor(Integer pushMonitor) {
         this.pushMonitor = pushMonitor;
     }
 
     public Integer getReceiveManualMonitor() {
         return receiveManualMonitor;
     }
 
     public void setReceiveManualMonitor(Integer receiveManualMonitor) {
         this.receiveManualMonitor = receiveManualMonitor;
     }
 
     public Integer getReceiveMachineMonitor() {
         return receiveMachineMonitor;
     }
 
     public void setReceiveMachineMonitor(Integer receiveMachineMonitor) {
         this.receiveMachineMonitor = receiveMachineMonitor;
     }
 
     public Integer getProcessManualMonitor() {
         return processManualMonitor;
     }
 
     public void setProcessManualMonitor(Integer processManualMonitor) {
         this.processManualMonitor = processManualMonitor;
     }
 
     public Integer getProcessMachineMonitor() {
         return processMachineMonitor;
     }
 
     public void setProcessMachineMonitor(Integer processMachineMonitor) {
         this.processMachineMonitor = processMachineMonitor;
     }
 
     public Map<String, String> getMachineMonitorResult() {
         return machineMonitorResult;
     }
 
     public void setMachineMonitorResult(Map<String, String> machineMonitorResult) {
         this.machineMonitorResult = machineMonitorResult;
     }
 }

3.3.2 编写规则引擎文件 drl

根据流程编排的有向无环图,就可以完成 drl 文件的编写了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 package cn.techlog.testjava.main.drools.test
 import cn.techlog.testjava.main.drools.article_publish.PublishProcess
 import java.util.Map
 import java.util.HashMap
 
 rule "pre_process"
     when
         $publishProcess: PublishProcess(preProcess == null || preProcess == 0)
     then
         System.out.println("do pre process");
         $publishProcess.setPreProcess(1);
         update($publishProcess)
     end
 
 rule "push_monitor"
     when
         $publishProcess: PublishProcess(preProcess == 1, (pushMonitor == null || pushMonitor == 0))
     then
         System.out.println("do push monitor: push manual monitor and push machine monitor");
         $publishProcess.setPushMonitor(1);
         update($publishProcess)
     end
 
 rule "process_manual_monitor"
     when
         $publishProcess: PublishProcess(receiveManualMonitor == 1, (processManualMonitor == null || processManualMonitor == 0))
     then
         System.out.println("do process manual monitor, update article category and do article publish");
         $publishProcess.setProcessManualMonitor(1);
         update($publishProcess)
     end
 
 rule "process_machine_monitor"
     when
         $publishProcess: PublishProcess(receiveMachineMonitor == 1, processManualMonitor == 1, (processMachineMonitor == null || processMachineMonitor == 0))
     then
         System.out.println("do process machine monitor, update article content and push to C department: " + $publishProcess.getMachineMonitorResult());
         $publishProcess.setProcessMachineMonitor(1);
         update($publishProcess)
     end

3.3.3 编写执行函数

接下来我们来编写程序模拟这一流程的执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 package cn.techlog.testjava.main.drools.article_publish;
 
 import cn.techlog.testjava.main.util.FileUtil;
 import org.kie.api.io.ResourceType;
 import org.kie.api.runtime.StatelessKieSession;
 import org.kie.internal.conf.MultithreadEvaluationOption;
 import org.kie.internal.utils.KieHelper;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
 public class ArticlePublishTest {
     private static final PublishProcess publishProcess = new PublishProcess();
     private static final StatelessKieSession kieSession;
 
     static {
         String drlStr = FileUtil.getFileString("drl/article_publish.drl");
         KieHelper helper = new KieHelper();
         helper.addContent(drlStr, ResourceType.DRL);
         kieSession = helper.build(MultithreadEvaluationOption.YES).newStatelessKieSession();
     }
 
     public static void main(String[] args) {
         FutureTask<String> futureTask1 = new FutureTask<>(ArticlePublishTest::doDrl);
         FutureTask<String> futureTask2 = new FutureTask<>(ArticlePublishTest::startNode);
         Thread thread1 = new Thread(futureTask1);
         Thread thread2 = new Thread(futureTask2);
         thread1.start();
         thread2.start();
     }
 
     private static String startNode() throws InterruptedException {
         Thread.sleep(3000);
         System.out.println("-> receive machine monitor result");
         Map<String, String> result = new HashMap<>();
         result.put("result", "hello world");
         publishProcess.setMachineMonitorResult(result);
         publishProcess.setReceiveMachineMonitor(1);
         kieSession.execute(publishProcess);
 
         Thread.sleep(3000);
         System.out.println("-> receive manual monitor result");
         publishProcess.setReceiveManualMonitor(1);
         kieSession.execute(publishProcess);
         return null;
     }
 
     private static String doDrl() {
         kieSession.execute(publishProcess);
         return null;
     }
 }

3.3.4 执行结果

执行程序,我们看到输出了结果:

do pre process do push monitor: push manual monitor and push machine monitor -> receive machine monitor result -> receive manual monitor result do process manual monitor, update article category and do article publish do process machine monitor, update article content and push to C department: {result=hello world}

虽然在我们的异步接收线程中,机器审核结果早于人工审核结果先被收到,但由于我们的流程编排,机器审核结果的处理则直到人工审核结果处理后才完成执行。

4. 说明

我们看到,在我们的模拟文章发布流程中,我们将复杂、多分支、存在竞争条件的文章发布流程通过规则引擎模拟实现的任务编排引擎成功变成了串行执行,没有竞争条件存在的简单流程。

但这个例子仍然是非常基础的,在实际的场景中,你可能还是会遇到以下这些问题:

4.1 任务重做

在实际场景中,任务的某个节点需要重做是经常让人很头疼的一件事,因为对于线上场景,任务经常是可重入的,否则重复回调等常见情况就会造成你的任务出现问题,但既然任务是可重入的,那怎么让你的任务重新执行一次呢?进行上述改造以后,就不再存在这个问题了,因为编排引擎决定了任务不会被执行两次,如果某个任务需要被重新执行,只需要将状态描述类中对应的字段置为 0,其他不需要执行的任务对应的状态字段置为 1,即可保证仅重新执行该节点,而无需担心其他节点意外受到影响了。

4.2 性能

从性能上来说,规则文件的解析与实例化是非常耗时的,因此,提前 build,例如在项目启动时就完成所有规则的实例化,然后将 kieSession 放在内存中,这样在实际执行的过程中,性能会有明显提升。另一方面,不要在单个规则中加入复杂的判断逻辑,对于复杂场景,拆分成多个 rule 可以有效提升性能,同时,不要在 then 块中进行判断,所有的判断应该都放在 when 块中。

4.3 并发执行

显然,生产环境中要比上述 demo 更加复杂,最基本的一点,线上环境中,各个任务不会都在同一台机器上执行,同时,接收到异步回调的节点也会分布在不同的服务器上,虽然通过流程编排,解决了业务代码中的竞争条件,极大地简化了整个流程,但任务的状态描述结构仍然需要在分布式环境中共享,这就需要一个中心化的缓存,同时,分布式环境下,对任务状态对象中字段的修改也同样存在着竞争条件,因此最好的方法是将这个状态对象的缓存与竞争条件的加锁逻辑封装为一个新的工程框架,这就是一个任务编排引擎。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. 示例 -- 文章发布
  • 3. 用规则引擎简化流程
    • 3.1 问题复杂的原因
    • 3.2 文章发布流程图形化
    • 3.3 代码编写
      • 3.3.1 状态记录
      • 3.3.2 编写规则引擎文件 drl
      • 3.3.3 编写执行函数
      • 3.3.4 执行结果
  • 4. 说明
    • 4.1 任务重做
    • 4.2 性能
    • 4.3 并发执行
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档