大家好,我是码农小余。很久不见,怪是想念。
最近在整一个 OpenAPI 编排器,想到 npm-run-all 的任务流。看了一下这个 6 年前的源码。npm-run-all[1] 是一个用来并行或者串行运行多个 npm 脚本的 CLI 工具。阅读完本文,你能收获到:
直入主题,整个 npm-run-all 的整体执行流程如下:
当我们在终端敲入命令,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap 去分发不同命令不同参数的逻辑。help 和 version 比较简单,本文不做分析。任务控制方面,会先调用 npmRunAll 做参数解析,然后执行 runTasks 执行任务组中任务,全部任务执行后返回结果,结束整个流程。
npm-run-all
包支持三条命令,我们看到源码根目录的 package.json 文件:
{
"name": "npm-run-all",
"version": "4.1.5",
"description": "A CLI tool to run multiple npm-scripts in parallel or sequential.",
"bin": {
"run-p": "bin/run-p/index.js",
"run-s": "bin/run-s/index.js",
"npm-run-all": "bin/npm-run-all/index.js"
},
"main": "lib/index.js",
"files": [
"bin",
"lib",
"docs"
],
"engines": {
"node": ">= 4"
}
}
bin 下面定义的命令脚本:
直接看到 bin/npm-run-all/index.js
:
require("../common/bootstrap")("npm-run-all")
上述代码中,如果是执行 run-p 这条命令,则函数传入的参数是 run-p
,run-s
同理。bootstrap 通过参数的不同,将任务分发到 bin 下不同目录中:
.
├── common
│ ├── bootstrap.js
│ ├── parse-cli-args.js
│ └── version.js
├── npm-run-all
│ ├── help.js
│ ├── index.js
│ └── main.js
├── run-p
│ ├── help.js
│ ├── index.js
│ └── main.js
└── run-s
├── help.js
├── index.js
└── main.js
照着上述代码结构,结合 ../common/bootstrap
代码:
"use strict"
module.exports = function bootstrap(name) {
const argv = process.argv.slice(2)
switch (argv[0]) {
case undefined:
case "-h":
case "--help":
return require(`../${name}/help`)(process.stdout)
case "-v":
case "--version":
return require("./version")(process.stdout)
default:
// https://github.com/mysticatea/npm-run-all/issues/105
// Avoid MaxListenersExceededWarnings.
process.stdout.setMaxListeners(0)
process.stderr.setMaxListeners(0)
process.stdin.setMaxListeners(0)
// Main
return require(`../${name}/main`)(
argv,
process.stdout,
process.stderr
).then(
() => {
// I'm not sure why, but maybe the process never exits
// on Git Bash (MINGW64)
process.exit(0)
},
() => {
process.exit(1)
}
)
}
}
bootstrap 函数依据调用的命令调用不同目录下的 help、version 或者调用 main 函数,达到了差异消除的效果。
然后再把目光放在 process.stdout.setMaxListeners(0)
这是啥玩意?打开 issue 链接[2],通过报错信息和翻阅官方文档:
By default
EventEmitter
s will print a warning if more than10
listeners are added for a particular event. This is a useful default that helps finding memory leaks. Theemitter.setMaxListeners()
method allows the limit to be modified for this specificEventEmitter
instance. The value can be set toInfinity
(or0
) to indicate an unlimited number of listeners. 默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告。这是一个有用的默认值,有助于发现内存泄漏。emitter.setMaxListeners() 方法允许为这个特定的 EventEmitter 实例修改限制。该值可以设置为 Infinity(或 0)以指示无限数量的侦听器。
为什么要处理这个情况呢?因为用户可能这么使用:
$ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11
你永远想象不到用户会怎么使用你的工具!
分析完不同命令的控制逻辑,我们进入核心的 npmRunAll 函数,参数解析部分逻辑如下:
module.exports = function npmRunAll(args, stdout, stderr) {
try {
const stdin = process.stdin
const argv = parseCLIArgs(args)
} catch () {
// ...
}
}
解析处理所有标准输入流参数,最终生成并返回 ArgumentSet 实例 set。parseCLIArgsCore 只看控制任务流执行的参数:
function addGroup(groups, initialValues) {
groups.push(Object.assign(
{ parallel: false, patterns: [] },
initialValues || {}
))
}
function parseCLIArgsCore(set, args) {
LOOP:
for (let i = 0; i < args.length; ++i) {
const arg = args[i]
switch (arg) {
// ...
case "-s":
case "--sequential":
case "--serial":
if (set.singleMode && arg === "-s") {
set.silent = true
break
}
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups)
break
case "-p":
case "--parallel":
if (set.singleMode) {
throw new Error(`Invalid Option: ${arg}`)
}
addGroup(set.groups, { parallel: true })
break
default: {
// ...
break
}
}
}
// ...
return set
}
将任务都装到 groups
数组中,如果是并行任务(传了 -p
、--parallel
参数),就给任务加上 { parallel: true }
标记。默认是 { parallel: false }
,即串行任务。
在进入这一小节之前,我们就 npm-run-all 源码在 scripts 下加一条 debug 命令:
$ "node ./bin/npm-run-all/index.js lint test"
解析完参数生成的 argv.groups
如下:
[{
paralles: false,
patterns: ['lint', 'test']
}]
有了这个数组结果,我们再看任务执行流程会更加明朗。
// bin/npm-run-all/main.js
module.exports = function npmRunAll(args, stdout, stderr) {
try {
// 省略解析参数
// 执行任务
const promise = argv.groups.reduce(
(prev, group) => {
// 分组中没有任务,直接返回 null
if (group.patterns.length === 0) {
return prev
}
return prev.then(() => runAll(
group.patterns, // ['lint', 'test']
{
// ……
// 是否并行执行
parallel: group.parallel,
// 并行的最大数量
maxParallel: group.parallel ? argv.maxParallel : 1,
// 一个任务失败后继续执行其他任务
// ……
arguments: argv.rest,
// 这个📌用于当任务以0码退出时,终止全部任务
race: group.parallel && argv.race,
//……
}
))
},
Promise.resolve(null)
)
// ...
return promise
}
catch (err) {
//eslint-disable-next-line no-console
console.error("ERROR:", err.message)
return Promise.reject(err)
}
}
上述代码解析完命令行中的参数之后,通过 reduce 拼接所有任务组的结果。任务组就是 npm-run-all 支持同时配置并行和串行的任务,并生成多个任务组。例如:
$ npm-run-all -p a b -s c d e
上述命令会生成两个任务组,并行任务组是 ['a', 'b'],串行任务组是 ['c', 'd', 'e']。就会执行两次 runAll
。接下来就看看单个任务组的执行逻辑:
// lib/index.js
module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
// 省略一系列参数格式化和默认值处理……
try {
const patterns = parsePatterns(patternOrPatterns, args)
// 省略非法参数报错和参数之间的校验……
return Promise.resolve()
.then(() => {
if (taskList != null) {
return { taskList, packageInfo: null }
}
return readPackageJson()
})
.then(x => {
// 从 package.json 中匹配任务
// 🌰中 patterns 是 ['lint', 'tests'],所以 lint 和 test 这两个任务一定要从 package.json 的 scripts 中能查看到
const tasks = matchTasks(x.taskList, patterns)
return runTasks(tasks, {
// 省略上面格式化和校验后的参数……
})
})
}
catch (err) {
return Promise.reject(new Error(err.message))
}
}
上述代码将参数的处理和校验全部省略掉了,看到核心的逻辑 matchTasks
和 runTasks
。matchTasks
通过读取 package.json
下 scripts
中的命令,然后判断任务组 patterns
中的任务是否都存在于 taskList
中。
然后就来到了本小节的核心逻辑——调用 runTasks
依次执行每个任务组中的任务:
module.exports = function runTasks(tasks, options) {
return new Promise((resolve, reject) => {
// 任务组中不存在任务,直接返回空数组
if (tasks.length === 0) {
resolve([])
return
}
// 结果数组
const results = tasks.map(task => ({ name: task, code: undefined }))
// 任务队列
const queue = tasks.map((task, index) => ({ name: task, index }))
// 用于判断并行的时候任务是否全部完成
const promises = []
let error = null
let aborted = false
/**
* Done.
* @returns {void}
*/
function done() {
// ……
}
/**
* Aborts all tasks.
* @returns {void}
*/
function abort() {
// ……
}
/**
* Runs a next task.
* @returns {void}
*/
function next() {
// 任务被终止了,则不需要再往下执行了
if (aborted) {
return
}
// 并行时,只有满足 queue 和 promises 的长度都为 0,才可以判断任务组完成
if (queue.length === 0) {
if (promises.length === 0) {
done()
}
return
}
const task = queue.shift()
const promise = runTask(task.name, optionsClone)
promises.push(promise)
promise.then(
(result) => {
// 完成一个任务,就将其从 promises 中删除
remove(promises, promise)
if (aborted) {
return
}
if (result.code) {
error = new NpmRunAllError(result, results)
// 失败后不继续执行后续的任务,则直接终止整个任务组
if (!options.continueOnError) {
abort()
return
}
}
// Aborts all tasks if options.race is true.
if (options.race && !result.code) {
abort()
return
}
// Call the next task.
next()
},
(thisError) => {
remove(promises, promise)
if (!options.continueOnError || options.race) {
error = thisError
abort()
return
}
next()
}
)
}
// 最大并发数
const max = options.maxParallel
// 对比任务数量、配置的并发数、取较小者。这是为了防止配置的 maxParallel 比实际执行的任务数量还大的情况
const end = (typeof max === "number" && max > 0)
? Math.min(tasks.length, max)
: tasks.length
for (let i = 0; i < end; ++i) {
next()
}
})
}
通过队列,依次执行组中的每一条任务,任务成功后将结果存入 result,然后调用 next 执行下一个任务;可以通过 abort 终止全部任务;通过 done 完成整个队列的状态更新,并将结果返回。
接下来,通过一张图和本小节的示例,来更好地理解串行机制:
从左到右得流程,讲解一下:
aggregateOutput
参数,会将任务的输出流写入到内存流;并行机制的不同就在于初始的时候会调用多次 next 函数,并且会判断当前是否还有正在执行的任务。
上图是我们执行以下命令的流程图:
$ node ./bin/npm-run-all/index.js -p lint test --max-parallel 2
命令的意思是并行执行 lint 和 test 任务,并且最大并发数是 2。
回到上面的流程图:
本节我们学习了任务组中的任务不管是串行机制还是并行机制,都通过任务队列依次执行。不同的是,串行是首次只执行一次 next,并行根据参数执行多次 next。当满足队列为空并且所有任务都完成,就结束当前任务组,并将缓存在 results 中的结果返回。
了解完任务组的串行和并行机制,这一小节就来了解单个任务是如何被执行的。
module.exports = function runTask(task, options) {
let cp = null
const promise = new Promise((resolve, reject) => {
// 包装输出、输入、错误信息流……
// 在输出流中写入任务名称
if (options.printName && stdout != null) {
stdout.write(createHeader(
task,
options.packageInfo,
options.stdout.isTTY
))
}
// 执行命令的 npm 路径,npm-cli 的路径
const npmPath = options.npmPath || process.env.npm_execpath
const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
// 执行路径,一般是全局的 bin/node 路径
const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
// 判断是不是 yarn
const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
const spawnArgs = ["run"]
if (npmPathIsJs) {
spawnArgs.unshift(npmPath)
}
if (!isYarn) {
Array.prototype.push.apply(spawnArgs, options.prefixOptions)
}
else if (options.prefixOptions.indexOf("--silent") !== -1) {
spawnArgs.push("--silent")
}
Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))
// 执行命令
cp = spawn(execPath, spawnArgs, spawnOptions)
// 省略输出流格式化……
// Register
cp.on("error", (err) => {
cp = null
reject(err)
})
cp.on("close", (code, signal) => {
cp = null
// 成功后返回任务名称、状态
resolve({ task, code, signal })
})
})
// 给当前的promise挂上静态方法,用于结束当前子进程任务
promise.abort = function abort() {
if (cp != null) {
cp.kill()
cp = null
}
}
return promise
}
runTask 做了四件事:
有人会问为什么要去看一个 6 年前写的源码?语法老旧、甚至还能看到标记声明[3] 。但我想表达的是,npm-run-all
这个包的核心逻辑——通过任务队列去实现串行和并行的任务流模型是非常经典的,类似 continueOnError、race 这样的逻辑控制节点也随处可见。例如当前非常先进的构建系统 Turborepo[4] ,它对 pipeline 的控制逻辑也能复用这个任务流模型。
[1]
npm-run-all: https://www.npmjs.com/package/npm-run-all
[2]
issue 链接: https://github.com/mysticatea/npm-run-all/issues/105
[3]
标记语法: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/label#browser_compatibility
[4]
Turborepo: https://turborepo.org/docs/core-concepts/pipelines