
某天,团队的新来的很爱问问题的小伙伴突然问我:怎么在Koa服务中处理接收上传的文件数据?
我答:这个简单,你在koa-body里配一下multipart,然后在ctx.request.files取一下
他又问:好的。为什么配置了multipart就可以在ctx.request.files拿到呢?
我又答:因为koa-body帮你处理了
他再问:好的。那它是怎么处理的呢?
我:**...我去看下源码再来回答你**
通过前面刨根问底的对话,我们可以提炼出三个问题:
koa-body
multipart配置(注:更多细节在formidable配置中)
前两个问题,我们现在已经有了答案。作为一个开发者,在掌握了WHAT和HOW之后,我们更应该沉下心来,对WHY背后的原理进行探索。要知其然更知其所以然。
那么,对于这类原理性的问题,我们的思路不用多说:看源码
我们分析NPM依赖肯定要从入口文件进行分析,既从package.json中的main字段开始,一般来说都是index.js

根据Koa.js的中间件实现规范,我们可以从上面的代码中了解到:
requestBody方法返回的function才是是真正执行的中间件requestBody方法会初始化相关配置
接下来我们来看中间件真正实现的代码。
由于屏幕原因我对一些无关的代码进行了折叠,我们重点看红框处。
opts.multipart(配置验证)与ctx.is('multipart')(请求头验证)都为true时,判断为文件上传场景,调用formy方法formy方法返回的promise实例resolved时,将promise实例返回的数据附加到ctx.request.body及ctx.request.files上这下WHY的后半部分谜底解开了:在真实处理逻辑的promise实例resolved后,koa-body会将返回的数据附加在ctx.request上
通过上一部分的截图,我们可以看到对文件解析的逻辑都在formy方法中。那么这个方法是怎么实现的呢。
话不多说,上代码
function formy(ctx, opts) {
return new Promise(function (resolve, reject) {
var fields = {};
var files = {};
var form = new forms.IncomingForm(opts);
form.on('end', function () {
return resolve({
fields: fields,
files: files
});
}).on('error', function (err) {
return reject(err);
}).on('field', function (field, value) {
if (fields[field]) {
if (Array.isArray(fields[field])) {
fields[field].push(value);
} else {
fields[field] = [fields[field], value];
}
} else {
fields[field] = value;
}
}).on('file', function (field, file) {
if (files[field]) {
if (Array.isArray(files[field])) {
files[field].push(file);
} else {
files[field] = [files[field], file];
}
} else {
files[field] = file;
}
});
if (opts.onFileBegin) {
form.on('fileBegin', opts.onFileBegin);
}
form.parse(ctx.req);
});
}逻辑不多,我们再来总结一下。
IncomingForm实例,传入formidable对应的配置parse方法,并监听end、error、file、field等事件end事件的回调中,进行resolve那么forms.IncomingForm是哪来的呢?
const forms = require('formidable');原来是koa-body引用的第三方依赖formidable
这下我们明确了,**ctx.request对象上附加的数据是在formidable.IncomingForm实例中进行处理,通过file、field等事件回调进行接收,最后在end事件回调中进行返回的**。
通过前面的分析,我们知道了 koa-body 对于文件的处理是引用的 formidable。我们还是从入口文件进行分析。

入口代码非常简单,核心逻辑看来都在Formidable中

先来对Formidable.js有一个宏观印象:
IncomingForm 类IncomingForm继承了EventEmitterIncomingForm定义了诸多方法通过继承EventEmitter,IncomingForm实例可以在合适的时机触发相关事件并监听处理
回到上文,IncomingForm实例调用了parse方法。我们从parse方法开始入手分析。

