前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >异步编程 - 09 Spring框架中的异步执行_@Async注解异步执行原理&源码解析

异步编程 - 09 Spring框架中的异步执行_@Async注解异步执行原理&源码解析

作者头像
小小工匠
发布2023-09-09 14:13:27
4180
发布2023-09-09 14:13:27
举报
文章被收录于专栏:小工匠聊架构

概述

在Spring中调用线程将在调用含有@Async注释的方法时立即返回,Spring是如何做到的呢?其实是其对标注@Async注解的类做了代理,比如下面的类Async-AnnotationExample。

代码语言:javascript
复制
public class AsyncAnnotationExample {
    @Async
    public CompletableFuture<String> doSomething() {

        // 1.创建future
        CompletableFuture<String> result = new CompletableFuture<String>();
        // 2.模拟任务执行
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "doSomething");
        } catch (Exception e) {
            e.printStackTrace();
        }
        result.complete("done");

        // 3.返回结果
        return result;
    }
}

由于AsyncAnnotationExample类中方法doSomething被标注了@Async注解,所以Spring框架在开启异步处理后会对AsyncAnnotationExample的实例进行代理,代理后的类代码框架如下所示。

代码语言:javascript
复制
public class AsyncAnnotationExampleProxy {
    
    public AsyncAnnotationExample getAsyncTask() {
        return asyncTask;
    }

    public void setAsyncAnnotationExample(AsyncAnnotationExample asyncTask) {
        this.asyncTask = asyncTask;
    }

    private AsyncAnnotationExample asyncTask;
    private TaskExecutor executor = new SimpleAsyncTaskExecutor();
    public CompletableFuture<String> dosomthingAsyncFuture() {

        return CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return asyncTask.dosomthing().get();
                } catch (Throwable e) {
                    throw new CompletionException(e);
                }
            }
        },executor);
    }
}

如上代码所示,Spring会对AsyncAnnotationExample类进行代理,并且会把AsyncAnnotationExample的实例注入AsyncAnnotationExampleProxy内部,当我们调用AsyncAnnotationExample的dosomthing方法时,实际调用的是AsyncAnnotation ExampleProxy的dosomthing方法,后者使用CompletableFuture.supplyAsync开启了一个异步任务(其马上返回一个CompletableFuture对象),并且使用默认的SimpleAsync TaskExecutor线程池作为异步处理线程,然后在异步任务内具体调用了AsyncAnnotationExample实例的dosomthing方法。

默认情况下,Spring框架是使用Cglib对标注@Async注解的方法进行代理的,具体拦截器是AnnotationAsyncExecutionInterceptor,我们看看其invoke方法。

代码语言:javascript
复制
public Object invoke(final MethodInvocation invocation) throws Throwable {
    //1.被代理的目标对象
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    //2. 获取被代理的方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    //3. 判断使用哪个执行器执行被代理的方法
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }
    //4. 使用Callable包装要执行的方法
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
    //5. 提交包装的Callable任务到指定执行器执行
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

·代码1获取被代理的目标对象的Class对象,本例中为class:com.artisan.async.AsyncProgram.AsyncAnnotationExample的Class对象;

·代码2获取被代理的方法,本例中为public java.util.concurrent.CompletableFuture:com.artisan.async.AsyncProgram.AsyncAnnotationExample.dosomthing();

·代码3根据规则获取使用哪个执行器TaskExecutor执行被代理的方法,其代码如下所示。

代码语言:javascript
复制
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    //4.1获取对应方法的执行器
    AsyncTaskExecutor executor = this.executors.get(method);
    //4.2不存在则按照规则查找
    if (executor == null) {
        //4.2.1 如果注解@Async中指定了执行器名称
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        //4.2.2 获取默认执行器
        else {
            targetExecutor = this.defaultExecutor;
            if (targetExecutor == null) {
                synchronized (this.executors) {
                    if (this.defaultExecutor == null) {
                        this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                    }
                    targetExecutor = this.defaultExecutor;
                }
            }
        }
        //4.2.3
        if (targetExecutor == null) {
            return null;
        }
        //4.2.4 添加执行器到缓存
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    //4.3返回查找的执行器
    return executor;
}

代码4.1从缓存executors中尝试获取method方法对应的执行器,如果存在则直接执行代码4.3返回;否则执行代码4.2.1判断方法的注解@Async中是否指定了执行器名称,如果有则尝试从Spring的bean工厂内获取该名称的执行器的实例,否则执行代码4.2.2获取默认的执行器(SimpleAsyncTaskExecutor),然后代码4.2.4把执行器放入缓存。

