
Observable 是多个值的惰性 Push 集合。他填补了下表中的缺失点:
SINGLE | MULTIPLEXED | |
|---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
如,下面是一个 Observable,它在订阅时立即(同步)推送值 1、2、3,并且从 subscribe 调用开始后过 1 s 再推送值 4,然后结束。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
要调用 Observable 并查看这些值,我们需要订阅它:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('Before subscribe');
observable.subscribe({
next(x) { console.log('Next: ' + x); },
error(err) { console.error('Error: ' + err); },
complete() { console.log('Complete'); }
});
console.log('After subscribe');
// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete
Pull 和 Push 是两种不同的协议,描述了数据生产者和数据消费者如何进通信。
什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。
每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull 出单个返回值来使用它。
ES 2015 中介绍了生成器函数和迭代器 (opens new window)(function *),也属于 Pull 系统。调用 iterator.next() 的代码是消费者,从迭代器(生产者)中拉出多个值。
PRODUCER | CONSUMER | |
|---|---|---|
Pull | Passive:produces data when requested | Active:decides when data is requested |
Push | Active:produces data at its own pace | Passive:reacts to received data |
什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。
Promise 是目前 JavaScript 中最常见的 Push 系统类型。Promise (生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。
RxJS 引入了 Observable,一个新的 JavaScript Push 系统。Observable 是一个多值生产者,推送数据给 Observer(消费者)。
Observable 不像 EventEmitter 也不像 Promise 用于多个值。在一些情况下 Observable 会表现地像 EventEmitter,如当使用 RxJS 的 Subject 进行多播时,但通常它们的行为不像 EventEmitter。
Observable 类似于零参数的函数,但将它们泛化为允许多个值。
function foo () {
console.log('Hello');
return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42
const y = foo.call(); // same as foo()
console.log(y);
// Hello
// 42
使用 Observable 改写上面的代码:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe(x => {
console.log(x);
});
// Hello
// 42
foo.subscribe(y => {
console.log(y);
});
// Hello
// 42因为 函数 和 Observable 都是惰性计算。如果你不调用函数,console.log('Hello') 就不会被执行。同样对于 Observable,如果你不“调用”它(使用 subscribe), console.log('Hello') 也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。和 EventEmitter 共享副作用并且无论订阅者是否存在都立即触发相反,Observable 没有共享执行并且是惰性计算。
订阅一个 Observable 就是调用一个函数。
部分人觉得 Observable 是异步的,这并不是真的。
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after
使用 Observable 会观察到和函数一样的输出:
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after
这说明,对 foo 的订阅完全是同步的,就像一个函数一样。
Observable既能同步也可以异步地传递值。
那 Observable 和函数之间的区别是什么?Observable 可以随着时间推移“返回”多个值,这是函数无法做到的。
function foo () {
console.log('Hello');
return 42;
return 100; // dead code, will never happen
}
函数只能返回一个值,而 Observable 可以返回多个值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
也可以异步地返回值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300);
}, 1000);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
// 300
结论:
func.call() 表示同步地返回一个值observable.subscribe() 表示同步或异步地返回 0 或多个值Observable 使用 new Observable 或一个创建操作符来 created,会被 Observer subscribed,execute 来向 Observer 传递 next / error / complete 通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable 实例中,当其中一些与其他类型相关,如 Observer 和 Subscription。
Observable 核心关注点:
Observable 构造函数接受一个参数:subscribe 函数
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
Observable 可以使用
new Observable来创建。通常,Observable使用创建函数如of、from、interval等来创建。
observable.subscribe(x => {
console.log(x);
});
这不是个巧合,observable.subscribe 和 new Observable(function subscribe(subscriber) {}) 的 subscribe 有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。
这表示订阅调用不会在同一个 Observable 的多个 Observer 之间共享。当使用 Observer 调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {}) 中的 subscribe 函数为给定的 subscriber 运行。对 observable.subscribe 的每次调用都会为给定的 subscriber 触发其对应的设置。
对于
Observable的订阅就像调用一个函数,提供了可以传递数据的回调。
这和 addEventListener / removeEventListener 等事件处理程序 API 完全不同。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护一个 Observer 列表。
订阅调用只是一种启动 Observable 执行并将值或时间传递给该执行的 Observer 的方法。
new Observable(function subscribe(subscriber) {}) 里面的代码表示 Observable 的执行,只发生在每个订阅的 Observer 上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。
Observable 执行可以传递的值类型:
Next 通知:发送一个值,如 Number、String、Object 等Error 通知:发送一个错误,如 ErrorComplete 通知:不发送值Next 通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。Error 和 Complete 通知在 Observable 执行过程中只可能执行一次,并且只能有一个发生。
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable 严格遵守协议,在 Complete 通知之后的 Next 通知将不会被发送:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered to subscribers
});
可以在 subscribe 代码外包一层 try/catch 块,以捕获错误:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err);
}
});
因为 Observable 执行可能是无限的,但是对于 Observer 来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer,一旦 Observer 接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。
当 observable.subscribe 被调用时,Observer 被附加到新创建的 Observable 执行中,该调用还会返回 Subscription 对象。
const subscription = observable.subscribe(x => console.log(x));
Subscription (opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。
import { from } from 'rxjs';
const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));
// Later
subscription.unsubscribe();当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源,如可以在函数 subscribe() 中返回自定义取消订阅函数来实现。
const observable = new Observable(function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
});就像 observable.subscribe 类似于 new Observable(function subscribe (subscriber) {}), 我们从 subscribe 返回的 unsubscribe 在概念上等同于 subscription.unsubscribe。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。
function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('completed')
});
// Later
unsubscribe();之所以使用像 Observable、Observer 和 Subscription 的 Rx 类型,是为了安全考虑和 Operator 的可组合性。
什么是 Observer? Observer 作为消费者消费 Observable 派发的值。Observer 只是一组回调,用于 Observable 派发的每种类型的通知:next, error 和 complete。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`),
complete: () => console.log('Observer got a complete notification')
};
// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);
Observer只是有三个回调的对象,用于Observable可能派发每种类型的通知。
RxJS 中的 Observer 也可能是部分的。如果没有提供某种回调,Observable 也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer 中找不到对应的回调。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`)
};
在订阅 Observable 时,也可以不用将回调放在一个 Observer 对象中,只传一个 next 回调函数作为参数就可以。
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));
在 observable.subscribe 内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer 对象。