前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >不仅会用@Async,我把源码也梳理了一遍(下)

不仅会用@Async,我把源码也梳理了一遍(下)

作者头像
java思维导图
发布2019-09-29 14:15:35
1K0
发布2019-09-29 14:15:35
举报
文章被收录于专栏:java思维导图

终于到了源码分析的环节了,在这之前我已经写过了两篇文章专门分析这个@Async了,还没看过的同学先去看下哈。

好了,不啰嗦。

分析过程:

  • 开启异步代理
  • 初始化excutor和exceptionHandler
  • 定义切面处理
  • 线程处理

开启异步化支持

@EnableAsync

@EnableAsync是开启某个模块的功能加载,之前在《导图梳理springboot手动、自动装配,让springboot不再难懂》介绍过,@EnableXXX一般都有两种用法,一种直接引入配置,一种可以通过注解的元数据选择需要导入的配置。这里的@EnableAsync明显属于第二种。

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    // 默认是@Async和EJB3.1规范下的@Asynchronous注解,
    // 这里表示可以自定义注解启动异步
    Class<? extends Annotation> annotation() default Annotation.class;

    // true表示启用CGLIB代理
    boolean proxyTargetClass() default false;

    // 切面通知模式:默认动态代理PROXY
    AdviceMode mode() default AdviceMode.PROXY;
    int order() default Ordered.LOWEST_PRECEDENCE;

}

这个EnableAsync注解,一般注解属性我们都不需要改,默认就行了。那么这里最重要的代码就只有一行了,就是这个@Import(AsyncConfigurationSelector.class),导入AsyncConfigurationSelector配置。

AsyncConfigurationSelector

我们打开AsyncConfigurationSelector看看:

代码语言:javascript
复制
/*
 * 关键点:
 * 1、父类AdviceModeImportSelector
 * 2、导入配置类ProxyAsyncConfiguration
 *
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

   private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =    "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

   @Override
   @Nullable
   public String[] selectImports(AdviceMode adviceMode) {
      switch (adviceMode) {
         case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
         case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
         default:
            return null;
      }
   }
}

上面的源码里面,我在开头写了2个关键点,

  • 一个是父类AdviceModeImportSelector
  • 二是导入配置类ProxyAsyncConfiguration

大家在源码分析的时候这样去看源码,当你打开一个类的时候,里面有很多个方法,你不知道你应该从哪个方法看起,你第一时间可以先去看继承关系,就比如这AsyncConfigurationSelector类,他继承了AdviceModeImportSelector,但AdviceModeImportSelector其实又实现了ImportSelector,而ImportSelector是我们比较熟悉的接口。我们知道ImportSelector里面有个方法selectImports(AnnotationMetadata)是用于根据注解属性导入配置的,所以这里就是我们的突破口。

所以综合来看源码的方法是这样的:

当你打开一个类,比较迷茫的时候:

  • 第一步:打开类的继承关系图(Ctrl+Shift+Alt+u)

找你熟悉的一些接口,如果是Spring,特别需要注意spring的Aware系列接口,比如:

  • BeanNameAware :可以获取容器中bean的名称
  • BeanFactoryAware:获取当前bean factory这也可以调用容器的服务
  • ApplicationContextAware:当前的applicationContext, 这也可以调用容器的服务
  • MessageSourceAware:获得message source,这也可以获得文本信息
  • applicationEventPulisherAware:应用事件发布器,可以发布事件,
  • ResourceLoaderAware:获得资源加载器,可以获得外部资源文件的内容;

这些都有一些特殊的功能,在spring项目被启动的时候会被调用对应的实现接口,所以看到这些Aware实现类的时候,如果你比较迷茫,可以从Aware的实现类的接口重写开始看。

而刚才我们说到的ImportSelector也是spring的一个比较特殊的接口了,我们就从selectImports(AnnotationMetadata)方法看起:

代码语言:javascript
复制
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

   public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

   protected String getAdviceModeAttributeName() {
      return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
   }

   @Override
   public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
      Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
      Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

      // 获取@EnableAsync的属性值
      AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
      if (attributes == null) {
         throw new IllegalArgumentException(String.format(
               "@%s is not present on importing class '%s' as expected",
               annType.getSimpleName(), importingClassMetadata.getClassName()));
      }

      AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());

      // 调用抽象方法,由子类重写
      String[] imports = selectImports(adviceMode);
      if (imports == null) {
         throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
      }
      return imports;
   }

   /**
    * 由子类AsyncConfigurationSelector重写了这个方法
    */
   @Nullable
   protected abstract String[] selectImports(AdviceMode adviceMode);

}