通过红框处逻辑我们可以看到,parse方法的职责主要有两个:
req参数的data事件,处理数据。通过前面传入的参数,我们知道 req参数就是ctx.req,Node.js原生request对象。
这下谜底又解开了一部分,**koa-body是怎么拿到上传的文件数据的呢?通过监听Node.js原生request对象的data事件**
本节涉及到很多方法嵌套调用,我统称为write过程。
writeHeaders方法的调用链是这样的(代码比较长,大家感兴趣的可以自行去看源码):
writeHeaders() -> _parseContentType() -> initMultipart() -> MultipartParser
经过一串复杂的调用,我们完成了writeHeaders方法的使命:解析请求header,设置parser

我们可以看到,定义的MultipartParser类继承了Transform流
我们简单看下官方文档对于Tranform流的定义

通过定义的_transform方法,我们可以对写进流的buffer进行处理
_transform(buffer, _, done) {
let i = 0;
let prevIndex = this.index;
let { index, state, flags } = this;
const { lookbehind, boundary, boundaryChars } = this;
const boundaryLength = boundary.length;
const boundaryEnd = boundaryLength - 1;
this.bufferLength = buffer.length;
let c = null;
let cl = null;
const setMark = (name, idx) => {
this[`${name}Mark`] = typeof idx === 'number' ? idx : i;
};
const clearMarkSymbol = (name) => {
delete this[`${name}Mark`];
};
const dataCallback = (name, shouldClear) => {
const markSymbol = `${name}Mark`;
if (!(markSymbol in this)) {
return;
}
if (!shouldClear) {
this._handleCallback(name, buffer, this[markSymbol], buffer.length);
setMark(name, 0);
} else {
this._handleCallback(name, buffer, this[markSymbol], i);
clearMarkSymbol(name);
}
};
for (i = 0; i < this.bufferLength; i++) {
c = buffer[i];
switch (state) {
case STATE.PARSER_UNINITIALIZED:
return i;
case STATE.START:
index = 0;
state = STATE.START_BOUNDARY;
case STATE.START_BOUNDARY:
if (index === boundary.length - 2) {
if (c === HYPHEN) {
flags |= FBOUNDARY.LAST_BOUNDARY;
} else if (c !== CR) {
return i;
}
index++;
break;
} else if (index - 1 === boundary.length - 2) {
if (flags & FBOUNDARY.LAST_BOUNDARY && c === HYPHEN) {
this._handleCallback('end');
state = STATE.END;
flags = 0;
} else if (!(flags & FBOUNDARY.LAST_BOUNDARY) && c === LF) {
index = 0;
this._handleCallback('partBegin');
state = STATE.HEADER_FIELD_START;
} else {
return i;
}
break;
}
if (c !== boundary[index + 2]) {
index = -2;
}
if (c === boundary[index + 2]) {
index++;
}
break;
case STATE.HEADER_FIELD_START:
state = STATE.HEADER_FIELD;
setMark('headerField');
index = 0;
case STATE.HEADER_FIELD:
if (c === CR) {
clearMarkSymbol('headerField');
state = STATE.HEADERS_ALMOST_DONE;
break;
}
index++;
if (c === HYPHEN) {
break;
}
if (c === COLON) {
if (index === 1) {
// empty header field
return i;
}
dataCallback('headerField', true);
state = STATE.HEADER_VALUE_START;
break;
}
cl = lower(c);
if (cl < A || cl > Z) {
return i;
}
break;
case STATE.HEADER_VALUE_START:
if (c === SPACE) {
break;
}
setMark('headerValue');
state = STATE.HEADER_VALUE;
case STATE.HEADER_VALUE:
if (c === CR) {
dataCallback('headerValue', true);
this._handleCallback('headerEnd');
state = STATE.HEADER_VALUE_ALMOST_DONE;
}
break;
case STATE.HEADER_VALUE_ALMOST_DONE:
if (c !== LF) {
return i;
}
state = STATE.HEADER_FIELD_START;
break;
case STATE.HEADERS_ALMOST_DONE:
if (c !== LF) {
return i;
}
this._handleCallback('headersEnd');
state = STATE.PART_DATA_START;
break;
case STATE.PART_DATA_START:
state = STATE.PART_DATA;
setMark('partData');
case STATE.PART_DATA:
prevIndex = index;
if (index === 0) {
// boyer-moore derrived algorithm to safely skip non-boundary data
i += boundaryEnd;
while (i < this.bufferLength && !(buffer[i] in boundaryChars)) {
i += boundaryLength;
}
i -= boundaryEnd;
c = buffer[i];
}
if (index < boundary.length) {
if (boundary[index] === c) {
if (index === 0) {
dataCallback('partData', true);
}
index++;
} else {
index = 0;
}
} else if (index === boundary.length) {
index++;
if (c === CR) {
// CR = part boundary
flags |= FBOUNDARY.PART_BOUNDARY;
} else if (c === HYPHEN) {
// HYPHEN = end boundary
flags |= FBOUNDARY.LAST_BOUNDARY;
} else {
index = 0;
}
} else if (index - 1 === boundary.length) {
if (flags & FBOUNDARY.PART_BOUNDARY) {
index = 0;
if (c === LF) {
// unset the PART_BOUNDARY flag
flags &= ~FBOUNDARY.PART_BOUNDARY;
this._handleCallback('partEnd');
this._handleCallback('partBegin');
state = STATE.HEADER_FIELD_START;
break;
}
} else if (flags & FBOUNDARY.LAST_BOUNDARY) {
if (c === HYPHEN) {
this._handleCallback('partEnd');
this._handleCallback('end');
state = STATE.END;
flags = 0;
} else {
index = 0;
}
} else {
index = 0;
}
}
if (index > 0) {
// when matching a possible boundary, keep a lookbehind reference
// in case it turns out to be a false lead
lookbehind[index - 1] = c;
} else if (prevIndex > 0) {
// if our boundary turned out to be rubbish, the captured lookbehind
// belongs to partData
this._handleCallback('partData', lookbehind, 0, prevIndex);
prevIndex = 0;
setMark('partData');
// reconsider the current character even so it interrupted the sequence
// it could be the beginning of a new sequence
i--;
}
break;
case STATE.END:
break;
default:
return i;
}
}
dataCallback('headerField');
dataCallback('headerValue');
dataCallback('partData');
this.index = index;
this.state = state;
this.flags = flags;
done();
return this.bufferLength;
}_transform的逻辑也很简单:遍历buffer,在不同的阶段进行不同的处理并通过_handleCallback方法发出不同的事件进而触发对应的回调。
注:_handleCallback方法实现很有意思,有兴趣的同学可以自己看下
上文我们说到,当parser流被写入数据时,会调用_transform方法进行处理,同时触发各个周期事件的回调。
事件回调的代码如图所示:

