Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Async的线程池使用的哪个?

Async的线程池使用的哪个?

原创
作者头像
青衫染红尘
发布于 2022-08-02 11:26:01
发布于 2022-08-02 11:26:01
1.2K0
举报
文章被收录于专栏:Surpass' BlogSurpass' Blog

前言

在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync@Async 就可以使用它了。但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor 而不是之前熟悉的 SimpleAsyncTaskExecutor

那么来看一下他的执行过程吧。

正文

  1. 首先要使异步生效,我们得在启动类中加入 @EnableAsync 那么就点开它看看。它会使用 @Import 注入一个 AsyncConfigurationSelector 类,启动是通过父类可以决定它使用的是配置类 ProxyAsyncConfiguration
代码语言:java
AI代码解释
复制
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    public AsyncConfigurationSelector() {
    }

    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch(adviceMode) {
        case PROXY:
            return new String[]{ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
        default:
            return null;
        }
    }
}
  1. 点开能够看到注入一个 AsyncAnnotationBeanPostProcessor 。它实现了 BeanPostProcessor 接口,因此它是一个后处理器,用于将 Spring AOPAdvisor 应用于给定的 bean 。从而该bean上通过异步注解所定义的方法在调用时会被真正地异步调用起来。
代码语言:java
AI代码解释
复制
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    public ProxyAsyncConfiguration() {
    }

    @Bean(
        name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
    )
    @Role(2)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }

        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
        return bpp;
    }
}
  1. AsyncAnnotationBeanPostProcessor 的父类实现了 BeanFactoryAware ,那么会在 AsyncAnnotationBeanPostProcessor 实例化之后回调 setBeanFactory() 来实例化切面 AsyncAnnotationAdvisor
代码语言:java
AI代码解释
复制
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    //定义一个切面
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }

    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
  1. AsyncAnnotationAdvisor 构造和声明切入的目标(切点)和代码增强(通知)。
代码语言:java
AI代码解释
复制
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
        //通知
		this.advice = buildAdvice(executor, exceptionHandler);
        //切入点
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
  1. 通知就是最终要执行的。buildAdvice 用于构建通知,主要是创建一个 AnnotationAsyncExecutionInterceptor 类型的拦截器,并且配置好使用的执行器和异常处理器。真正的异步执行的代码在 AsyncExecutionAspectSupport 中!
代码语言:java
AI代码解释
复制
protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		//配置拦截器
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}
  1. 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器。
代码语言:java
AI代码解释
复制
public void configure(@Nullable Supplier<Executor> defaultExecutor,
			@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    	//默认执行器
		this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
		this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
	}
  1. getDefaultExecutor() 方法,用来查找默认的执行器,父类 AsyncExecutionAspectSupport 首先寻找唯一一个类型为 TaskExecutor 的执行器并返回,若存在多个则寻找默认的执行器 taskExecutor ,若无法找到则直接返回null。子类 AsyncExecutionInterceptor 重写 getDefaultExecutor 方法,首先调用父类逻辑,返回null则配置一个名为 SimpleAsyncTaskExecutor 的执行器
