我很难让RxJS的重试操作符和映射操作符一起工作。
本质上,我要做的是获取一个值流(在本例中,是用于测试的GitHub用户),并对每个值对该值执行一些操作。
但是,我希望能够以允许我为该操作设置有限重试的方式来处理错误。我还假设RxJS异步处理这样的事情。
Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll(x => Rx.Observable.of(x))
.map(x => {
if(x.id % 5 == 0) {
console.log("error");
return Rx.Observable.throw("error");
}
return x;
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));
到目前为止,这就是我所得到的,但是它应该重试相同的值3次--它只打印一次“错误”--这不是我要寻找的行为。
我是不是遗漏了一些琐碎的东西,或者这类事情在RxJS中根本不可能实现?总之,我想异步执行对值的操作,重试所述操作有限次,但允许其他有效操作并行继续。我只使用承诺就实现了类似的目标,但是代码相当混乱,我希望RxJS能让我读起来更好一些。
发布于 2017-10-16 05:37:12
编辑-删除早期失败的解决方案(某些注释可能不再相关),
要重试单个失败,您需要中断单个请求并重试这些请求。
为了允许正确地处理错误,对每个预期结果将流划分为子流.
我还添加了一些代码,允许其中一次重试成功。
const Observable = Rx.Observable
// Define tests here, allow requestAgain() to pass one
const testUser = (user) => user.id % 5 !== 0
const testUserAllowOneSuccess = (user) => user.id === 5 || user.id % 5 !== 0
const requestAgain = (login) => Observable.of(login)
.switchMap(login => jQuery.getJSON("https://api.github.com/users/" + login ))
.map(x => {
if(!testUserAllowOneSuccess(x)) {
console.log("logging error retry attempt", x.id);
throw new Error('Invalid user: ' + x.id)
}
return x;
})
.retry(3)
.catch(error => Observable.of(error))
const userStream = Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.concatAll()
const passed = userStream.filter(x => testUser(x))
const failed = userStream.filter(x => !testUser(x))
.flatMap(x => requestAgain(x.login))
const retryPassed = failed.filter(x => !(x instanceof Error))
const retryFailed = failed.filter(x => (x instanceof Error))
.toArray()
.map(errors => { throw errors })
const output = passed.concat(retryPassed, retryFailed)
output.subscribe(
x=> console.log('subscribe next', x.id ? x.id : x),
e => console.log('subscribe error', e)
);
工作实例CodePen
发布于 2017-10-15 23:31:03
我看到有几个问题可能会给你带来麻烦:
1)在您的map语句中,您通过返回一个可观察到的值来执行抛出操作。这将是适当的,如果成功案例也是返回一个可观察的,然后你是扁平的(通过连接,开关,或合并),但是你的成功案例只是发出一个值,永远不会变平。因此,当返回可观察的抛出时,它永远不会被夷为平地,因此只会传递并记录可观察的对象。
相反,您只需执行一个普通的javascript throw 'error'
(尽管考虑抛出一个错误对象而不是一个字符串)。
2)当retry收到错误时,它将重新订阅。如果您使用可观测值进行抓取,这将意味着它将再次进行提取。然而,您用一个承诺进行了获取,然后只从该承诺中创建了一个可观察到的。所以当你重新订阅可观测对象时,它最终会使用它第一次做的相同的承诺对象,并且这个承诺已经处于一个解决的状态。不会进行任何额外的抓取。
要解决这个问题,您需要有一个可观察的版本来执行提取。如果您需要从头开始制作它,它可能如下所示:
Rx.Observable.create(observer => {
jQuery.getJSON("https://api.github.com/users")
.then(result => {
observer.next(result);
observer.complete();
}, err => {
observer.error(err);
observer.complete();
});
})
虽然如果您打算经常这样做,我建议您创建一个为您执行包装的函数。
编辑:
我的最终目标是基本上能够获取一个api URL数组,并使用每个URL启动get请求。如果get请求因任何原因而失败,则应该尝试总共3次才能再次到达该端点。然而,这种get请求的失败不应该阻止其他请求的发生,它们应该是并行发生的。此外,如果一个get请求失败了3次,它应该被标记为失败,而不应该作为一个整体停止这个过程。到目前为止,我还没有找到使用RxJS的方法。
然后我会做这样的事:
function getJson (url) {
return Rx.Observable.create(observer) => {
jQuery.getJSON(url)
.then(result => {
observer.next(result);
observer.complete();
}, err => {
observer.error(err);
observer.complete();
});
}
}
const endpoints = ['someUrl', 'someOtherUrl'];
const observables = endpoints.map(endpoint =>
return getJson(url)
.retry(3)
.catch(err => Rx.Observable.of('error'));
});
Rx.Observable.forkJoin(...observables)
.subscribe(resultArray => {
// do whatever you need to with the results. If any of them
// errored, they will be represented by just the string 'error'
});
发布于 2017-10-15 23:37:26
映射函数返回Rx.Observable.throw("error");
。这是错误的,因为下面的retry()
将接收包装(即Observable<Observable<T>>
而不是Observable<T>
)。您可以使用flatMap
来纠正这一点,但也需要通过Rx.Observable.of(x)
包装该值。
.flatMap(x => {
if(x % 5 == 0) {
console.log("error");
return Rx.Observable.throw("error");
}
return Rx.Observable.of(x);
})
.retry(3)
.subscribe(x => console.log(x), e => console.log(e));
另一种选择是throw new Error()
而不是尝试return Rx.Observable.throw("error");
。
确保你重新尝试正确的事情
Rx.Observable.fromPromise(jQuery.getJSON("https://api.github.com/users"))
.retry(3)
.concatAll(x => Rx.Observable.of(x))
...
https://stackoverflow.com/questions/46760917
复制相似问题