所以从这个方法看起你就很容易梳理到代码了。上面代码中,首先把EnableAsync的元数据都封装到AnnotationAttributes中,然后再获取到AdviceMode,最后选择出需要导入的配置,而导入的配置方法是个抽象方法selectImports(AdviceMode),由子类重写。子类自然就是AsyncConfigurationSelector,所以你才可以看到AsyncConfigurationSelector里有个selectImports方法,其实是重写了父类的。

现在逻辑相对清晰了,由于AdviceMode默认就是AdviceMode.PROXY,所以我们导入的配置就是ProxyAsyncConfiguration,接下来我们再去分析。

ProxyAsyncConfiguration

这看配置类的名称,翻译过来就是代理异步配置。有时候我们看源码也要从一个类的名称去猜测可能的功能。我们之前在第二篇文章中猜想过,应该有个Aop切面处理@Async注解,如果大家熟悉aop的原理的话,aop也是使用的了代理。那么应该就是在这个配置类里面实现的了。

好了,由一个名字我想了这么多,接下来先看下代码:

代码语言:javascript
复制
/*
 * 这里的关键点有2个:
 * 1、父类AbstractAsyncConfiguration
 * 2、初始化对象AsyncAnnotationBeanPostProcessor
 *
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

   @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
      Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");

      AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
      bpp.configure(this.executor, this.exceptionHandler);

      // 自定义的异步注解
      Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
      if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
         bpp.setAsyncAnnotationType(customAsyncAnnotation);
      }

      bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
      bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
      return bpp;
   }
}

上面我依然点出了两个重点:

  • 1、父类AbstractAsyncConfiguration
  • 2、初始化对象AsyncAnnotationBeanPostProcessor 为什么一直强调父类,因为子类初始化之前,父类是要先完成初始化的,所以加载顺序都是父类先加载,这点必须清楚,另外就是子类一般都要重写父类的方法,重写的方法一般在父类的其他方法中会被调用。

既然是继承关系,我们依然来看下继承关系图:

上面我们就看到了上面我说过的Aware系列的接口之一ImportAware,作用是通过实现ImportAware接口获取对应注解的元数据。

所以,我们先去看完父类AbstractAsyncConfiguration,然后再回头来看子类ProxyAsyncConfiguration。

代码语言:javascript
复制
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

   @Nullable
   protected AnnotationAttributes enableAsync;

   @Nullable
   protected Supplier<Executor> executor;

   @Nullable
   protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;


   /**
    * 通过实现ImportAware接口获取@EnableAsync的属性值
    * @param importMetadata
    */
   @Override
   public void setImportMetadata(AnnotationMetadata importMetadata) {
      this.enableAsync = AnnotationAttributes.fromMap(
            importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
      if (this.enableAsync == null) {
         throw new IllegalArgumentException(
               "@EnableAsync is not present on importing class " + importMetadata.getClassName());
      }
   }

   /**
    * Collect any {@link AsyncConfigurer} beans through autowiring.
    *
    * 导入自定义的实现了AsyncConfigurer接口的bean
    * 并给executor、exceptionHandler附上自定义的值
    *
    */
   @Autowired(required = false)
   void setConfigurers(Collection<AsyncConfigurer> configurers) {

      // 如果没有自定义实现AsyncConfigurer接口,直接返回
      if (CollectionUtils.isEmpty(configurers)) {
         return;
      }
      if (configurers.size() > 1) {
         throw new IllegalStateException("Only one AsyncConfigurer may exist");
      }

      // 有的话,直接赋值
      AsyncConfigurer configurer = configurers.iterator().next();
      this.executor = configurer::getAsyncExecutor;
      this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
   }

}

1、setImportMetadata方法里读取了EnableAsync的元数据存在了AnnotationAttributes 中。2、setConfigurers导入自定义的AsyncConfigurer配置类。

我们在第一篇文中就自定义了线程池,还有异步线程的错误处理器等,就是通过实现AsyncConfigurer接口实现的,而我们自定义的类就会被注入到setConfigurers这个方法中,然后被赋值给当前类的executor和exceptionHandler。

所以这个父类中,其实就是一些初始化,初始化this.enableAsync、this.executor和this.exceptionHandler。

当然了,我们不是必须要实现AsyncConfigurer重写executor和exceptionHandler,所以this.executor和this.exceptionHandler可能还是为null的。

我们再回到ProxyAsyncConfiguration的asyncAdvisor()方法,看这个方法名称,有点异步切面的意思呀,那么返回值AsyncAnnotationBeanPostProcessor是否就是一个切面增强类呢?这个我们去看下继承关系。

