我需要构建一个流,它在调用后立即发出api请求,如果页面没有刷新,则每小时(下午1:00,下午2:00 )。我用setTimeout()构建它,但是我想用RxJ实现它。你能帮帮我吗?
isUpdated() {
return new Observable<Object>(function update(obs) {
this.http.get(`/update`).subscribe(data => {
obs.next(data);
});
const timer = (60 - new Date().getMinutes()) * 60 * 1000;
setTimeout(() => update.call(this, obs), timer);
}.bind(this));
}
//call
isUpdated().subscribe(data => console.log(data));
发布于 2020-12-03 07:01:03
我认为要解决这个问题,你需要把它分解成更小的部分。
首先,我们知道,在某个时候,根据当前时间,我们想知道何时触发下一个调用。如果我们得到一个时间戳,它给出了ms中的当前时间,并且我们希望在下一个小时之前得到ms的数量,那么我们可以这样做:
const timeToNextHourInMs = (currentTimestampMs) => {
const timestampSeconds = currentTimestampMs / 1000;
const numberOfSecondsIntoTheCurrentHour = timestampSeconds % 3600;
const numberOfSecondsToTheNextHour = 3600 - numberOfSecondsIntoTheCurrentHour;
return numberOfSecondsToTheNextHour * 1000;
};
我希望变量名足够明确,我不需要评论,但让我知道否则。
接下来,我们要处理流问题:
以下是你如何做到的:
this.http.get(`/update`).pipe(
timestamp(),
switchMap(({ timestamp, value }) =>
concat(
of(value),
EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
)
),
repeat()
);
让我们来看看上面的逻辑:
switchMap
,但是由于HTTP调用只返回1值,所以在这个非常特殊的情况下它并不重要。我们也可以用flatMap
或concatMap
switchMap
内部,我们首先使用concat
发送我们刚刚从HTTP调用中得到的值,但也让这个可观察到的值一直保持到当前our的末尾(通过使用我们先前创建的函数)。retry
,一旦流完成,我们将再次订阅它(并且提醒您,流只在新的一个小时开始时才会完成!)我建议在这里添加一件事情,但这并不是最初问题的一个要求,那就是要进行一些错误处理,以便当您进行调用时出现问题时,它会在几秒钟后自动重试。否则,想象一下,当轮询开始时,如果您的网络在那个时候不工作5s,那么您的流就会立即出错。
为此,您可以引用此精彩的回答,并在可重用的自定义操作符中这样做:
const RETRY_DELAY = 2000;
const MAX_RETRY_FOR_ONE_HTTP_CALL = 3;
const automaticRetry = () => (obs$) =>
obs$.pipe(
retryWhen((error$) =>
error$.pipe(
concatMap((error, index) =>
iif(
() => index >= MAX_RETRY_FOR_ONE_HTTP_CALL,
throwError(error),
of(error).pipe(delay(RETRY_DELAY))
)
)
)
)
);
这将在每次重试之间延迟三次可观察到的重试。3次之后,流将抛出最后发出的错误,从而出错。
现在,我们只需将这个自定义操作符添加到流中:
this.http.get(`/update`).pipe(
automaticRetry(),
timestamp(),
switchMap(({ timestamp, value }) =>
concat(
of(value),
EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
)
),
repeat()
);
我还没有对上面的代码进行实际测试,所以请在您这边这样做,并让我知道它是如何进行的。但如果我的逻辑是正确的,下面是事情的发展方向:
automaticRetry
,它等待了3秒,然后重新尝试了一次,仍然什么也没有。它再等待3秒,这一次很好,结果会被传递到下游让我知道是怎么回事
发布于 2020-12-03 02:44:16
也许您可以这样做(我没有测试这段代码):
import { merge, concat, of, timer, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
...
isUpdated() {
return concat(
merge(
of(null), // make call inmediately
timer((60 - new Date().getMinutes()) * 60 * 1000), // the next full hour eg. 2:00
),
interval(60 * 60 * 1000), // every hour
).pipe(
switchMap(() => this.http.get(...)),
)
})
concat
只有在第一个merge
完成后才会订阅interval
,这是在下一个完整的小时(例如: 2:00 ),然后每小时发出一次。
@MrkSef有一个很好的想法,它可以简化成这样的东西:
isUpdated() {
return timer(
(60 - new Date().getMinutes()) * 60 * 1000, // the next full hour eg. 2:00
60 * 60 * 1000 // every hour
).pipe(
startWith(null), // initial request
switchMap(() => this.http.get(...)),
)
})
发布于 2020-12-03 08:28:36
我觉得很简单。
计时器接受两个参数,第一个参数是第一个呼叫的延迟,然后是调用的间隔。你的下一个小时的偏移是正确的,我用你的代码。
使用startWith(null)
立即启动。
const startDelayMs = (60 - new Date().getMinutes()) * 60 * 1000;
const hourMs = 60 * 60 * 1000;
timer(startDelayMs, hourMs).pipe(
startWith(null),
switchMap(()) // do your call here
)
https://stackoverflow.com/questions/65124122
复制