在Spring中调用线程将在调用含有@Async注释的方法时立即返回,Spring是如何做到的呢?其实是其对标注@Async注解的类做了代理,比如下面的类Async-AnnotationExample。
public class AsyncAnnotationExample {
@Async
public CompletableFuture<String> doSomething() {
// 1.创建future
CompletableFuture<String> result = new CompletableFuture<String>();
// 2.模拟任务执行
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "doSomething");
} catch (Exception e) {
e.printStackTrace();
}
result.complete("done");
// 3.返回结果
return result;
}
}
由于AsyncAnnotationExample类中方法doSomething被标注了@Async注解,所以Spring框架在开启异步处理后会对AsyncAnnotationExample的实例进行代理,代理后的类代码框架如下所示。
public class AsyncAnnotationExampleProxy {
public AsyncAnnotationExample getAsyncTask() {
return asyncTask;
}
public void setAsyncAnnotationExample(AsyncAnnotationExample asyncTask) {
this.asyncTask = asyncTask;
}
private AsyncAnnotationExample asyncTask;
private TaskExecutor executor = new SimpleAsyncTaskExecutor();
public CompletableFuture<String> dosomthingAsyncFuture() {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return asyncTask.dosomthing().get();
} catch (Throwable e) {
throw new CompletionException(e);
}
}
},executor);
}
}
如上代码所示,Spring会对AsyncAnnotationExample类进行代理,并且会把AsyncAnnotationExample的实例注入AsyncAnnotationExampleProxy内部,当我们调用AsyncAnnotationExample的dosomthing方法时,实际调用的是AsyncAnnotation ExampleProxy的dosomthing方法,后者使用CompletableFuture.supplyAsync开启了一个异步任务(其马上返回一个CompletableFuture对象),并且使用默认的SimpleAsync TaskExecutor线程池作为异步处理线程,然后在异步任务内具体调用了AsyncAnnotationExample实例的dosomthing方法。
默认情况下,Spring框架是使用Cglib对标注@Async注解的方法进行代理的,具体拦截器是AnnotationAsyncExecutionInterceptor
,我们看看其invoke方法。
public Object invoke(final MethodInvocation invocation) throws Throwable {
//1.被代理的目标对象
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
//2. 获取被代理的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//3. 判断使用哪个执行器执行被代理的方法
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//4. 使用Callable包装要执行的方法
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//5. 提交包装的Callable任务到指定执行器执行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
·代码1获取被代理的目标对象的Class对象,本例中为class:com.artisan.async.AsyncProgram.AsyncAnnotationExample的Class对象;
·代码2获取被代理的方法,本例中为public java.util.concurrent.CompletableFuture:com.artisan.async.AsyncProgram.AsyncAnnotationExample.dosomthing();
·代码3根据规则获取使用哪个执行器TaskExecutor执行被代理的方法,其代码如下所示。
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//4.1获取对应方法的执行器
AsyncTaskExecutor executor = this.executors.get(method);
//4.2不存在则按照规则查找
if (executor == null) {
//4.2.1 如果注解@Async中指定了执行器名称
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
//4.2.2 获取默认执行器
else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
//4.2.3
if (targetExecutor == null) {
return null;
}
//4.2.4 添加执行器到缓存
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
//4.3返回查找的执行器
return executor;
}
代码4.1从缓存executors中尝试获取method方法对应的执行器,如果存在则直接执行代码4.3返回;否则执行代码4.2.1判断方法的注解@Async中是否指定了执行器名称,如果有则尝试从Spring的bean工厂内获取该名称的执行器的实例,否则执行代码4.2.2获取默认的执行器(SimpleAsyncTaskExecutor),然后代码4.2.4把执行器放入缓存。
到这里就探讨完成了AnnotationAsyncExecutionInterceptor的invoke方法内代码3是如何确定那个执行器,然后在invoke方法中的代码4使用Callable包装要执行的方法,代码5提交包装的Callable任务到指定执行器。
到这里所有的执行使用的都是调用线程,调用线程提交异步任务到执行器后就返回了,异步任务真正执行的是具体执行器中的线程。下面我们看看代码5 doSubmit的代码。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//5.1判断方法返回值是否为CompletableFuture类型或者是其子类
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
//5.2判断返回值类型是否为ListenableFuture类型或者是其子类
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
//5.3判断返回值类型是否为ListenableFuture类型或者是其子类
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
//5.4其他情况下没有返回值
else {
executor.submit(task);
return null;
}
}
·代码5.1判断方法返回值是否为CompletableFuture类型或者是其子类,如果是则把任务使用CompletableFuture.supplyAsync方法提交到线程池executor执行,该方法会马上返回一个CompletableFuture对象。
·代码5.2判断方法返回值是否为ListenableFuture类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个ListenableFuture对象。
·代码5.3判断方法返回值是否为Future类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个Future对象。
·代码5.4说明方法不需要返回值,直接提交任务到线程池executor后返回null。
上面我们讲解了代理拦截器AnnotationAsyncExecutionInterceptor的invoke方法如何对标注@Async的方法进行处理,实现异步执行的。其实还有一部分还没讲,前面说了要开始异步处理,必须使用@EnableAsync注解或者task:annotation-driven/来开启异步处理,那么这两个部分背后到底做了什么呢?下面我们就来一探究竟。
首先我们看看添加@EnableAsync注解后发生了什么?在Spring容器启动的过程中会有一系列扩展接口对Bean的元数据定义、初始化、实例化做拦截处理,也存在一些处理器类可以动态地向Spring容器添加一些框架需要使用的Bean实例。其中ConfigurationClassPostProcessor处理器类则是用来解析注解类,并把其注册到Spring容器中的,其可以解析标注@Configuration、@Component、@ComponentScan、@Import、@ImportResource等的Bean。当我们使用context:annotation-config/或者context:component-scan/时,Spring容器会默认把ConfigurationClassPostProcessor处理器注入Spring容器。
而@EnableAsync的定义如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}
所以我们添加了@EnableAsync注解后,ConfigurationClassPostProcessor会解析其中的@Import(AsyncConfigurationSelector.class),并把AsyncConfigurationSelector的实例注入Spring容器,其代码如下所示。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
AsyncConfigurationSelector实现了ImportSelector接口的selectImports方法,根据AdviceMode参数返回需要导入到Spring容器的Bean的全路径包名。该方法会在ConfigurationClassPostProcessor中的ConfigurationClassParser类中调用。默认情况下的adviceMode为PROXY,所以会把ProxyAsyncConfiguration的实例注入Spring容器。
ProxyAsyncConfiguration的代码如下所示。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTarget
Class"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
如上代码ProxyAsyncConfiguration的asyncAdvisor方法添加了@Bean注解,所以该方法返回的Bean也会被注入Spring容器。该方法创建了AsyncAnnotationBean PostProcessor处理器,所以AsyncAnnotationBeanPostProcessor的一个实例会被注入Spring容器中,由于其实现了BeanFactoryAware接口,所以Spring框架会调用其setBeanFactory(BeanFactory beanFactory)方法把Spring BeanFactory(存放bean的容器)注入该Bean,setBeanFactory方法代码如下所示。
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
如上代码创建了一个AsyncAnnotationAdvisor的实例并保存到了AsyncAnnotation BeanPostProcessor的advisor变量。Spring中每个AsyncAnnotationAdvisor都包含一个Advice(切面逻辑)和一个PointCut(切点),也就是会对符合PointCut的方法使用Advice进行功能增强,对应Advice和PointCut是在AsyncAnnotationAdvisor构造函数内创建的。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//6.1.异步注解类型
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
}
//6.2创建切面逻辑
this.advice = buildAdvice(executor, exceptionHandler);
//6.3创建切点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
如上代码6.1收集注解@Async和@javax.ejb.Asynchronous到asyncAnnotationTypes,代码6.2则创建Advice,其代码如下所示。
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
由上述代码可知,这里创建了AnnotationAsyncExecutionInterceptor拦截器作为切面逻辑。下面看看代码6.3如何创建切点。
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
在上述代码中使用收集的注解集合asyncAnnotationTypes,并在每个注解处创建了一个AnnotationMatchingPointcut作为切点,AnnotationMatchingPointcut内部的AnnotationClassFilter的方法matches则作为某个方法是否满足切点的条件,具体代码如下所示。
public boolean matches(Class<?> clazz) {
return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(clazz, this.annotationType) :
clazz.isAnnotationPresent(this.annotationType));
}
由如上代码可知,判断方法通过是否有注解@Async为依据来判断方法是否符合切点。
到此我们把AsyncAnnotationBeanPostProcessor的setBeanFactory(BeanFactory bean-Factory)方法逻辑讲解完毕了,其内部保存了一个AsyncAnnotationAdvisor对象用来对Spring容器中符合条件(这里为含有@Async注解的方法的Bean)的Bean的方法进行功能增强,下面我们看看AsyncAnnotationBeanPostProcessor的postProcess AfterInitialization方法是如何对这些符合条件的Bean进行代理的。
public Object postProcessAfterInitialization(Object bean, String beanName) {
...
if (isEligible(bean, beanName)) {
//7.1
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
//7.2 设置拦截器
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
//7.3 获取代理类
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
如上代码7.1使用prepareProxyFactory创建了代理工厂,其代码如下所示。
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
代码7.2则设置在其setBeanFactory方法内创建的AsyncAnnotationAdvisor对象作为Advisor,代码7.3从代理工厂获取代理后的Bean实例并返回到Spring容器,所以当我们调用含有@Async注解的Bean的方法时候,实际调用的是被代理后的Bean。
当我们调用被代理的类的方法时,代理类内部会先使用AsyncAnnotationAdvisor中的PointCut进行比较,看其是否符合切点条件(方法是否含有@Async)注解,如果不符合则直接调用被代理的对象的原生方法,否则调用AsyncAnnotationAdvisor内部的AnnotationAsyncExecutionInterceptor进行拦截异步处理。
在了解添加@EnableAsync注解后会发生什么后,下面我们来看看当添加标签<task:annotation-driven/>
开启异步处理时,背后又发生了什么?在Spring中对于标签<XXX:/>
总是会存在名称为XXXTaskNamespaceHandler的处理器负责该标签的解析,所以对于标签,自然存在TaskNamespaceHandler处理器负责其解析,其代码如下所示。
public class TaskNamespaceHandler extends NamespaceHandlerSupport {
@Override
public void init() {
this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser());
this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser());
this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser());
}
}
由如上代码可知,<task:annotation-driven/>
是使用AnnotationDrivenBeanDefinitionParser来进行解析的,下面我们看看其parse方法。
public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
...
@Override
@Nullable
public BeanDefinition parse(Element element, ParserContext parserContext) {
Object source = parserContext.extractSource(element);
...
//8.1
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerAsyncExecutionAspect(element, parserContext);
}
else {
//8.2 mode="proxy"
if (registry.containsBeanDefinition(TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)) {
parserContext.getReaderContext().error(
"Only one AsyncAnnotationBeanPostProcessor may exist within the context.", source);
}
else {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
"org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor");
builder.getRawBeanDefinition().setSource(source);
String executor = element.getAttribute("executor");
if (StringUtils.hasText(executor)) {
builder.addPropertyReference("executor", executor);
}
String exceptionHandler = element.getAttribute("exception-handler");
if (StringUtils.hasText(exceptionHandler)) {
builder.addPropertyReference("exceptionHandler", exceptionHandler);
}
if (Boolean.valueOf(element.getAttribute(AopNamespaceUtils.PROXY_TARGET_CLASS_ATTRIBUTE))) {
builder.addPropertyValue("proxyTargetClass", true);
}
registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME);
}
}
//8.3 Finally register the composite component.
parserContext.popAndRegisterContainingComponent();
return null;
}
}
由如上代码可知,其主要是用来创建AsyncAnnotationBeanPostProcessor在Spring容器中的元数据定义,并注册到Spring容器中,剩下的流程就与基于@EnableAsync注解开启异步处理的流程一样了。
我们梳理如何使用Spring框架中的@Async进行异步处理,以及其内部如何使用代理的方式来实现,并且可知使用@Async实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了的负担