前言
最近,明学是一个火热的话题,而我,却也想当那么一回明学家,那就是,把JavaScript和多线程并发这两个八竿子打不找的东西,给硬凑了起来,还写了一个并发库concurrent-thread-js。尴尬的是,当我发现其中的不合理之处,即这个东东的应用场景究竟是什么时,我发现我已经把代码写完了。
⚠️注意! 本文中的线程指的都是用JS异步函数模拟的“假线程”,不是真正意义上的多线程,请不要误解⚠️
https://github.com/penghuwan/concurrent-thread.js
事实上,这个库用处很小,但是在写的过程中,我对Promise,Async函数以及event事件流的使用产生了新的认识,同时也逐渐去学习和了解怎么去从零开始去写一个非业务的,通用的npm模块,所以希望拿出来和大家分享一下,这才是本文的真正的目的。
好,我们从一个故事开始。
场景一
场景二
https://github.com/penghuwan/concurrent-thread.jsgithub.com
注意!倘若不考虑webworker这种解决方案,我们一般都认为JS是单线程的。
为单线程的JavaScript实现并发协调的功能,语意,命名和作用性质上参考Java的实现,提sleep/join/interupt等API以及锁和条件变量等内容,并提供线程间通信的功能,依赖ES6语法,基于Promise和Async函数实现,故需要Babel编译才能运行。JavaScrpt本来就是单线程的,所以这只是在API的层面实现了模拟,在下文的介绍中,每条所谓的线程其实就是普通的异步函数,并在此基础上实现不同线程的协调配合。
没错,一般来说JS中模拟多线程我们也许会选用webworker,但是它必须要求你手动创建额外的webworker脚本文件,并通过new work('work.js')这种方式使用,这并不能达到我项目中想要的API的效果,而且注意:webwork中的环境不是window!很多方法你调用不了的。你只能采取这种方案,也即在主线程完成该功能,这是我没有选择webworker的另一个原因。
说是这样说,但其实在大多数时候还是用webworker就够了
这个问题真是灵魂拷问,可是既然代码写都写了,我怎么也得编一个理由出来啊!额。。。让我想想哈
它的作用是:当JS工程需要让两个函数在执行上不互相干扰,同时也不希望它们会阻塞主线程,与此同时,还希望这两个函数实现类似并发多线程之间的协调的需求的时候,你可以使用这个并发模拟库,实际上这种应用场景。。。这尼玛有这种应用场景吗?!(扎心了呀)。
三个类:ThreadPool,Lock和Condition
我们的API分别写入三个类中,分别是
注:以下所说的"线程"都是指JS中模拟的异步函数
submit模拟提交线程至线程池
// 备注:为循序渐进介绍,以下为简化代码
// 存储每个线程函数的状态,例如是否中断,以及线程状态等
const threadMap = {};
class ThreadPool {
// 模拟线程中断
interrupt(threadName) { }
// 模拟线程同步
join(threadName, targetThread) { }
// 模拟线程休眠
sleep(ms) { }
};
function submit(func, name) {
if (!func instanceof Function) return;
// 方式1:传入一个具名函数;方式2:传入第二个参数,即线程命名空间
const threadName = func.name || name;
// threadMap负责存储线程状态数据
threadMap[threadName] = { state: RUNNABLE, isInterrupted: false };
// 让func异步调用,同时将传入函数的作用域绑定为 ThreadPool原型
Promise.resolve({
then: func.bind(ThreadPool.prototype);
})
}
首先,我们做了三件事情:
submit(async function example() {
this.interrupt();
});
但问题在于:现在因为所有的函数通过this调用的都是ThreadPool原型中的方法,我们要在调用唯一的interrupt方法,需要在异步函数中传入"线程"标识,如线程名。这显然不方便,也不优雅,例如下面的命名为example的线程函数
submit(async function example() {
this.interrupt('example');
});
使用这个模块用户会感到奇怪:我明明在example函数中,为什么还要给调用方法传example这个名字参数??难道不能在模块内部把这事情干了吗?
对!我们下面做的就是这件事情,我们编写一个delegateThreadPool方法,由它为ThreadPool代理处理不同“线程“函数的函数名
// 返回代理后的ThreadPool
function delegateThreadPool(threadName) { // threadName为待定的线程名,在submit方法调用时候传入
// 代理后的ThreadPool
const proxyClass = {};
// 获取ThreadPool原来的所有的方法,赋给props数组
var props = Object.getOwnPropertyNames(ThreadPool.prototype);
for (let prop of props) {
// 代理ThreadPool,为其所有方法增加threadName这个参数
let fnName = prop;
proxyClass[fnName] = (...args) => {
const fn = baseClass[fnName];
return fn(threadName, ...args);
};
}
return proxyClass;
}
function submit(func, name) {
// 省略其他代码 。。。
const proxyScope = delegateThreadPool(threadName);
// 让func异步调用,不阻塞主线程,同时实现并发
Promise.resolve({
then: function () {
// 给func绑定this为代理后的ThreadPool对象,以便调用方法
func.call(proxyScope);
}
});
}
// 调用this.sleep方法时,已经无需增加函数命名作为参数了
submit(async function example() {
this.interrupt();
});
也就是说,我们的线程函数func绑定的已经不是ThreadPool.prototype了,而是delegateThreadPool处理后返回的对象:proxyScope。这时候,我们在“线程”函数体里调用this.interrupt方法时,已经无需增加函数命名作为参数了,因为这个工作,proxyScope对象帮我们做了,其实它的工作很简单——就是它的每个函数,都在一个返回的闭包里面调用ThreadPool的同名函数,并传递线程名作为第一个参数。
作用:线程休眠
sleep方法很简单,无非就是返回一个Promise实例,在Promise的函数里面调setTimeOut,等时间到了执行resolve函数,这段时间里修饰Promise的await语句会阻塞一段时间,resolve后又await语句又继续向下执行了,能满足我们想要的休眠效果
// 模拟“线程”休眠
sleep(ms) {
return new Promise(function (resolve) {
setTimeout(resolve, ms);
})
}
// 提交“线程”
submit(async function example() {
// 阻塞停留3秒,然后才输出1
await this.sleep(3000);
console.log(1);
});
作用:线程中断,可用于处理线程停止等操作
这里要先介绍一下Java里面的interrupt方法:在JAVA里,你不能通过调用terminate方法停掉一个线程,因为这有可能会因为处理逻辑突然中断而导致数据不一致的问题,所以要通过interrupt方法把一个中断标志位置为true,然后通过isInterrupted方法作为判断条件跳出关键代码。
所以为了模拟,我在JS中处理“线程”中断也是这么去做的,但是我们这样做的根本原因是:我们压根没有可以停掉一个线程函数的方法!(JAVA是有但是不准用,即废弃了而已)
// 模拟线程中断
interrupt(threadName) {
if (!threadName) { throw new Error('Miss function parameters') }
if (threadMap[threadName]) {
threadMap[threadName].isInterrupted = true;
}
}
// 获取线程中断状态
isInterrupted(threadName) {
if (!threadName) { throw new Error('Miss function parameters') }
// !!的作用是:将undefined转为false
return !!threadMap[threadName].isInterrupted;
}
A4. join方法
join(threadName): "线程"同步,调用此方法的"线程"函数将在threadName执行结束后继续执行
join方法和上面的sleep方法是一样的道理,我们让它返回一个Promise,只要我们不调resolve,那么外部修饰Promise的await语句就会一直暂停,等到join的那个另一个线程执行完了,我们看准时机!把这个Promise给resolve,这时候外部修饰Promise的await语句不就又可以向下执行了吗?
但问题在于:我们如何实现这个“一个函数执行完通知另一个函数的功能呢”?没错!那就是我们JavaScript最喜欢的套路: 事件流! 我们下面使用event-emitter这个前后端通用的模块实现事件流。
我们只要在任何一个函数结束的时候触发结束事件(join-finished),同时传递该线程的函数名作为参数,然后在join方法内部监听该事件,并在响应时候调用resolve方法不就可以了嘛。
首先是在join方法内部监听线程函数的结束事件
import ee from 'event-emitter';
const emitter = ee();
// 模拟线程同步
join(threadName, targetThread) {
return new Promise((resolve) => {
// 监听其他线程函数的结束事件
emitter.on('join-finished', (finishThread) => {
// 根据结束线程的线程名finishThread做判断
if (finishThread === targetThread) {
resolve();
}
})
})
}
同时在线程函数执行结束时触发join-finished事件,传递线程名做参数
import ee from 'event-emitter';
const emitter = ee();
function submit(func, name) {
// ...
Promise.resolve({
then: func().then(() => {
emitter.emit('join-finished', threadName);
})
});
}
使用如下:
submit(async function thread1 () {
this.join('thread2');
console.log(1);
});
submit(async function thread2 () {
this.sleep(3000);
console.log(2)
})
// 3s后,依次输出 2 1
我们主要是要编写两个方法:lock和unlock方法。我们需要设置一个Boolean属性isLock
// 这是一个非公平锁
class Lock {
constructor() {
this.isLock = false;
}
//加锁
lock() {
if (this.isLock) {
const self = this;
// 循环while死循环,不停测试isLock是否等于false
return new Promise((resolve) => {
(function recursion() {
if (!self.isLock) {
// 占用锁
self.isLock = true;
// 使外部await语句继续往下执行
resolve();
return;
}
setTimeout(recursion, 100);
})();
});
} else {
this.isLock = true;
return Promise.resolve();
}
}
// 解锁
unLock() {
this.isLock = false;
}
}
const lockObj = new Lock();
export default lockObj;
运行示例如下:
async function commonCode() {
await Lock.lock();
await Executor.sleep(3000);
Lock.unLock();
}
submit(async function example1() {
console.log('example1 start')
await commonCode();
console.log('example1 end')
});
submit(async function example2() {
console.log('example2 start')
await commonCode();
console.log('example2 end')
});
输出
// 立即输出
example1 start
example2 start
// 3秒后输出
example1 end
// 再3秒后输出
example2 end
对不起!写到这里,我实在是口干舌燥,写不下去了,但是道理和前面是一样的:
无非是:事件监听 + Promise + Async函数组合拳,一套搞定
import ee from 'event-emitter';
const ev = ee();
class Condition {
constructor() {
this.n = 0;
this.list = [];
}
// 当不满足条件时,让线程处于等待状态
wait() {
return new Promise((resolve) => {
const eventName = `notify-${this.n}`;
this.n++;
const list = this.list;
list.push(eventName);
ev.on(eventName, () => {
// 从列表中删除事件名
const i = list.indexOf(eventName);
list.splice(i, 1);
// 让外部函数恢复执行
debugger;
resolve();
})
})
}
// 选择一个线程唤醒
notify() {
const list = this.list;
let i = Math.random() * (this.list.length - 1);
i = Math.floor(i);
ev.emit(list[i])
}
}
测试代码
async function testCode() {
console.log('i will be wait');
if (true) {
await Condition.wait();
};
console.log('i was notified ');
}
submit(async function example() {
testCode();
setTimeout(() => {
Condition.notify();
}, 3000);
});
输出
i will be wait
// 3秒后输出
i was notified
其实说到底,我想和大家分享的不是什么并发啊,什么多线程啦。
其实我想表达的是:事件监听 + Promise + Async函数这套组合拳很好用啊
本文完,下面是全部项目代码(刚写了文章才发现有bug,待会改改)