代码语言:java
AI代码解释
复制
/**
 * 父类
 * 获取或构建此通知实例的默认执行器
 * 这里返回的执行器将被缓存以供后续使用
 * 默认实现搜索唯一的TaskExecutor的bean
 * 在上下文中,用于名为“taskExecutor”的Executor bean。
 * 如果两者都不是可解析的,这个实现将返回 null
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			// 搜索唯一的一个TaskExecutor类型的bean并返回
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
            //找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				if (logger.isInfoEnabled()) {
					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
				}
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
            //未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				logger.info("No task executor bean found for async processing: " +
						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
			}
			// Giving up -> either using local default executor or none at all...
		}
	}
	return null;
}


/**
 * 子类
 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx ,为什么有了自己的线程池有可能异步用到了自己的线程池配置。

我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke()determineAsyncExecutor 用来决策使用哪个执行器

代码语言:java
AI代码解释
复制
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    //在缓存的执行器中选择一个对应方法的执行器
    AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
    if (executor == null) {
        //获取@Async注解中的value(指定的执行器)
        String qualifier = this.getExecutorQualifier(method);
        Executor targetExecutor;
        if (StringUtils.hasLength(qualifier)) {
            //获取指定执行器的bean
            targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            //选择默认的执行器
            targetExecutor = (Executor)this.defaultExecutor.get();
        }

        if (targetExecutor == null) {
            return null;
        }

        executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
        //将执行器进行缓存
        this.executors.put(method, executor);
    }

    return (AsyncTaskExecutor)executor;
}

当有了执行器调用 doSubmit 方法将任务加入到执行器中。

异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:

不复用线程,也就是说为每个任务新起一个线程。但是可以通过 concurrencyLimit 属性来控制并发线程数量,但是默认情况下不做限制( concurrencyLimit 取值为-1)。

因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!

总结

本文主要以看源码的方式来了解异步注解 @Async 是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spring源码之Async注解
实现了 BeanFactoryAware 接口,初始化 AsyncAnnotationBeanPostProcessor 时会调用内部的**setBeanFactory() **方法设置切面
全栈程序员站长
2022/09/18
4930
Spring源码之Async注解
Spring异步核心@Async注解的前世今生
可以很明显的发现,它使用的是线程池SimpleAsyncTaskExecutor,这也是Spring默认给我们提供的线程池(其实它不是一个真正的线程池,后面会有讲述)。下面原理部分讲解后,你就能知道怎么让它使用我们自定义的线程池了。
大忽悠爱学习
2022/09/29
1.2K0
Spring异步核心@Async注解的前世今生
异步编程 - 09 Spring框架中的异步执行_@Async注解异步执行原理&源码解析
在Spring中调用线程将在调用含有@Async注释的方法时立即返回,Spring是如何做到的呢?其实是其对标注@Async注解的类做了代理,比如下面的类Async-AnnotationExample。
小小工匠
2023/09/09
5060
【小家Spring】Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)
在开发过程中,我们会遇到很多使用线程池的业务场景,例如异步短信通知、异步记录操作日志。大多数使用线程池的场景,就是会将一些可以进行异步操作的业务放在线程池中去完成。
YourBatman
2019/09/03
7.2K1
【小家Spring】Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)
Spring中异步注解@Async的使用、原理及使用时可能导致的问题
很多同学碰到了下面这个问题,添加了Spring提供的一个异步注解@Async循环依赖无法被解决了,下面是一些读者的留言跟群里同学碰到的问题:
程序员DMZ
2020/07/24
1.7K0
Spring中异步注解@Async的使用、原理及使用时可能导致的问题
你知道 @Async 是怎么让方法异步执行的吗?
@Async 是通过注解标记来开启方法的异步执行的;对于注解的底层实现,除了 java 原生提供那种依赖编译期植入的之外,其他的基本都差不多,即运行时通过反射等方式拦截到打了注解的类或者方法,然后执行时进行横切拦截;另外这里还有一个点就是方法异步执行,所以对于 @Async 的剖析,就一定绕不开两个基本的知识点,就是代理和线程池。 在了解到这些之后,我们来拆解下 @Async 的基本原理。
科技新语
2022/12/06
9380
你知道 @Async 是怎么让方法异步执行的吗?
不仅会用@Async,我把源码也梳理了一遍(下)
终于到了源码分析的环节了,在这之前我已经写过了两篇文章专门分析这个@Async了,还没看过的同学先去看下哈。
java思维导图
2019/09/29
1.1K0
不仅会用@Async,我把源码也梳理了一遍(下)
Spring异步编程
在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其实时返回结果,甚至不关心执行成功与否,比如:
叔牙
2021/04/26
1.9K0
Spring的AOP应用--基于注解的多线程
Spring 对多线程的支持也是基于AOP机制的,开启注解为@EnableAsync,该注解会import一个类:AsyncConfigurationSelector,该类作为Spring用于引入外部配置,其实现如下,默认情况下,引入了配置类:ProxyAsyncConfiguration
默 语
2024/11/22
1210
Spring 异步实现原理与实战分享
最近因为全链路压测项目需要对用户自定义线程池 Bean 进行适配工作,我们知道全链路压测的核心思想是对流量压测进行标记,因此我们需要给压测的流量请求进行打标,并在链路中进行传递,那么问题来了,如果项目中使用了多线程处理业务,就会造成父子线程间无法传递压测打标数据,不过可以利用阿里开源的 ttl 解决这个问题。
张乘辉
2020/06/19
8030
不看绝对后悔的@Async深度解析【不仅仅是源码那么简单】
在整理老的业务逻辑代码时候发现好多接口实现上面都标记了 @Async注解。我本身对这个注解使用的比较少,异步逻辑我都习惯自定义ThreadPoolExecutor工具类。正好借着这次梳理代码结构,来看看 @Async这个注解到底在玩什么?
柏炎
2022/08/23
1.8K0
不看绝对后悔的@Async深度解析【不仅仅是源码那么简单】
关于Spring中的@Async注解以及为什么不建议使用 - Java技术债务
Async 注解是 Java 8 中的一个注解,用于标识一个方法是异步执行的。当一个方法被标记为 Async 时,该方法将在一个新的线程中执行,并且可以立即返回一个 CompletableFuture 对象。使用 CompletableFuture 可以更轻松地管理异步计算的结果。下面是一个使用 Async 注解的示例代码:
Java技术债务
2024/06/21
3020
聊聊spring的async注解
这里讨论一下getAsyncExecutor这里定义null的情况。 spring-context-4.3.9.RELEASE-sources.jar!/org/springframework/scheduling/annotation/AbstractAsyncConfiguration.java
code4it
2018/09/17
1.8K0
SpringBoot线程池开发最佳实践!
即ThreadPoolTaskExecutor,但SpringBoot不同版本有区别。
JavaEdge
2025/06/01
1650
SpringBoot线程池开发最佳实践!
读spring @Async的源码让我收获了什么?
对于从事后端开发的同学来说,为了提升系统性能异步是必须要使用的技术之一。通常我们可以通过:线程、线程池、定时任务 和 回调等方法来实现异步,其中用得最多的可能是线程和线程池。
苏三说技术
2020/11/09
5350
读spring @Async的源码让我收获了什么?
springboot event线程池总结
现在公司都是使用的spring全家桶,所以技术面都会在spring boot,这方面我也是新手了,学习与总结并进,新旧知识连贯
码农戏码
2021/03/23
3.5K0
线上问题-关于@Async
线程数统计:32278 个SimpleAsyncTaskExecutor,开了好多线程啊! 应该是线程太多
温安适
2022/01/10
5430
深入理解Spring系列之十五:@Async实现原理
对于异步方法调用,从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法。调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行。
JavaQ
2019/05/15
20.3K0
深入理解Spring系列之十五:@Async实现原理
阿里面试:说说@Async实现原理?
@Async 是 Spring 3.0 提供的一个注解,用于标识某类(下的公共方法)或某方法会执行异步调用。
磊哥
2024/07/05
1450
Spring中@Import的各种用法以及ImportAware接口
@Import注解提供了和XML中<import/>元素等价的功能,实现导入的一个或多个配置类。@Import即可以在类上使用,也可以作为元注解使用。
Coder小黑
2019/12/18
1.5K0
推荐阅读
相关推荐
Spring源码之Async注解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档