webfulx 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动,可以让我们在不扩充硬件资源的前提下,提升系统的吞吐量和伸缩性。
我翻了无论官方的案例,文档,已经网上资料
都是用的默认的http请求线程池作为工作线程,我的默认是8个。
也就是当8个请求同时来的时候就堵塞了,下一个http请求就进不来了。我一直没搞懂所谓的异步回调到底是怎么用。
用我浅薄的认知,我理解的应该请求过来都是无堵塞的,然后工作线程异步执行完之后回调http响应。无论工作线程需要花多久时间,请求按理说都先被接受。
相信像我一样新萌玩家都先用 官方的案例 https://github.com/spring-projects/spring-boot/tree/2.1.x/spring-boot-samples/spring-boot-sample-webflux/src/main/java/sample/webflux。
比如这样:
@GetMapping("/")
public String welcome() {
return "Hello World";
}
@GetMapping("/")
public Mono<String> getUser() {
return Mono.just("开源中国-王念博客");
}
@GetMapping("/")
public Mono<String> getUser() {
return Mono.just(userService.getUser());
}
@Bean
public RouterFunction<ServerResponse> monoRouterFunction(EchoHandler echoHandler) {
return route(POST("/echo"), echoHandler::echo);
}
这些写法不上生产,本地调试调试,以为就已经响应式了。
刚开始发现线程池就8个时候,我就采用了一个卑鄙无耻的方式,配置100个,同学们你们觉得我做的对吗?
public static void main(String[] args) { System.setProperty("reactor.netty.ioWorkerCount", "100"); SpringApplication.run(CtmsDataProcessApplication.class, args); }
我废话真多,回归正题,我刚研究了一下发现貌似找到了一个对的方式。
@RequestMapping("/")
public Mono aaa(String abc) throws InterruptedException {
logger.info("请求线程,非堵塞,看线程名:调用/方法");
return Mono.create(new Consumer<MonoSink<Object>>() {
@Override
public void accept(MonoSink<Object> stringMonoSink) {
new Thread(new Runnable() {
@Override
public void run() {
logger.info("异步线程");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//关键就在这里,只要在请求时的这个MonoSink里,无论是哪个线程里设置这个值就可以了
stringMonoSink.success(abc);
}
}).start();
}
});
}
上面我为了测试 new Thread,这个线程就是工作线程,这样就容易理解了,不过这里推荐使用 jdk1.8新特性 CompletableFuture 来实现。
到现在也不知道用法对不对,如果有不对的老哥,看到了给小弟留个言。