我有一个用例,在这个用例中,我希望通过一个GroupedFlux创建一组PartitionKey,并在每个组延迟元素中创建100毫秒。然而,我希望多个小组同时开始。因此,如果有3个组,我期望每100毫秒发出3条消息。但是,使用下面的代码,我每100毫秒只看到一条消息。
这是我期待的代码。
final Flux<GroupedFlux<String, TData>> groupedFlux =
flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
.flatMap(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.subscribe());
这是日志。
21:24:29.318 parallel-5] : GroupByKey : 2
21:24:29.424 parallel-6] : GroupByKey : 3
21:24:29.529 parallel-7] : GroupByKey : 1
21:24:29.634 parallel-8] : GroupByKey : 2
21:24:29.739 parallel-9] : GroupByKey : 3
21:24:29.844 parallel-10] : GroupByKey : 1
21:24:29.953 parallel-11] : GroupByKey : 2
21:24:30.059 parallel-12] : GroupByKey : 3
21:24:30.167 parallel-1] : GroupByKey : 1
(请参见每条日志语句之间几乎100 ms的差异。1s列是时间戳。
发布于 2021-11-01 15:44:35
经过更多的分析,我发现它运转良好。我的测试有错误的PartitionKey数据,这导致了一个GroupedFlux。
如果有人怀疑delayElements在groupedFlux上的工作方式不同,请回答我自己的问题。事实并非如此。
https://stackoverflow.com/questions/69762661
复制相似问题