在RxJS v5中没有实现pausable
运算符的情况下,有没有更好的方法来创建可暂停的间隔?下面的代码是有效的,但它是通过跟踪上次发出的值作为偏移量来实现的。看起来应该有更好的方法。
const source = Rx.Observable.interval(100).share()
const offset = new Rx.BehaviorSubject(0)
let subscription;
let currentValue;
function start() {
subscription = source
.subscribe(i => {
currentValue = i + offset.value
})
}
function pause() {
source.take(1).subscribe(i => offset.next(i + offset.value))
subscription.unsubscribe()
}
发布于 2017-09-21 03:20:36
share()
运算符是an alias for .publish().refCount()
。refCount()
意味着当其他订阅者不存在时,可观察对象将自我清理。因为你正在取消对源代码的订阅,所以它会自动清理,然后在subscribe
d to时重新启动。请改用带有connect()
的publish()
。代码如下:
const source = Observable.interval(100).publish();
source.connect();
// Start with false, change to true after 200ms, then false again
// after another 200ms
const pauser = Observable.timer(200)
.mapTo(true)
.concat(Observable.timer(200).mapTo(false))
.startWith(false);
const pausable = pauser
.switchMap(paused => (paused ? Observable.never() : source))
.take(10);
pausable.subscribe(x => console.log(x));
有关运行示例,请参阅此jsbin:http://jsbin.com/jomusiy/3/edit?js,console。
发布于 2017-09-21 05:44:52
没有通用的方法可以做到这一点。这取决于你所说的暂停到底是什么意思,也取决于你暂停的是什么。(您是想停止发射然后重新开始,是缓冲和排出,还是需要有效地向上游添加延迟,但在其他情况下保留上游值的分布?)
当上游是一个特别的计时器时,我有一种有效的方式来做这件事,就像在你的例子中一样。这是我自己问题的答案。
RxJS (5.0rc4): Pause and resume an interval timer
这个方法有一个很大的优势,它可以及时保存源的值分布,但只会增加一个延迟。
对于更一般的情况:
冷观测的
never
和上游之间的switch
。暂停时,从上游取消订阅。skip
你已经看到的,然后switch
到skip
ed流。在未暂停时,您必须对已发出的值进行计数,以便下次有人取消暂停时可以skip
这么多值。你只需要记住你以前见过多少。然而,每次取消暂停都会导致冷可观察对象从头开始重放。这在一般情况下可能是非常低效的。代码应该是这样的。这里的pauser
是一个主题,您可以将其设置为true或false以暂停上游。函数pausableCold(暂停,上游){ var seen = 0;return pauser.switch(paused => { if (paused) { return Observable.never();} else { return upstream.skip(seen).do(() => seen++);} });}
buffer
while暂停,然后在未暂停时排出并合并到热上游。(这会保留所有值,但不会保留它们在时间上的分布。此外,如果天气可能很冷,您应该使用publish
进行上游热操作。)
https://stackoverflow.com/questions/46311703
复制相似问题