同步的处理模式
在我们传统的服务中,当一个HTTP请求过来时,tomcat或者是其他的中间件都会有一个主线程来处理请求,所有的业务逻辑都会在这个线程里面处理完,最后会给出一个响应。由于我们的tomcat所管理的线程数是有限的,当线程到达一定程度后,再有请求过来将会无法去处理了。
而异步处理是,当一个HTTP请求过来时,我们tomcat的主线程会去调起其他的副线程来执行我们的业务逻辑,当副线程执行完后再由我们的主线程把结果响应回去。在副线程处理业务逻辑中,我们的主线程是可以空闲出来的,然后去处理其他的请求的,也就是说采用这种模式去处理请求的话,我们的服务器的吞吐量会有一个明显的提升。
下面我们用线程等待模拟我们的实际业务,通过访问这个接口,得到的结果显然是开始与结束相差1秒。
@RequestMapping("/order")
public String order() throws Exception{
logger.info("主线程开始");
Thread.sleep(1000);
logger.info("主线程结束");
return "success";
}
首先我们能想到的是使用Callable来实现我们的异步处理。
@RequestMapping("/order")
public Callable<String> order() throws Exception {
logger.info("主线程开始");
Callable<String> result = new Callable<String>() {
@Override
public String call() throws Exception {
logger.info("副线程开始");
Thread.sleep(1000);
logger.info("副线程返回");
return "success";
}
};
logger.info("主线程结束");
return result;
}
经过这样的改动后,我们的业务处理放到副线程里面。当我们调用这个接口,控制台是:
并且我们通过浏览器的开发工具知道,这个请求一个耗时1秒多,也就是我们的业务消耗的时间。
虽然我们已经学会使用Callable去异步处理我们的请求,但是因为Runnable这种形式不能满足我们所有的场景。在使用Runnable处理时,如上面代码案例所示,我们的副线程必须由我们的主线程来调起的。
在我们正真的企业开发中,场景是比较复杂的,我们就以订单下单通常为例:
如上图所示,我们可以知道,接收下单的请求和真正处理下单的业务逻辑并不是在同台服务器上,当HTTP请求进到应用1里面,应用1会把他放到消息队列中,然后应用2去监听这个消息队列,当监听到这个消息队列中有下单的请求后,应用2开始处理业务逻辑。应用2处理完毕后,会把这个消息结果放进消息队列中,同时应用1里面有个一线程2监听消息队列,当它监听到有请求处理完毕后,它会根据消息的结果去返回一个HTTP响应。
在这个场景里面,线程1和线程2是完全隔离的。在这种相似的场景下面Runnable显然不能满足我们的需求了,这时就要用到我们的DeferredResult。下面我们来写一个段代码来看如何用DeferredResult来实现我们的异步请求。
由于篇幅字数不宜过长,我们也不可能搭建一个消息队列的服务处理。我们把上面业务图分成4块内容。
我们使用下面这个类来进行模拟消息队列就好:
@Component
public class MockQueue {
/**
* 下单的的消息
* 当这个属性有值的时候就认为有下单消息
*/
private String placeOrder;
/**
* 订单完成的消息
* 当这个属性有值的时候就认为有订单完成的消息
*/
private String completeOrder;
private Logger logger = LoggerFactory.getLogger(getClass());
public String getPlaceOrder() {
return placeOrder;
}
public void setPlaceOrder(String placeOrder) throws Exception {
// 假设这是应用2单独的线程
new Thread(() -> {
logger.info("接到下单请求, " + placeOrder);
try {
// 模拟下单的过程
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
// 收到下单完成消息
this.completeOrder = placeOrder;
logger.info("下单请求处理完毕," + placeOrder);
}).start();
}
public String getCompleteOrder() {
return completeOrder;
}
public void setCompleteOrder(String completeOrder) {
this.completeOrder = completeOrder;
}
}
我们使用下面这块代码来模拟我们的线程1和线程2间发送DefferredResult:
@Component
public class DeferredResultHolder {
private Map<String, DeferredResult<String>> map = new HashMap<String, DeferredResult<String>>();
public Map<String, DeferredResult<String>> getMap() {
return map;
}
public void setMap(Map<String, DeferredResult<String>> map) {
this.map = map;
}
}
再来看下我们主线程的代码,在这段代码中我们是看不到有其他的线程来参与的。
@RestController
public class AsyncController {
@Autowired
private MockQueue mockQueue;
@Autowired
private DeferredResultHolder deferredResultHolder;
private Logger logger = LoggerFactory.getLogger(getClass());
@RequestMapping("/order")
public DeferredResult<String> order() throws Exception {
logger.info("主线程开始");
// 8为随机数作为我们的订单号
String orderNumber = RandomStringUtils.randomNumeric(8);
mockQueue.setPlaceOrder(orderNumber);
DeferredResult<String> result = new DeferredResult<>();
deferredResultHolder.getMap().put(orderNumber, result);
return result;
}
我们最后来实现我们线程2监听我们的消息队列的代码,这个类需要实现ApplicationListener。接口泛型是ContextRefreshedEvent,这个事件是Spring容器初始化完毕的事件。
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private MockQueue mockQueue;
@Autowired
private DeferredResultHolder deferredResultHolder;
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
new Thread(() -> {
// 模拟监听completeOrder有值
while (true) {
// 有值就处理
if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
String orderNumber = mockQueue.getCompleteOrder();
logger.info("返回订单处理结果:"+orderNumber);
// setResult设置最终要返回的信息
deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
mockQueue.setCompleteOrder(null);
}else{
// 没值就睡1秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
实际上我们有3个线程,第一个是主线程负责接收HTTP请求,第二个线程是真正处理下单的逻辑,第三个线程把处理结果返回给客户端。而第一个线程和最后一个线程会用DefferredResult实现交互。
处理结果如下:
对于前台来说是毫无感知的,但我们的后台一共有三个线程处理这个业务。
@Configuration
public class WebConfig extends WebMvcConfigurerAdapter {
/**
* 同步的拦截器注册
* @param registry
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(timeInterceptor);
}
/**
* 异步的拦截器注册
* @param configurer
*/
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 注册异步的拦截器
configurer.registerCallableInterceptors();
// 设置异步请求默认此时时间
configurer.setDefaultTimeout();
// 设置异步的线程池
// 在默认的情况下。就像我们Runnable
// spring副线程是不会去复用的,而是开启新的线程
// 那么我们可以通过这个方法设置可从用的线程池
configurer.setTaskExecutor();
}
}