到这里就探讨完成了AnnotationAsyncExecutionInterceptor的invoke方法内代码3是如何确定那个执行器,然后在invoke方法中的代码4使用Callable包装要执行的方法,代码5提交包装的Callable任务到指定执行器。

到这里所有的执行使用的都是调用线程,调用线程提交异步任务到执行器后就返回了,异步任务真正执行的是具体执行器中的线程。下面我们看看代码5 doSubmit的代码。

代码语言:javascript
复制
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    //5.1判断方法返回值是否为CompletableFuture类型或者是其子类
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    //5.2判断返回值类型是否为ListenableFuture类型或者是其子类
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    //5.3判断返回值类型是否为ListenableFuture类型或者是其子类
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    //5.4其他情况下没有返回值
    else {
        executor.submit(task);
        return null;
    }
}

·代码5.1判断方法返回值是否为CompletableFuture类型或者是其子类,如果是则把任务使用CompletableFuture.supplyAsync方法提交到线程池executor执行,该方法会马上返回一个CompletableFuture对象。

·代码5.2判断方法返回值是否为ListenableFuture类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个ListenableFuture对象。

·代码5.3判断方法返回值是否为Future类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个Future对象。

·代码5.4说明方法不需要返回值,直接提交任务到线程池executor后返回null。

上面我们讲解了代理拦截器AnnotationAsyncExecutionInterceptor的invoke方法如何对标注@Async的方法进行处理,实现异步执行的。其实还有一部分还没讲,前面说了要开始异步处理,必须使用@EnableAsync注解或者task:annotation-driven/来开启异步处理,那么这两个部分背后到底做了什么呢?下面我们就来一探究竟。

首先我们看看添加@EnableAsync注解后发生了什么?在Spring容器启动的过程中会有一系列扩展接口对Bean的元数据定义、初始化、实例化做拦截处理,也存在一些处理器类可以动态地向Spring容器添加一些框架需要使用的Bean实例。其中ConfigurationClassPostProcessor处理器类则是用来解析注解类,并把其注册到Spring容器中的,其可以解析标注@Configuration、@Component、@ComponentScan、@Import、@ImportResource等的Bean。当我们使用context:annotation-config/或者context:component-scan/时,Spring容器会默认把ConfigurationClassPostProcessor处理器注入Spring容器。

而@EnableAsync的定义如下:

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}

所以我们添加了@EnableAsync注解后,ConfigurationClassPostProcessor会解析其中的@Import(AsyncConfigurationSelector.class),并把AsyncConfigurationSelector的实例注入Spring容器,其代码如下所示。

代码语言:javascript
复制
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

AsyncConfigurationSelector实现了ImportSelector接口的selectImports方法,根据AdviceMode参数返回需要导入到Spring容器的Bean的全路径包名。该方法会在ConfigurationClassPostProcessor中的ConfigurationClassParser类中调用。默认情况下的adviceMode为PROXY,所以会把ProxyAsyncConfiguration的实例注入Spring容器。

ProxyAsyncConfiguration的代码如下所示。

代码语言:javascript
复制
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTarget
Class"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

如上代码ProxyAsyncConfiguration的asyncAdvisor方法添加了@Bean注解,所以该方法返回的Bean也会被注入Spring容器。该方法创建了AsyncAnnotationBean PostProcessor处理器,所以AsyncAnnotationBeanPostProcessor的一个实例会被注入Spring容器中,由于其实现了BeanFactoryAware接口,所以Spring框架会调用其setBeanFactory(BeanFactory beanFactory)方法把Spring BeanFactory(存放bean的容器)注入该Bean,setBeanFactory方法代码如下所示。

代码语言:javascript
复制
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

如上代码创建了一个AsyncAnnotationAdvisor的实例并保存到了AsyncAnnotation BeanPostProcessor的advisor变量。Spring中每个AsyncAnnotationAdvisor都包含一个Advice(切面逻辑)和一个PointCut(切点),也就是会对符合PointCut的方法使用Advice进行功能增强,对应Advice和PointCut是在AsyncAnnotationAdvisor构造函数内创建的。

代码语言:javascript
复制
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    //6.1.异步注解类型
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
    }
    //6.2创建切面逻辑
    this.advice = buildAdvice(executor, exceptionHandler);
    //6.3创建切点
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

如上代码6.1收集注解@Async和@javax.ejb.Asynchronous到asyncAnnotationTypes,代码6.2则创建Advice,其代码如下所示。

代码语言:javascript
复制
protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

由上述代码可知,这里创建了AnnotationAsyncExecutionInterceptor拦截器作为切面逻辑。下面看看代码6.3如何创建切点。

代码语言:javascript
复制
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

