腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(2074)
视频
沙龙
1
回答
EmitterProcessor
-
onNext
或
FluxSink
有了
EmitterProcessor
,我可以使用
onNext
或
sink来获取
FluxSink
并使用它。
EmitterProcessor
<Long>
emitterProcessor
=
EmitterProcessor
.create(100);
emitterProcessor
.
onNext
(1L);
FluxSink<
浏览 131
提问于2020-04-04
得票数 1
1
回答
如何在春季网络流量中设置OverflowStrategy?
、
、
请参阅如果下游无法跟上,
FluxSink
.OverflowStrategy BUFFER缓冲所有信号。警告!这会进行无界缓冲,并可能导致OutOfMemoryError。我正在寻找一种将此更改为删除
或
错误的方法:(请参阅https://projectreactor.io/docs/core/release/api/reactor/core/publisher/
FluxSink
.OverflowStrategy.html
浏览 3
提问于2020-04-30
得票数 0
2
回答
当消息准备好时,如何使用反应性流量/Mono将消息推送到上游,而不是在状态间隔内轮询?
、
、
、
、
试图在消息可用/准备就绪时将消息推送到上游,并在刷新后关闭连接,而不是使用spring反应通量间隔轮询消息。public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) { return Flux.<String>interval(Duration.of
浏览 0
提问于2019-01-12
得票数 2
回答已采纳
2
回答
与反应流一起发出
、
、
、
String msg = "MSG #" + i++; Thread.sleep(1000);输出:INFO reactor.Flux.Map.1 -
onNext
(Received: 1)INFO reactor.Flux.Map.1 -
onNext
(Receiv
浏览 0
提问于2020-01-19
得票数 2
回答已采纳
1
回答
从
EmitterProcessor
移到Sinks.many()
一段时间以来,我一直在使用带有内置sink的create an
EmitterProcessor
,如下所示:
FluxSink
<String> sink = emitter.sink(
FluxSink
.OverflowStrategy.LATEST);Flux现在我们看到
EmitterProcessor</e
浏览 0
提问于2021-03-04
得票数 0
1
回答
反应堆
EmitterProcessor
只保留最后n个元素?
、
、
如何创建只保留最新n个元素的
EmitterProcessor
,这样即使没有订阅服务器,它也能工作?现在,我创建了这样一个处理器:外部系统在一天内随机提供温度更新。在该系统的回调中,我这样做: processor.
onNext
(temp);但是,一旦添加了
onNext
(...)元素,proce
浏览 0
提问于2019-01-28
得票数 3
回答已采纳
1
回答
使用zip和last(默认值)的
EmitterProcessor
永远不会发送值
、
、
、
我尝试使用zip和last运算符来组合三个不同的
EmitterProcessor
,并处理组合后的结果。除了其中一个处理器不发出项的场景之外,这已经按照下面的定义完美地工作了。我对last(defaultValue)运算符的理解是,当
EmitterProcessor
完成时,它将发出默认值,而不会发出任何值。这将依次完成zip并发送默认值以及来自其他处理器的任何实际值。
EmitterProcessor
.zip( emitter2.last(&
浏览 0
提问于2020-01-21
得票数 0
1
回答
使用
EmitterProcessor
作为事件总线的工程反应堆
、
、
、
、
我们正在测试如何使用
EmitterProcessor
作为事件总线。>
emitterProcessor
) { // put this subscription); requestService.accept(data);
FluxSink</
浏览 6
提问于2020-04-22
得票数 2
回答已采纳
1
回答
使用reactor的Flux.buffer进行批处理工作仅适用于单个项目
、
、
、
我正在尝试使用Flux.buffer()从数据库批量加载。 用例是,从数据库加载记录可能是“突发的”,我想介绍一个小缓冲区,以便在可能的情况下将负载分组在一起。 我的概念方法是使用某种形式的处理器,发布到它的接收器,让缓冲区,然后订阅和过滤我想要的结果。 我尝试了多种不同的方法(不同类型的处理器,以不同的方式创建过滤后的Mono )。 下面是我到目前为止所得到的--主要是通过跌跌撞撞。 目前,这只返回一个结果,但后续调用会被丢弃(尽管我不确定在哪里)。 class BatchLoadingRepository { // I've tried all manner of dif
浏览 68
提问于2019-03-16
得票数 24
回答已采纳
1
回答
为什么在其中一个订阅者取消订阅后Sinks.many().multicast().onBackpressureBuffer()要完成,以及如何避免订阅
、
、
、
11:49:06.936 [main] INFO reactor.Flux.
EmitterProcessor
.1 - onSubscribe(
EmitterProcessor
.EmitterInner)reactor.Flux.
EmitterProcessor
.1 -
onNext
(1)11:49:06.942 [main] INFO reactor.Flux.
EmitterProcessor
.1 - request(unbounded) 11:49:06.943 [main] INF
浏览 4
提问于2021-03-17
得票数 12
回答已采纳
1
回答
发布不同对象类型的
FluxSink
、
> generateResponses(String request) { final FluxProcessor publish =
EmitterProcessor
.create().serialize(); final
FluxSink
<Response2> sink2= publish.sink(); fina
浏览 0
提问于2020-02-27
得票数 0
1
回答
在反应堆Sinks.Many()中,什么等同于
EmitterProcessor
onCancel
每一个,我以前都有代码
FluxSink
<String> sink = emitter.sink(
FluxSink
.OverflowStrategy.LATEST); cancelSink(id, request);例如,当使用rSocket
浏览 0
提问于2021-03-03
得票数 2
回答已采纳
1
回答
如何使用spring云流(使用webflux)以反应性的方式向Kafka主题发布消息?
、
、
、
、
浏览 7
提问于2022-10-13
得票数 0
回答已采纳
1
回答
反应堆选择接收器/处理器
、
、
天真地,我可以实现这样的东西: private final
EmitterProcessor
<Object>
emitterProcessor
;
浏览 0
提问于2018-01-12
得票数 3
回答已采纳
1
回答
反应器多次使用doOnNext
所以首先我编码如下: .doOnNext(s -> System.out.println("2 " + s)); stream.
onNext
EmitterProcessor
<String> stream =
EmitterProcessor
.&qu
浏览 3
提问于2016-11-28
得票数 4
回答已采纳
2
回答
使用spring boot rsocket捕获取消帧类型
、
、
、
我有一个spring boot rsocket实现,如果客户端取消
或
关闭他们的rsocket请求,那么我想取消服务器上的其他订阅注册。在spring boot服务器上的日志中,我可以看到发送
或
接收到一条取消消息:@MessageMapping("cancel") log.info("Captured cance
浏览 20
提问于2020-03-09
得票数 1
回答已采纳
1
回答
Stream (Hoxton)卡夫卡生产者/消费者不参与EmbeddedKafka集成测试
、
、
、
.setHeader("spring.cloud.stream.sendto.destination", destinationName).build(); return () -> processor; public
EmitterProcessor
<Message>> processor(){ return
EmitterProce
浏览 3
提问于2020-02-02
得票数 1
回答已采纳
1
回答
反应堆-磁通要求非过滤用户
import reactor.core.publisher.
EmitterProcessor
@JvmStaticfun main(args: Array<String>) { .subscribe() Thread.sleep
浏览 0
提问于2017-01-06
得票数 1
回答已采纳
1
回答
如何确保反应堆通量处理提供的所有消息
、
让我们考虑下面的代码:public void testFluxCreate() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); IntStream.range(1, 1001).forEach(it -> {
浏览 15
提问于2017-11-09
得票数 1
1
回答
在通量上同时使用publishOn和subscribeOn不会导致任何事情发生。
、
、
如果我使用subscribeOn(Schedulers.immediate())
或
弹性,它可以工作。有什么吗,为什么? val testPublisher =
EmitterProcessor
.create<String>().connect() testPublisher.subscribe { println("subscribe on ${Thread.currentThread().name}&quo
浏览 1
提问于2017-01-30
得票数 11
回答已采纳
点击加载更多
热门
标签
更多标签
云服务器
对象存储
ICP备案
云点播
实时音视频
活动推荐
运营活动
广告
关闭
领券