
功能需求 | 适用的操作符 |
|---|---|
过滤掉不满足判定条件的数据 | filter |
获得满足判定条件的第一个数据 | first |
获得满足判定条件的最后一个数据 | last |
从数据流中选取最先出现的若干个数据 | take |
从数据流中选取最后出现的若干个数据 | takeLast |
从数据流中选取数据直到某种情况发生 | takeWhile 和 takeUntil |
从数据流中中忽略最先出现的若干数据 | skip |
基于时间的数据流量筛选 | throttleTime 、debounceTime 和 auditTime |
基于数据内容的数据流量筛选 | throttle 、debounce 和 audit |
基于采样方式的数据流量筛选 | sample 和 sampleTime |
删除重复的数据 | distinct |
删除重复的连续数据 | distinctUntilChanged 和 distinctUntilKeyChanged |
忽略数据流中的所有数据 | ignoreElements |
只选取指定出现位置的数据 | elementAt |
判断是否只有一个数据满足判定条件 | single |
过滤类操作符最基本的功能就是对一个给定的数据流中每个数据判断是否满足某个条件,如果满足条件就可以传递给下游,否则就抛弃掉。
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/filter';
const source$ = Observable.range(1, 5);
const even$ = source$.filter(x => x % 2 === 0);
even$.subscribe(x => console.log(x));
// 2
// 4
使用 filter 产生的 Observable 对象,产生数据的时机和上游是一致的,当上游产生数据的时候,只要这个数据满足判定条件,就会立刻被同步传给下游。
const source$ = Observable.interval(1000);
const even$ = source$.filter(x => x % 2 === 0);

