
什么是 Subscription? Subscription 是一个表示一次性资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法 unsubscribe,不接受任何参数,只是释放 Subcription 持有的资源。在之前的 RxJS 中,Subscription 被称为 Disposable。
import { interval } from 'rxjs';
const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Subscription本质上只有一个unsubscribe()函数来释放资源或取消Observable执行。
Subscription 也可以放在一起,这样调用一个 Subscription 的 unsubscribe() 能取消多个 Subscription。
import { interval } from 'rxjs';
const observable1 = interval(400);
const observable2 = interval(300);
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubsciption = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubsciption);
setTimeout(() => {
subscription.unsubscribe();
}, 1000);
// second: 0
// first: 0
// second: 1
// first: 1
// second: 2
Subscription 还有个 remove(otherSubscription) 方法,用于撤销添加到 Subscription 的子 Subscription。
什么是 Subject ? RxJS 中的 Subject 是一种特殊类型的 Observable,它允许将值多播到多个 Observer。虽然普通的 Observable 是单播的(每个订阅的 Observer 都拥有 Observable 的独立执行),但 Subject 可以多播。
Subject类似Observable,但是它可以多播给多个Observer。Subject有点像EventEmitter:他们都维护多个监听这的注册。
每个 Subject 都是一个 Observable。 给定一个 Subject,可以订阅它,使用 Observer 开始正常接收值。从 Observer 角度来看,它无法判断 Observable 的执行时来自普通的单播 Observable 还是 Subject。
在 Subject 内部,订阅不会调用传递至的新执行。它只是在一个 Observer 列表中注册给定的 Observer,类似于其他库或语言中 addListener 的工作方式。
每个 Subject 都是一个 Observer。 它是一个对象,有 next(v),error(e) 和 complete() 方法。要为 Subject 提供一个新值,只需调用 next(v),它将被多播到注册监听 Subject 的 Observer。
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2因为 Subject 是一个 Observer ,也就是说可以使用 Subject 作为参数来订阅任何 Observable。
import { Subject, from } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject);
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
通过上面的方法,我们基本上只是通过 Subject 将单播 Observable 执行转换为多播。这是 Subject 如何使任何 Observable 执行共享给多个 Observer 的唯一方法。
也有一些特殊的 Subject:BehaviorSubject,ReplaySubject 和 AsyncSubject。
“多播 Observable” 通过可能有许多订阅者的 Subject 传递通知,而普通的 “单播 Observable” 仅向单个 Observer 发送通知。
多播的
Observable在底层使用Subject来让多个Observer看到相同的Observable执行。
多播操作符底层工作原理:Observer 订阅底层 Subject,Subject 订阅源 Observable。
import { from, Subject, multicast } from 'rxjs';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
multicasted.connect();
multicast 返回一个看起来像正常 Observable 的 Observable,但是它在订阅时像 Subject 一样。multicast 返回一个 ConnectableObservable,它是个有 connect() 方法的 Observable。
connect() 方法决定共享的 Observable 具体什么时候开始执行。connect() 本质上是执行 source.subscribe(subject),coonect() 返回一个 Subscription,它可以用来取消订阅。
BehaviorSubject 是 Subject 的变体之一,具有“当前值”的概念。它存储发送给其消费者最新的值,并且每当有新的 Observer 订阅时,它将立即接收来自 BehaviorSubject 的 “当前值”。
BehaviorSubject对于表示 “随时间变化的值” 很有用。如,生日的事件流是一个Subject,但一个人的年龄是BehaviorSubject。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 初始值为 0
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject 和 BehaviorSubject 类似,但它可以给新的订阅者发送旧的值,可以记录 Observable 执行。
ReplaySubject记录Observable执行的一些值,并对新的订阅者进行重放。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 缓存 3 个值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
除了缓冲个数外,还可以定义毫秒级的窗口时间,来决定缓存记录可以保留多久。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500); // 缓存 100 个值,每 500 毫秒清除一次
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
const interval = setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject 也是一种变体,它只将 Observable 执行的最后一个值发送给它的观察者,并且仅在执行完成时发送。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
// observerA: 5
// observerB: 5
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: () => console.log('One second has passed')
});
setTimeout(() => {
subject.next();
}, 1000);
// One second has passed