在上述代码中使用收集的注解集合asyncAnnotationTypes,并在每个注解处创建了一个AnnotationMatchingPointcut作为切点,AnnotationMatchingPointcut内部的AnnotationClassFilter的方法matches则作为某个方法是否满足切点的条件,具体代码如下所示。

代码语言:javascript
复制
public boolean matches(Class<?> clazz) {
    return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(clazz, this.annotationType) :
            clazz.isAnnotationPresent(this.annotationType));
}

由如上代码可知,判断方法通过是否有注解@Async为依据来判断方法是否符合切点。

到此我们把AsyncAnnotationBeanPostProcessor的setBeanFactory(BeanFactory bean-Factory)方法逻辑讲解完毕了,其内部保存了一个AsyncAnnotationAdvisor对象用来对Spring容器中符合条件(这里为含有@Async注解的方法的Bean)的Bean的方法进行功能增强,下面我们看看AsyncAnnotationBeanPostProcessor的postProcess AfterInitialization方法是如何对这些符合条件的Bean进行代理的。

代码语言:javascript
复制
public Object postProcessAfterInitialization(Object bean, String beanName) {
    ...

    if (isEligible(bean, beanName)) {
        //7.1
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        //7.2 设置拦截器
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        //7.3 获取代理类
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}

如上代码7.1使用prepareProxyFactory创建了代理工厂,其代码如下所示。

代码语言:javascript
复制
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.copyFrom(this);
    proxyFactory.setTarget(bean);
    return proxyFactory;
}

代码7.2则设置在其setBeanFactory方法内创建的AsyncAnnotationAdvisor对象作为Advisor,代码7.3从代理工厂获取代理后的Bean实例并返回到Spring容器,所以当我们调用含有@Async注解的Bean的方法时候,实际调用的是被代理后的Bean。

当我们调用被代理的类的方法时,代理类内部会先使用AsyncAnnotationAdvisor中的PointCut进行比较,看其是否符合切点条件(方法是否含有@Async)注解,如果不符合则直接调用被代理的对象的原生方法,否则调用AsyncAnnotationAdvisor内部的AnnotationAsyncExecutionInterceptor进行拦截异步处理。

在了解添加@EnableAsync注解后会发生什么后,下面我们来看看当添加标签<task:annotation-driven/>开启异步处理时,背后又发生了什么?在Spring中对于标签<XXX:/>总是会存在名称为XXXTaskNamespaceHandler的处理器负责该标签的解析,所以对于标签,自然存在TaskNamespaceHandler处理器负责其解析,其代码如下所示。

代码语言:javascript
复制
public class TaskNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser());
        this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser());
        this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser());
    }
}

由如上代码可知,<task:annotation-driven/>是使用AnnotationDrivenBeanDefinitionParser来进行解析的,下面我们看看其parse方法。

代码语言:javascript
复制
public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
...
    @Override
    @Nullable
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        Object source = parserContext.extractSource(element);

        ...
        //8.1 
        String mode = element.getAttribute("mode");
        if ("aspectj".equals(mode)) {
            // mode="aspectj"
            registerAsyncExecutionAspect(element, parserContext);
        }
        else {
            //8.2 mode="proxy"
            if (registry.containsBeanDefinition(TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)) {
                parserContext.getReaderContext().error(
                        "Only one AsyncAnnotationBeanPostProcessor may exist within the context.", source);
            }
            else {
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
                        "org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor");
                builder.getRawBeanDefinition().setSource(source);
                String executor = element.getAttribute("executor");
                if (StringUtils.hasText(executor)) {
                    builder.addPropertyReference("executor", executor);
                }
                String exceptionHandler = element.getAttribute("exception-handler");
                if (StringUtils.hasText(exceptionHandler)) {
                    builder.addPropertyReference("exceptionHandler", exceptionHandler);
                }
                if (Boolean.valueOf(element.getAttribute(AopNamespaceUtils.PROXY_TARGET_CLASS_ATTRIBUTE))) {
                    builder.addPropertyValue("proxyTargetClass", true);
                }
                registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME);
            }
        }

    
        //8.3 Finally register the composite component.
        parserContext.popAndRegisterContainingComponent();

        return null;
    }
}

由如上代码可知,其主要是用来创建AsyncAnnotationBeanPostProcessor在Spring容器中的元数据定义,并注册到Spring容器中,剩下的流程就与基于@EnableAsync注解开启异步处理的流程一样了。

小结

我们梳理如何使用Spring框架中的@Async进行异步处理,以及其内部如何使用代理的方式来实现,并且可知使用@Async实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了的负担

好文推荐

一文彻底讲透@Async注解的原理和使用方法

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-09-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 小结
  • 好文推荐
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档