当使用 first 不给任何判定函数时,就相当于找上游 Observable 吐出的第一个数据:
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/first';
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const first$ = source$.first();
// 3
如果修改上面的代码,给 first 一个判定函数参数,得到的结果就会不一样:
const first$ = source$.first(x => x % 2 == 0);
// 4
first 的第二个参数是可选参数,如果使用,发挥的就是“结果选择器”的作用。
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const first$ = source$.first(
x => x % 2 == 0,
(value, index) => [value, index]
);
// [4, 2]如果 first 的上游 Observable 到完结时依然没有满足判定条件的数据,那么 first 会向下游抛出一个 error ;而 find 与 findIndex 没有匹配的数据就会吐出一个 undefined 。
不过,也可以设置默认值:
const first$ = source$.first(
x => x < 0,
f => f,
-1
);last 这个操作符做的事情和 first 正相反,找的是一个 Observable 中最后一个判定条件的数据。
const source$ = Observable.of(3, 1, 4, 1, 5, 9, 2, 6);
const last$ = source$.last();
//和 first 不同的是, last 无论如何都要等到上游 Observable 完结的时候才吐出数据,因为上游 Observable 完结之前, last 也无从知道是不是拿到了“最后一个”数据。
take 只支持一个参数 count ,也就是限定拿上游 Observable 的数据数量。
const source$ = Observable.interval(1000);
const last$ = source$.take(3);
takeLasttake 相当于一个可以获取多个数据的 first ,那么 takeLast 相当于一个可以获取多个数据的 last 。
const source$ = Observable.of(3, 1, 4, 1, 5, 9);
const last3$ = source$.takeLast(3);
// 1
// 5
// 9如果上游在一段时间范围内产生的数据,那么就必须要等到上游完结 takeLast 产生的 Observable 对象才产生数据:
const source$ = Observable.interval(1000);
const take$ = source$.take(5);
const last3$ = take$.takeLast(3);
take 的作用是获取上游的数据,只要没有超过给定的数量限制,上游产生一个数据,take 都会立刻转手给下游。所以,弹珠图上 take 产生的 Observable 对象数据产生时刻和 source$ 是一致的; takeLast 只有确定上游数据完结的时候才能产生数据,而且是一次性产生所有数据,即 takeLast 在 take 产生的 Observable 对象完结时把 2、3、4 数据一次性传给下游。
takeWhiletakeWhile 接受一个判定函数作为参数,这个判定函数有两个参数,分别代表上游的数据和对应的序号, takeWhile 会吐出上游数据,直到判定函数返回 false ,只要遇到第一个判定函数返回 false 的情况, takeWhile 产生的 Observable 就完结。
const source$ = Observable.range(1, 100);
const takeWhile$ = source$.takeWhile(x => x % 2 === 0);在上面的例子中,takeWhile$ 一个数据都不吐出就完结,因为上游 source$ 吐出的第一个数据是1,不满足判定条件。
因为 takeWhile 的判定函数支持第二个序号参数,所以实际上可以利用 takeWhile 来实现 take:
Observable.prototype.take = function (count) {
return this.takeWhile((x, i) => i < count);
};take 和 filter 的组合
如果想要获得上游 Observable 满足条件的前 N 个数据,怎么办呢?
Observable.prototype.takeCountWhile = function (count, predicate) {
return this.filter(predicate).take(count);
};
注意,filter 并不是抽干上游 Observable 才传递数据给 take ,而是对上游每个数据都在用 predicate 判定通过之后,立刻传递给 take 。
const source$ = Observable.interval(1000);
const even$ = source$.takeCountWhile(2, x => x % 2 === 0);
takeUntiltakeUntil 是一个里程碑式的过滤类操作符,因为 takeUntil 让我们可以用 Observable 对象来控制另一个 Observable 对象的数据产生。
takeUntil 的神奇特点就是其参数是另一个 Observable 对象 notifier,由这个 notifier 来控制什么时候结束从上游 Observable 拿数据,因为 notifier 本身又是一个 Observable ,吐出数据可以非常灵活,这就意味着可以利用非常灵活的规则用 takeUntil 产生下游 Observable 。
使用 takeUntil ,上游的数据直接转手给下游,直到(Until)参数 notifier 吐出一个数据或者完结,这个 notifier 就像一个水龙头开关,控制着 takeUntil 产生的 Observable 对象,一开始这个水龙头开关是打开状态,上游的数据像水一样直接流到下游,但是 notifier 只要一有动静,水龙头开关立刻关闭,上游通往下游的通道也就关闭了。
const source$ = Observable.interval(1000);
const notifier$ = Observable.timer(2500);
const takeUntil$ = source$.takeUntil(notifier$);
skipskip 接受一个 count 参数,会默默忽略上游 Observable 吐出的前 count 个数据,然后,从第 count+1 个数据开始,就和上游 Observable 保持一致了,上游 Observable 吐出什么数据, skip 产生的 Observable 就吐出什么数据,上游 Observable 完结, skip 产生的 Observable 跟着完结。当然,如果上游吐出的数据不够 count 个,那 skip 产生的 Observable 就会在上游 Observable 完结的时候立刻完结。
const source$ = Observable.interval(1000);
const skip$ = source$.skip(3);
skipWhile 和 skipUntil“回压”(Back Pressure)也称为“背压”,是一个源自于传统工程中的概念,在一个传输管道中,液体或者气体应该朝某一个方向流动,但是前方管道口径变小,这时候液体或者气体就会在管道中淤积,产生一个和流动方向相反的压力,因为这个压力的方向是往回走的,所以称为回压。
在 RxJS 的世界中,数据管道就像是现实世界中的管道,数据就像是现实中的液体或者气体,如果数据管道中某一个环节处理数据的速度跟不上数据涌入的速度,上游无法把数据推送给下游,就会在缓冲区中积压数据,这就相当于对上游施加了压力,这就是 RxJS 世界中的“回压”。
回压这种现象的根源是数据管道中某个环节数据涌入的速度超过了处理速度,那么,既然处理不过来,干脆就舍弃掉一些涌入的数据,这种方式称为“有损回压控制”(Lossy Backpressure Control),通过损失掉一些数据让流入和处理的速度平衡。
throttleTime 的作用是限制在 duration 时间范围内,从上游传递给下游数据的个数; debounceTime 的作用是让传递给下游的数据间隔不能小于给定的时间 dueTime 。
const source$ = Observable.interval(1000);
const result$ = source$.throttleTime(2000);
result$.subscribe(console.log);
// 0
// 2
// 4
const source$ = Observable.interval(1000);
const result$ = source$.debounceTime(2000);
因为 debounceTime 要等上游在 dueTime 毫秒范围内不产生任何其他数据时才把这个数据传递给下游,如果在 dueTime 范围内上游产生了新的数据,那么 debounceTime 就又要重新开始计时。
const source$ = Observable.interval(1000);
const filter$ = source$.filter(x => x % 3 === 0);
const result$ = filter$.debounceTime(2000);
使用 throttleTime 和 debounceTime 的一个常见场景就是用来减少不必要的 DOM 事件处理。
当数据流中可能有大量数据产生,希望一段时间内爆发的数据只有一个能够被处理到,这时候就应该使用 throttleTime 。
对于 debounceTime ,适用情况是,只要数据在以很快的速度持续产生时,那就不去处理它们,直到产生数据的速度降下来。
throttle 和 debounce 和不带 Time 后缀的兄弟操作符的区别是,这两个操作符不是用时间来控制流量,而是用 Observable 中的数据来控制流量。
throttle 的参数是一个函数,这个函数应该返回一个 Observable 对象,这个 Observable 对象可以决定 throttle 如何控制上游和下游之间的流量。
const source$ = Observable.interval(1000);
const durationSelector = (value) => {
console.log(`call durationSelector with ${value}`);
return Observable.timer(2000);
};
const result$ = source$.throttle(durationSelector);
result$.subscribe(console.log);
// call durationSelector with 0
// 0
// call durationSelector with 2
// 2
// call durationSelector with 4
// 4
当 source$ 产生第一个数据 0 的时候, throttle 就和 throttleTime 一样,毫不犹豫地把这个数据 0 传给了下游,在此之前会用这个数据 0 作为参数调用 durationSelector ,然后订阅 durationSelector 返回的 Observable 对象,在这个 Observable 对象产生第一个对象之前,所有上游传过来的数据都会被丢弃,于是, source$ 产生的数据 1 就被丢弃了,因为 durationSelector 返回的 Observable 对象被订阅之后 2000 毫秒才会产生数据。
durationSelector 产生 Observable 对象只有第一个产生的数据会有作用,而且这个数据的产生时机是关键,至于这个数据是个什么值反而不重要,在上面的例子中,使用 timer 来产生只有一个数据的 Observable 对象,当然也可以使用 interval 来产生多个数据的 Observable 对象,但是依然只有第一个数据起到作用。
如果 durationSelector 只是返回固定延时产生数据的 Observable 对象,那么 throttle 的功能就和 throttleTime 没有两样,不过, durationSelector 有参数,就是当前传给下游的数据,所以完全可以根据这个参数来产生更灵活的操作。
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
对于 debounce ,和 debounceTime 相比一样是用一个函数参数代替了数值参数,这样就可以产生更灵活的时间控制。
const source$ = Observable.interval(1000);
const durationSelector = value => {
return Observable.timer(value % 3 === 0 ? 2000 : 1000);
};
const result$ = source$.debounce(durationSelector);
durationSelector 函数返回的 Observable 第一个数据产生时间延迟取代了 debounceTime 的 dueTime 参数,决定了上游一个数据会被延迟多久传给下游,因为 3 的倍数延时 2000 毫秒,总是会被下一个数据“打断”,所以3的倍数总是进入不了下游。
可以认为 audit 是做 throttle 类似的工作,不同的是在“节流时间”范围内, throttle 把第一个数据传给下游, audit 是把最后一个数据传给下游。
const source$ = Observable.interval(1000);
const result$ = source$.auditTime(2000);
result$.subscribe(console.log);
// 1
// 3
// 5
audit 也接受 durationSelector 这样的函数参数:
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(
Observable.interval(1000).take(3).mapTo('B')
)
.concat(
Observable.interval(500).take(3).mapTo('C')
);
const durationSelector = value => {
return Observable.timer(800);
};
const result$ = source$.audit(durationSelector);

