
什么是 Scheduler ? scheduler 控制 subscription 什么时候开始和通知什么时候派发。
scheduler 是一个数据结构,知道如何根据优先级或其他标准对任务进行存储和排序;scheduler 是一个执行上下文,表示任务在何时何地执行(如立即执行、或在另一个回调机制中,如 setTimeout 或 precess.nextTick 或动画帧 );scheduler 有一个时钟,通过 scheduler 的 now() 方法提供了“时间”的概念,在特定调度程序上调度的任务将仅遵守该时钟指示的时间;
Scheduler支持开发者定义Observable将在什么执行上下文中向其Observer传递通知。
import { Observable, observeOn, asyncScheduler } from 'rxjs';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(observeOn(asyncScheduler));
console.log('just before subscribe');
observable.subscribe(
(x) => console.log('got value ' + x),
(err) => console.error('something wrong occurred: ' + err),
() => console.log('done')
);
console.log('just after subscribe');
// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done
observeOn(asyncScheduler) 在新的 Observable 和最终的 Observer 之间引入了一个代理 Observer。
import { Observable, observeOn, asyncScheduler } from 'rxjs';
const observable = new Observable((proxyObserver) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);
const finalObserver = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
// just after subscription
// just before subscribe
// got value 1
// got value 2
// got value 3
// done
proxyObserver 是在 observeOn(asyncScheduler) 中创建的,它的 next(val) 函数可以理解成:
const proxyObserver = {
next (val) {
asyncScheduler.subscribe(
(x) => finalObserver.next(x),
0, // delay
val // value
);
}
};
async Scheduler 使用 setTimeout 或 setInterval 运行,及时给定的延迟为 0。在 JavaScript 中,setTimeout(fn, 0) 在下次事件循环迭代中最早运行函数 fn。
Scheduler 的 schedule() 方法接受一个延迟参数,它指的是相对于 Scheduler 自己的内部时钟的时间量。Scheduler 的时钟不需要与实际的时间有关,就像延迟操作的时间不是在实际时间上运行的,而是在 Scheduler 的时钟上运行的。这在测试中特别有用,其中可以使用虚拟时间 Scheduler 来伪造现实时间,而实际上是同步执行计划任务。
async Scheduler 是 RxJS 内置的 scheduler 之一。其他一些 scheduler 都可以通过使用 Scheduler 对象的静态属性来创建。
SCHEDULER | PURPOSE |
|---|---|
null | 不传入任何 scheduler 时,通知以同步和递归方式传递。这用于恒定时间操作或尾递归操作 |
queueScheduler | 在当前事件框架中的队列上调度,用于迭代操作 |
asapScheduler | 在微任务队列进行调度,就是 Promise 使用的队列。通常是当前工作结束后,下个工作开始前。用于异步转换 |
asyncScheduler | 使用 setInterval 完成调度,用于基于时间的操作 |
animationFrameScheduler | 调度将在下一次浏览器内容重绘之前发生的任务。可用于创建流畅的浏览器动画 |
你可能已经在 RxJS 代码中使用了调度器,而没有明确说明要使用的调度器的类型。这是因为所有处理并发的 Observable 操作符都有可选的调度器。如果你没有提供调度器,RxJS 会根据最小并发的原则选择一个默认的调度器。也就是说会选择引入满足 operator 需求的最少并发的调度器。
如,对于返回有限或少量信息 observable 的 operator , RxJS 不使用 Scheduler,即 null 或 undefined。对于返回可能大量或无限数量的消息的 operator ,RxJS 会使用 queueScheduler。对于使用计时器的 operator , RxJS 会使用 asyncScheduler。
因为 RxJS 使用最小并发量的 scheduler,所以如果出于性能目的引入并发,可以选择一个不同的 scheduler。要指定特定的 scheduler,可以使用采用 scheduler 的 operator 方法。如,from([10, 20, 30], asyncScheduler)。
静态创建操作符通常以 Scheduler 作为参数。 如,from(array, scheduler) 允许你指定在传递从数组转换的每个通知时要使用的调度程序。它通常是操作符的最后一个参数,下面静态创建操作符接受 Scheduler 的参数:
bindCallbackbindNodeCallbackconbineLatestconcatemptyfromfromPromiseintervalmergeofrangethrowtimer使用 subscribeOn 来调度 subscribe() 调用将来在什么上下文中发生。 默认,对 Observable 的 subscribe() 调用将同步并立即发生。不过,可以使用实例运算符 subscribeOn(scheduler) 延迟或安排在给定 Scheduler 上发生的实际订阅,其中 scheduler 是你提供的参数。
使用 observeOn 来处理在什么上下文中发送通知。 就像上面例子中的,实例操作符 observeOn(scheduler) 在源 Observable 和目标 Observer 之间引入了一个中介 Observer,其中中介使用给定的 scheduler 调度对目标 Observer 的调用。
实例操作符可以将 Scheduler 作为参数。
时间相关的操作符,如 bufferTime、debounceTime、delay、auditTime、sampleTime、throttleTime、timeInterval、timeout、timeoutWith、windowTime 都会接受一个 Scheduler 作为最后一个参数,否则默认在 asyncScheduler 上运行。
其他将 Scheduler 作为参数的实例操作符:cache、combineLatest、concat、expand、merge、publishReplay、startWith。