首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RxJS重试行为不像预期的那样?

RxJS重试行为不像预期的那样?
EN

Stack Overflow用户
提问于 2017-10-15 23:10:04
回答 4查看 2.7K关注 0票数 0

我很难让RxJS的重试操作符和映射操作符一起工作。

本质上,我要做的是获取一个值流(在本例中,是用于测试的GitHub用户),并对每个值对该值执行一些操作。

但是,我希望能够以允许我为该操作设置有限重试的方式来处理错误。我还假设RxJS异步处理这样的事情。

代码语言:javascript
运行
复制
Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll(x => Rx.Observable.of(x))
.map(x => {
    if(x.id % 5 == 0) {
        console.log("error");
        return Rx.Observable.throw("error");
    }
    return x;
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));

到目前为止,这就是我所得到的,但是它应该重试相同的值3次--它只打印一次“错误”--这不是我要寻找的行为。

我是不是遗漏了一些琐碎的东西,或者这类事情在RxJS中根本不可能实现?总之,我想异步执行对值的操作,重试所述操作有限次,但允许其他有效操作并行继续。我只使用承诺就实现了类似的目标,但是代码相当混乱,我希望RxJS能让我读起来更好一些。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-10-16 05:37:12

编辑-删除早期失败的解决方案(某些注释可能不再相关),

要重试单个失败,您需要中断单个请求并重试这些请求。

为了允许正确地处理错误,对每个预期结果将流划分为子流.

我还添加了一些代码,允许其中一次重试成功。

代码语言:javascript
运行
复制
const Observable = Rx.Observable

// Define tests here, allow requestAgain() to pass one
const testUser = (user) => user.id % 5 !== 0
const testUserAllowOneSuccess = (user) => user.id === 5 || user.id % 5 !== 0

const requestAgain = (login) => Observable.of(login)
  .switchMap(login => jQuery.getJSON("https://api.github.com/users/" + login ))
  .map(x => {
    if(!testUserAllowOneSuccess(x)) {
      console.log("logging error retry attempt", x.id);
      throw new Error('Invalid user: ' + x.id)
    }
    return x;
  })
  .retry(3)
  .catch(error => Observable.of(error))

const userStream = Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
  .concatAll()

const passed = userStream.filter(x => testUser(x))
const failed = userStream.filter(x => !testUser(x))
  .flatMap(x => requestAgain(x.login))

const retryPassed = failed.filter(x => !(x instanceof Error))
const retryFailed = failed.filter(x => (x instanceof Error))
  .toArray()
  .map(errors => { throw errors })

const output = passed.concat(retryPassed, retryFailed)

output.subscribe(
  x=> console.log('subscribe next', x.id ? x.id : x), 
  e => console.log('subscribe error', e)
);

工作实例CodePen

票数 2
EN

Stack Overflow用户

发布于 2017-10-15 23:31:03

我看到有几个问题可能会给你带来麻烦:

1)在您的map语句中,您通过返回一个可观察到的值来执行抛出操作。这将是适当的,如果成功案例也是返回一个可观察的,然后你是扁平的(通过连接,开关,或合并),但是你的成功案例只是发出一个值,永远不会变平。因此,当返回可观察的抛出时,它永远不会被夷为平地,因此只会传递并记录可观察的对象。

相反,您只需执行一个普通的javascript throw 'error' (尽管考虑抛出一个错误对象而不是一个字符串)。

2)当retry收到错误时,它将重新订阅。如果您使用可观测值进行抓取,这将意味着它将再次进行提取。然而,您用一个承诺进行了获取,然后只从该承诺中创建了一个可观察到的。所以当你重新订阅可观测对象时,它最终会使用它第一次做的相同的承诺对象,并且这个承诺已经处于一个解决的状态。不会进行任何额外的抓取。

要解决这个问题,您需要有一个可观察的版本来执行提取。如果您需要从头开始制作它,它可能如下所示:

代码语言:javascript
运行
复制
Rx.Observable.create(observer => {
  jQuery.getJSON("https://api.github.com/users")
    .then(result => {
      observer.next(result);
      observer.complete();
    }, err => {
      observer.error(err);
      observer.complete();
    });
})

虽然如果您打算经常这样做,我建议您创建一个为您执行包装的函数。

编辑:

我的最终目标是基本上能够获取一个api URL数组,并使用每个URL启动get请求。如果get请求因任何原因而失败,则应该尝试总共3次才能再次到达该端点。然而,这种get请求的失败不应该阻止其他请求的发生,它们应该是并行发生的。此外,如果一个get请求失败了3次,它应该被标记为失败,而不应该作为一个整体停止这个过程。到目前为止,我还没有找到使用RxJS的方法。

然后我会做这样的事:

代码语言:javascript
运行
复制
function getJson (url) {
  return Rx.Observable.create(observer) => {
    jQuery.getJSON(url)
      .then(result => {
        observer.next(result);
        observer.complete();
      }, err => {
        observer.error(err);
        observer.complete();
      });
  }
}

const endpoints = ['someUrl', 'someOtherUrl'];
const observables = endpoints.map(endpoint => 
  return getJson(url)
    .retry(3)
    .catch(err => Rx.Observable.of('error'));
});

Rx.Observable.forkJoin(...observables)
  .subscribe(resultArray => {
    // do whatever you need to with the results. If any of them
    //   errored, they will be represented by just the string 'error'
  });
票数 2
EN

Stack Overflow用户

发布于 2017-10-15 23:37:26

映射函数返回Rx.Observable.throw("error");。这是错误的,因为下面的retry()将接收包装(即Observable<Observable<T>>而不是Observable<T> )。您可以使用flatMap来纠正这一点,但也需要通过Rx.Observable.of(x)包装该值。

代码语言:javascript
运行
复制
.flatMap(x => {
    if(x % 5 == 0) {
        console.log("error");
        return Rx.Observable.throw("error");
    }
    return Rx.Observable.of(x);
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));

另一种选择是throw new Error()而不是尝试return Rx.Observable.throw("error");

确保你重新尝试正确的事情

代码语言:javascript
运行
复制
Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
  .retry(3)
  .concatAll(x => Rx.Observable.of(x))
  ...
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46760917

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档