我们需要关注的是headersEnd事件,在headsEnd事件的回调中,调用了IncomingForm实例的onPart方法
为什么说this.onPart调用的是IncomingForm实例的方法呢,可以看下前面的代码,有一步call绑定this的操作
在层层嵌套调用中,我们终于回到了IncomingForm的逻辑里,可喜可贺。
我们再来回顾一下write过程:
this.writeHeaders(req.headers); - 设置parserreq.on('data') - 调用parser的write方法,向流中写入数据parser.on('headersEnd') - headers解析完毕,调用this.onPart方法_handlePart 方法源码分析当headers解析完毕之后,我们会调用this.onPart方法。而this.onPart方法的核心逻辑都在_handlePart方法中。接下来我们来对_handlePart源码进行分析
_handlePart(part) {
if (part.originalFilename && typeof part.originalFilename !== 'string') {
this._error(
new FormidableError(
`the part.originalFilename should be string when it exists`,
errors.filenameNotString,
),
);
return;
}
if (!part.mimetype) {
let value = '';
const decoder = new StringDecoder(
part.transferEncoding || this.options.encoding,
);
part.on('data', (buffer) => {
this._fieldsSize += buffer.length;
if (this._fieldsSize > this.options.maxFieldsSize) {
this._error(
new FormidableError(
`options.maxFieldsSize (${this.options.maxFieldsSize} bytes) exceeded, received ${this._fieldsSize} bytes of field data`,
errors.maxFieldsSizeExceeded,
413, // Payload Too Large
),
);
return;
}
value += decoder.write(buffer);
});
part.on('end', () => {
this.emit('field', part.name, value);
});
return;
}
if (!this.options.filter(part)) {
return;
}
this._flushing += 1;
const newFilename = this._getNewName(part);
const filepath = this._joinDirectoryName(newFilename);
// new 了一个 File 实例
const file = this._newFile({
newFilename,
filepath,
originalFilename: part.originalFilename,
mimetype: part.mimetype,
});
file.on('error', (err) => {
this._error(err);
});
this.emit('fileBegin', part.name, file);
// 调用 open 方法
file.open();
this.openedFiles.push(file);
// 监听 data 时间
part.on('data', (buffer) => {
this._fileSize += buffer.length;
if (this._fileSize < this.options.minFileSize) {
this._error(
new FormidableError(
`options.minFileSize (${this.options.minFileSize} bytes) inferior, received ${this._fileSize} bytes of file data`,
errors.smallerThanMinFileSize,
400,
),
);
return;
}
if (this._fileSize > this.options.maxFileSize) {
this._error(
new FormidableError(
`options.maxFileSize (${this.options.maxFileSize} bytes) exceeded, received ${this._fileSize} bytes of file data`,
errors.biggerThanMaxFileSize,
413,
),
);
return;
}
if (buffer.length === 0) {
return;
}
this.pause();
// 写入 file
file.write(buffer, () => {
this.resume();
});
});
// 监听 end 事件
part.on('end', () => {
if (!this.options.allowEmptyFiles && this._fileSize === 0) {
this._error(
new FormidableError(
`options.allowEmptyFiles is false, file size should be greather than 0`,
errors.noEmptyFiles,
400,
),
);
return;
}
// 关闭事件
file.end(() => {
this._flushing -= 1;
this.emit('file', part.name, file);
this._maybeEnd();
});
});
}通过我在代码关键处加的注释,我们可以看到_handlePart方法的核心逻辑如下:
new 一个 File 对象的实例 filefilepart 的 data 事件,将数据写入 file 中part 的 end 事件,关闭 file在解读这段核心逻辑前,我们需要明确两个重要信息:
part 是什么?
part是在initMultipart方法中创建的可读流,通过data事件向外界传输数据