继承的东西比较多,先来说说我们比较熟悉的东西:

  • BeanClassLoaderAware - 获取当前类的类加载器
  • BeanFactoryAware - 获取Spring的核心容器BeanFactory
  • BeanPostProcessor - bean初始化过程中的前置、后置处理器
  • AbstractAdvisingBeanPostProcessor - 生成aop代理的后置处理器

那么现在来梳理一下逻辑,首先ProxyAsyncConfiguration中会开始初始化AsyncAnnotationBeanPostProcessor,因为是@Bean,所以在对象注入spring容器之前,你先不用看aware系列,不用看BeanPostProcessor,先看@Bean里面方法的内容,那是注入spring容器之前可能做一些初始化。

而asyncAdvisor()方法中,关键的代码其实也没多少,逻辑如下:1、就是new一个AsyncAnnotationBeanPostProcessor对象 2、bpp.configure(this.executor, this.exceptionHandler);就是赋值excutor和exceptionHandler:

  • AsyncAnnotationBeanPostProcessor#configure
代码语言:javascript
复制
public void configure(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

   this.executor = executor;
   this.exceptionHandler = exceptionHandler;
}

3、bpp.setAsyncAnnotationType(customAsyncAnnotation);如果有自定义的异步注解就赋值

然后就返回了对象,通过@Bean注解,这时候这个new出来的AsyncAnnotationBeanPostProcessor对象就会注入到spring容器中,进而调用aware和beanPostProcessor那一套流程。

AsyncAnnotationBeanPostProcessor

接下来就是重点之中的重点了,可以说@Async的重点核心就是这个类,之前做了这么多准备就是为了初始化这个类。

我们来回顾一下上面的内容,首先我们获得了自定义的excutor和exceptionHandler,然后新建了AsyncAnnotationBeanPostProcessor对象并注入到了spring容器中,因为bean的生命周期比较复杂。

我怕很多人没研究过spring的容器,对spring bean的声明周期不太了解,特意从网上找了一张总结的图,让大家一张图搞懂Spring bean的生命周期,从Spring容器启动到容器销毁bean的全过程。

通过这个图,我们再回到我们的这个AsyncAnnotationBeanPostProcessor这个类的继承关系图,你就知道了执行的顺序流程如下:

  • 1、因为实现了BeanPostProcessor,所以先执行postProcessBeforeInitialization
  • 2、执行构造器方法
  • 3、执行BeanFactoryAware 、BeanClassLoaderAware的对应方法
  • 4、执行BeanPostProcessor的postProcessAfterInitialization方法

ok,顺序已给出,那么初始化的过程就清晰了,接下来我们只需要一步一步去看对应模块的代码。

  • 第一步:postProcessBeforeInitialization 好像啥都没做,忽略
代码语言:javascript
复制
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
   return bean;
}
  • 第二步:构造器

其实对象我们是new出来的,然后再通过@Bean注入容器的,并不是使用@Component或者xml方式注入,所以构造器应该是早就执行了

代码语言:javascript
复制
public AsyncAnnotationBeanPostProcessor() {
   setBeforeExistingAdvisors(true);
}

这个构造器只有一行代码,是说是不是在其他已存在的aop之前执行,参数表示是的。

  • 第三步:BeanClassLoaderAware、BeanFactoryAware

因为BeanClassLoaderAware是aop代码部分的了,是为了给对象生成代理的时候统一类加载器。所以这个方法我们不需要看。

我们来看下BeanFactoryAware的setBeanFactory方法:

  • AsyncAnnotationBeanPostProcessor#setBeanFactory
代码语言:javascript
复制
@Override
public void setBeanFactory(BeanFactory beanFactory) {
   super.setBeanFactory(beanFactory);

   /**
    * 创建切面处理
    */
   AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
   if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
   }
   advisor.setBeanFactory(beanFactory);
   this.advisor = advisor;
}

代码中,除了引入beanFactory之外,还定义了一个切面advisor ,并把切面advisor赋值给当前对象。

我们中篇文中说过,用编码实现一个aop,需要准备几个东西:

  • ProxyFactory 代理工厂
  • Pointcut 切点
  • Advice 通知
  • Advisor 切面
  • Target 被代理对象

有了这几个组件之后,我们就可以构建成一个aop。

那么再看这里代码,这里的advisor就在这里初始化获取到了。而我们可以这样理解:Advisor = pointcut + Advice ,所以可说,我们完成了切面的初始化,其实也是@Async核心重要的一部分了。

ok,有了知识储备,搞啥都清晰。我们接着往下面走, 看AsyncAnnotationAdvisor的初始化过程先,也就是构造方法:

