首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >不浪费工作的开关

不浪费工作的开关
EN

Stack Overflow用户
提问于 2017-05-12 16:40:06
回答 1查看 122关注 0票数 1

我有一个可观测的可观测值,每个内部可观测值都产生一个昂贵的计算值。我想要像Switch这样的行为,但是没有浪费工作的地方(SwitchFrugal?):

  • 如果有订阅的当前内部可观测到的值,我希望一直从它接收值,直到它完成。
    • 它不应该从Switch中取消订阅

  • 一旦当前内部可观测性完成,应该订阅最新的内部可观测性(如果在当前之后有)。

我一直在努力实现这种行为。使用现有的操作人员是否可行?还是需要通过Observable.Create“从头开始”完成?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-05-13 08:20:12

我会这样做:

代码语言:javascript
运行
复制
const subject = new Subject();
const start = Scheduler.async.now();

// Simulated source Observable
const source = Observable
    .timer(0, 900)
    .map(i => Observable.defer(() => {
        console.log('Subscribe inner Observable:', i);
        return Observable
            .timer(0, 400)
            .take(6)
            .map(j => i.toString() + ':' + j.toString());
    }))
    .share();

source
    .merge(subject
        .withLatestFrom(source)
        .map(([v, observable]) => {
            return observable;
        })
    )
    .exhaustMap(obs => obs.finally(() => subject.next()))
    .subscribe(val => console.log(Scheduler.async.now() - start, val));

我在模拟一个可以观测到的源,发射出可观测的。外部观测物的释放速度要快于内部观测数据的完成,所以这应该能模拟你的情况。

然后,我将source合并到链中,但只有当subject发出时才会这样做。然后在finally操作符中触发主题。

产出如下:

代码语言:javascript
运行
复制
Subscribe inner Observable: 0
45 '0:0'
452 '0:1'
856 '0:2'
1260 '0:3'
1665 '0:4'
2065 '0:5'
Subscribe inner Observable: 2
2069 '2:0'
2472 '2:1'
2876 '2:2'
3280 '2:3'
3683 '2:4'
4084 '2:5'
Subscribe inner Observable: 4
4086 '4:0'
4487 '4:1'

请注意,当第一个内部可观测性完成时,来自源的最新发射是可以用i === 2观测到的。如果您运行这段代码,您将看到这三种排放之间没有时间间隔(jsbin现在被打破了,所以我无法登录并做一个演示):

代码语言:javascript
运行
复制
2065 '0:5'
Subscribe inner Observable: 2
2069 '2:0'

如果将这与不使用merge()的默认行为进行比较,您将看到exhaustMap需要等待,直到有另一个源发出:

代码语言:javascript
运行
复制
source
    .exhaustMap(obs => obs.finally(() => subject.next()))
    .subscribe(console.log);

这个打印了下面的内容。注意时间间隔,它订阅了内部可观测到的i === 3而不是2

代码语言:javascript
运行
复制
Subscribe inner Observable: 0
45 '0:0'
449 '0:1'
853 '0:2'
1257 '0:3'
1659 '0:4'
2064 '0:5'
Subscribe inner Observable: 3
2748 '3:0'
3151 '3:1'
3553 '3:2'
3953 '3:3'
4355 '3:4'
4759 '3:5'
Subscribe inner Observable: 6
5458 '6:0'
5863 '6:1'

编辑:

为了避免订阅相同的内部可观测两次(假设这些是冷可观测的),我可以跟踪我已经订阅的可观测指数以及接下来需要使用的索引:

我将使源以随机间隔和较少的值发出:

代码语言:javascript
运行
复制
const source = Observable.range(0, 100, Scheduler.async)
    .concatMap(i => Observable.of(i).delay(Math.random() * 3000))
    .map(i => Observable.defer(() => {
        console.log('Subscribe inner Observable:', i);
        return Observable
            .timer(0, 400)
            .take(4)
            .map(j => i.toString() + ':' + j.toString());
    }))
    .map((observable, index) => [observable, index])
    .share();

然后发送我们使用subject.next()处理的索引,然后简单地忽略我们不想要的可观测值:

代码语言:javascript
运行
复制
source
    .merge(subject
        .withLatestFrom(source)
        .map(([processedIndex, observableAndIndex]) => {
            let observableIndex = observableAndIndex[1];
            if (processedIndex < observableIndex) {
                return observableAndIndex;
            }
            return false;
        })
        .filter(Boolean)
    )
    .exhaustMap(([observable, index]) => observable.finally(() => subject.next(index)))
    .subscribe(val => console.log(Scheduler.async.now() - start, val));

输出非常相似,但即使之前的可观测结果很快完成,我们也不会再次订阅它(例如,可观测的12之间的时间差):

代码语言:javascript
运行
复制
Subscribe inner Observable: 0
2803 '0:0'
3208 '0:1'
3615 '0:2'
4016 '0:3'
Subscribe inner Observable: 1
4853 '1:0'
5254 '1:1'
5658 '1:2'
6061 '1:3'
Subscribe inner Observable: 2
7814 '2:0'
8218 '2:1'
8622 '2:2'
9026 '2:3'
Subscribe inner Observable: 3
9180 '3:0'
9583 '3:1'
9987 '3:2'
10391 '3:3'
Subscribe inner Observable: 5
10393 '5:0'
10796 '5:1'
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43942658

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档