File 对象的实例 file 是什么?
是根据传入的filePath创建的可写流

明白了这两个前提,这下我们终于明白了!
_handlePart方法就是打开一个文件流,把parser解析出的数据通过文件流写入文件,然后关闭。
我们的分析终于来到了尾声!
整体流程如何结束呢?
我们注意到有一个方法叫_maybeEnd,当满足条件时,会触发end事件

我们选取this._flushing变量进行分析,如何满足条件。
可以看到,this._flushing初始值为0,当_handlePart方法执行时,this._flushing+=1;当file关闭时,this._flushing-=1。
由此我们可以分析出,当上传的文件都处理完毕时,this._flushing变量恢复为0,满足条件,触发end事件
大家看到end事件应该很兴奋,因为我们终于走到结束了。
end事件被谁接收,相信大家心里已经有数了。
没错,我们回到了koa-body的代码中。后面的流程如下:
formy方法返回的promise实例中监听到IncomingForm实例发出的end事件,promise实例进行resolve 
promise实例的then中接收到resolve返回的数据,附加到ctx.request对象上 
next()本文带领大家从一个文件上传的例子入手,分析了koa-body及formidable中关于处理文件上传的核心逻辑。
对于我们前面遗留的问题,相信大家此时已经有了答案。
简单回答,koa-body是如何处理文件上传的呢?
答:
req.on('data')获取数据header,解析boundary除了对koa-body文件上传流程有了清晰的了解之外,在整体探索分析过程中,我们还应该有一些别的收获,比如
node_modules中下载的代码格式不同,二者对比阅读有奇效EventEmitter是通信神器,这种思路可以利用到业务代码中