代码语言:javascript
复制
public AsyncAnnotationAdvisor(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

   Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
   asyncAnnotationTypes.add(Async.class);
   try {
      asyncAnnotationTypes.add((Class<? extends Annotation>)
            ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
   }
   catch (ClassNotFoundException ex) {
      // If EJB 3.1 API not present, simply ignore.
   }
   this.advice = buildAdvice(executor, exceptionHandler);
   this.pointcut = buildPointcut(asyncAnnotationTypes);
}

上面重点是这两行:

代码语言:javascript
复制
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);

切面等于切点加通知处理。就是这两样东西了。也就是构造器里面其实得到了切点和通知。接下来我们继续看着两个方法:

  • AsyncAnnotationAdvisor#buildAdvice
代码语言:javascript
复制
protected Advice buildAdvice(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

   AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
   interceptor.configure(executor, exceptionHandler);
   return interceptor;
}

我们先来看下AnnotationAsyncExecutionInterceptor的继承关系:

这里面我们比较熟悉的类有Advice、Interceptor、BeanFactoryAware。结合第二篇文章中讲到的生成aop的编码实现。你基本可以确定,这个AnnotationAsyncExecutionInterceptor类就是我们环绕通知的处理类了,Advice说明了这个类是个aop通知处理类,Interceptor说明了处理的方法是拦截器的invoke方法。切面拦截到切点时候就会到这个方法的invoke中执行对应的业务处理逻辑。

那么对应到@Async,执行的逻辑应该就是起一个线程执行方法。

清楚了这一点之后,我们再回到AnnotationAsyncExecutionInterceptor构造方法中,最终调用的是父类中的构造方法:

代码语言:javascript
复制
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
   this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
   this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}

这里就是给executor和exceptionHandler一个初始化的执行器或错误处理器,初始化默认处理器之后再执行interceptor.configure(executor, exceptionHandler);

代码语言:javascript
复制
public void configure(@Nullable Supplier<Executor> defaultExecutor,
      @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

   this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
   this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

这意思就是如果我们之前自定义了执行器和错误处理器,那么用我们自定义的,如果没有就用刚刚在构造器中初始化的默认的。

所以,切面的环绕通知处理Advice已经生成。我们再来看看另一个方法

  • AsyncAnnotationAdvisor#buildPointcut
代码语言:javascript
复制
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
   ComposablePointcut result = null;
   for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
      Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
      Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
      if (result == null) {
         result = new ComposablePointcut(cpc);
      }
      else {
         result.union(cpc);
      }
      result = result.union(mpc);
   }
   return (result != null ? result : Pointcut.TRUE);
}

这生成切点的逻辑也挺简单的,之前允许在@EnableAsync中通过annatation定义自定义的异步线程注解,我们常用的默认是@Async。所以这里意思其实是把所有的可能的注解都union起来,union就是合并意思。不管自定义的,还是默认的都作为切点。

这时候切点Pointcut已初始化好。

所以在AsyncAnnotationAdvisor中我们初始化好了Advice和Pointcut,而切面就等于Advice+Pointcut,那么它是一个切面来的吗?我们来看下继承关系:

果然实现了Advisor,是个切面。所以致此,我们已经定义了一个切面。

  • 第四步:执行BeanPostProcessor的postProcessAfterInitialization方法

上面三步走完之后,我们定义得到了一个切面,接下来我们进入最后一步,就是bean的后置处理,这个后置处理器其实是aop中实现的,所以我们定义一个aop切面,其实都需要进入这个后置处理器,那么这里面做了什么事情呢?

代码语言:javascript
复制
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
   if (this.advisor == null || bean instanceof AopInfrastructureBean) {
      // Ignore AOP infrastructure such as scoped proxies.
      return bean;
   }

   if (bean instanceof Advised) {
      Advised advised = (Advised) bean;
      if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
         // Add our local Advisor to the existing proxy's Advisor chain...
         if (this.beforeExistingAdvisors) {
            advised.addAdvisor(0, this.advisor);
         }
         else {
            advised.addAdvisor(this.advisor);
         }
         return bean;
      }
   }

   if (isEligible(bean, beanName)) {
      ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
      if (!proxyFactory.isProxyTargetClass()) {
         evaluateProxyInterfaces(bean.getClass(), proxyFactory);
      }
      proxyFactory.addAdvisor(this.advisor);
      customizeProxyFactory(proxyFactory);
      return proxyFactory.getProxy(getProxyClassLoader());
   }

   // No proxy needed.
   return bean;
}