sample 是要根据规则在一个范围内取一个数据,抛弃其他数据。
const source$ = Observable.interval(500).take(2).mapTo('A')
.concat(
Observable.interval(1000).take(3).mapTo('B')
)
.concat(
Observable.interval(500).take(3).mapTo('C')
);
const result$ = source$.sampleTime(800);
表面上看 sampleTime 和 auditTime 非常像, auditTime 也会把时间块中最后一个数据推给下游,但是对于 auditTime 时间块的开始是由上游产生数据触发的,而 sampleTime 的时间块开始则和上游数据完全无关,所以,可以看到 sampleTime 产生的数据序列分布十分均匀。
注意,如果 sampleTime 发现一个时间块内上游没有产生数据,那在时间块结尾也不会给下游传递数据。
sample 的参数并不是一个返回 Observable 对象的函数,而就是一个简单的 Observable 对象。 sample 之所以这样设计,是因为对于“采样”这个动作,逻辑上可以认为和上游产生什么数据没有任何关系,所以不需要一个函数来根据数据产生 Observable 对象控制节奏,直接提供一个 Observable 对象就足够了。
通常 sample 的参数被称为 notifier ,当 notifier 产生一个数据的时候, sample 就从上游拿最后一个产生的数据传给下游。
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinct();
distinct$.subscribe(console.log);
// 0
// 1
// 2
// 3distinct 提供了一个函数参数 keySelector ,用于定制 distinct 应该比对什么样的属性。
distinct 还有一个潜在的问题需要注意,如果上游产生的不同数据很多,那么可能会造成内存泄露。为了克服这个缺点,distinct 还提供第二个可选的参数 flush ,第二个参数可以是一个 Observable 对象,每当这个 Observable 对象产生数据时,distinct 就清空“唯一数据集合”,一切重来,这样就避免了内存泄露。
distinctUntilChanged 拿到一个数据不是和一个“唯一数据集合”比较,而是直接和上一个数据比较,也就是说,这个操作符要保存上游产生的上一个数据就足够,当然,也就没有了 distinct 潜在的内存泄露问题。
const source$ = Observable.of(0, 1, 1, 2, 0, 0, 1, 3, 3);
const distinct$ = source$.distinctUntilChanged();
distinct$.subscribe(console.log);
// 0
// 1
// 2
// 0
// 1
// 3ignoreElments 就是要忽略所有的元素,这里的元素是指上游产生的数据,忽略所有上游数据,只关心 complete 和 error 事件。
const source$ = Observable.interval(1000).take(5);
const result$ = source$.ignoreElements();elementAt 把上游数据当数组,只获取指定下标的那一个数据,就这个简单功能,使用 first 配合函数参数也一样能够实现。不过 elementAt 还有一个附加功能体现了自己的存在价值,它的第二个参数可以指定没有对应下标数据时的默认值。
single 这个操作符用来检查上游是否只有一个满足对应条件的数据,如果答案为“是”,就向下游传递这个数据;如果答案为“否”,就向下游传递一个异常。