看了里面的逻辑,应该就是直接给bean生成代理的了,那么我们写的Async代码中,那些需要我们生成代理呢,是不是所有写了@Async注解的方法或者类?因为我知道aop通过ProxyFactory生成代理的,所以我在 ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); 这里打个端点,然后启动项目。

果然,需要代理的是UserServiceImpl,因为我的@Async方法都是写在UserServiceImpl上的:

所以UserServiceImpl就是aop需要代理的对象。其中prepareProxyFactory的代码如下:

代码语言:javascript
复制
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
   ProxyFactory proxyFactory = new ProxyFactory();
   proxyFactory.copyFrom(this);
   proxyFactory.setTarget(bean);
   return proxyFactory;
}

就是创建proxyFactory对象,然后设置目标代理对象就是UserServiceImpl,然后接着走是设置interface的,

代码语言:javascript
复制
if (!proxyFactory.isProxyTargetClass()) {
   evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}

其中evaluateProxyInterfaces的内容是:

可以看到因为UserServiceImpl是个实现类,所以对应的接口需要声明,这样使用UserService调用方法时候才会触发aop。所以这里面的重要代码就是proxyFactory.addInterface(ifc);

然后方法继续执行,就到了proxyFactory.addAdvisor(this.advisor);这一句,advisor就是我们上面初始化好的切面,这里直接set给proxyFactory,定义切点,和切面处理。

再接着走,到了customizeProxyFactory(proxyFactory);这一步其实可以重写,然后proxyFactory自定义一些需要的属性等。@Async中没有重写,所以这一步我们跳过。

最后到了代码*return *proxyFactory.getProxy(getProxyClassLoader());这一步,是不是aop生成代理了。

源码梳理

所以总结我们上面所有的内容,我们再来梳理一下proxyFactory的伪代码过程:

代码语言:javascript
复制
ProxyFactory proxyFactory = new ProxyFactory();
// 目标代理类
proxyFactory.setTarget("UserServiceImpl bean");
// 代理接口
proxyFactory.addInterface("UserService");

// 切点
AsyncAnnotationAdvisor.pointcut = @Async注解
// 环绕通知处理
AsyncAnnotationAdvisor.advice = AnnotationAsyncExecutionInterceptor拦截器
// 切面 = 切点+通知
proxyFactory.addAdvisor("AsyncAnnotationAdvisor");
// 生成代理
UserService userService = proxyFactory.getProxy(getProxyClassLoader());

结合我们第二篇文章中的aop编码实现方式,是不是很相似了。所以这时候aop我们已经完全定义好了。

接下来我们回头来看环绕通知处理里面的业务逻辑,因为现在aop已经生成,拦截@Async之后我们需要异步处理代理的方法。这时候我们进入AnnotationAsyncExecutionInterceptor的invoke方法。

  • 在父类中AsyncExecutionInterceptor#invoke
代码语言:javascript
复制
public Object invoke(final MethodInvocation invocation) throws Throwable {
   Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
   Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
   final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

   // 获取executor执行器
   AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
   if (executor == null) {
      throw new IllegalStateException(
            "No executor specified and no default executor set on AsyncExecutionInterceptor either");
   }

   // 定义一个Callable异步线程
   Callable<Object> task = () -> {
      try {
         // 被拦截的方法执行
         Object result = invocation.proceed();
         if (result instanceof Future) {
            return ((Future<?>) result).get();
         }
      }
      catch (ExecutionException ex) {
         handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
      }
      catch (Throwable ex) {
         handleError(ex, userDeclaredMethod, invocation.getArguments());
      }
      return null;
   };

   return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

上面第一步获取到executor,然后再通过Callable定义一个异步线程。然后把task放在doSubmit中执行。

代码语言:javascript
复制
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
   if (CompletableFuture.class.isAssignableFrom(returnType)) {
      return CompletableFuture.supplyAsync(() -> {
         try {
            return task.call();
         }
         catch (Throwable ex) {
            throw new CompletionException(ex);
         }
      }, executor);
   }
   else if (ListenableFuture.class.isAssignableFrom(returnType)) {
      return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
   }
   else if (Future.class.isAssignableFrom(returnType)) {
      return executor.submit(task);
   }
   else {
      executor.submit(task);
      return null;
   }
}

这里面就明显了,就是去执行我们的task,然后返回结果。具体的我们就不再去深究啦。相信到了这一步,我们已经明白了@Async的原理。

以上就是@Async的源码分析,相对来说还是比较简单,看过aop源码的人再来看@Async的话几乎都不用花什么时间,所以技术这东西我们要学会多积累。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java思维导图 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • @EnableAsync
  • AsyncConfigurationSelector
  • ProxyAsyncConfiguration
  • AsyncAnnotationBeanPostProcessor
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档