前言: 最近因为全链路压测项目需要对用户自定义线程池Bean进行适配工作,我们知道全链路压测的核心思想是对流量压测进行标记,因此我们需要给压测的流量请求进行打标,并在链路中进行传递,那么问题来了,如果项目中使用了多线程处理业务,就会造成父子线程间无法传递压测打标数据,不过可以利用阿里开源的ttl解决这个问题。 全链路压测项目的宗旨就是不让用户感知这个项目的存在,因此我们不可能让用户去对其线程池进行改造的,我们需要主动去适配用户自定义的线程池。 在适配过程的过程中无非就是将线程池替换成ttl去解决,可通过代理或者替换Bean的方式实现,这方面不是本文的内容,本文主要是深入Spring异步实现的原理,让大家对Spring异步编程不再陌生! 正文:运行原理分析 过一遍源码分析,才能知道其中的一些细节原理,这也是不可避免的过程,虽然我也不想在文章中贴过多的源码,但如果不从源码中得出原因,很可能你会知其然不知其所以然。下面就尽量跟着源码走一遍它的运行机制是怎么样的,我把我自己的理解也会尽量详细地描述出来,在这里我会将其关联的源码贴出来分析,这些源码都有其相互关联性,可能你看到后面还会回来再看一遍。注册通知器过程 开启Spring异步编程之需要一个注解即可:EnableAsync Springboot中有非常多Enable的注解,其目的是显式开启某一个功能特性,这也是一个非常典型的编程模型。 EnableAsync注解注入了一个AsyncConfigurationSelector类,这个类目的就是为了注入ProxyAsyncConfiguration自动配置类,它的父类AbstractAsyncConfiguration做了件事情: org。springframework。scheduling。annotation。AbstractAsyncConfigurationsetConfigurers 我们可以实现AsyncConfigurer接口的方式去自定义一个线程池Bean,这个后面会会讲到,源码所示,这里目的是为了这个bean,并将其定义的线程池对象和异常处理对象保存到AsyncConfiguration中,用于创建AsyncAnnotationBeanPostProcessor。 这两个对象后面源码分析会再次遇上。 而这个配置类就是为了注册一个名为AsyncAnnotationBeanPostProcessor的bean,如其名,它是一个BeanPostProcessor处理器,它的类继承结构如下所示: 从类继承结构可以看出,AsyncAnnotationBeanPostProcessor实现了BeanPostProcessor和BeanFactoryAware,因此AsyncAnnotationBeanPostProcessor会在setBeanFactory方法中做了Spring异步编程中最为重要的一步,创建一个针对Async注解的通知器AsyncAnnotationAdvisor(叫做切面貌似也可以),这个通知器主要用于拦截被Async注解的方法。同时,bean实例初始化过程会被AsyncAnnotationBeanPostProcessor拦截处理,处理过程会将符合条件的bean注册AsyncAnnotationAdvisor: org。springframework。aop。framework。AbstractAdvisingBeanPostProcessorpostProcessAfterInitialization 创建通知器过程 接下来我们就分析AsyncAnnotationAdvisor是如何创建的。 AsyncAnnotationAdvisor实现了PointcutAdvisor接口,因此需要同时实现getPointcut和getAdvice方法,而这两个方法的实际内容有以上红框创建实现。 到这里我们已经知道,Spring的异步实现原理,是利用SpringAOP切面编程实现的,通过BeanPostProcessor拦截处理符合条件的bean,并将切面织入,实现切面增强处理。 SpringAOP编程核心概念:Advice:通知,切面的一种实现,可以完成简单的织入功能。通知定义了增强代码切入到目标代码的时间点,是目标方法执行之前执行,还是执行之后执行等。切入点定义切入的位置,通知定义切入的时间; Pointcut:切点,切入点指切面具体织入的方法; Advisor:切面的另一种实现,能够将通知以更为复杂的方式织入到目标对象中,是将通知包装为更复杂切面的装配器。 因此我们需要创建一个切面和切入点:buildAdvice: buildAdvice方法可知,切面是一个AnnotationAsyncExecutionInterceptor类,该类实现了MethodInterceptor接口,其invoke方法即为拦截处理的核心源码,后面会进行详细分析。buildPointcut: 从AsyncAnnotationAdvisor构造器中可以看出,buildPointcut方法目的就是为了创建Async注解的切入点。通知器拦截处理过程 前面我们已经知道,拦截切面是一个AnnotationAsyncExecutionInterceptor类,我们直接定位到invoke方法一探究竟: org。springframework。aop。interceptor。AsyncExecutionInterceptorinvoke 拦截处理的核心逻辑就是这么简单,也没啥好分析的,无非就是匹配方法指定的线程池,接着构建执行单元Callable,最后调用doSubmit方法执行。如何匹配线程池? 重点在于如何匹配线程池,这也是后面实战分析的重点内容,因此我们需要在这里详细分析匹配线程池的一些策略细节。 org。springframework。aop。interceptor。AsyncExecutionAspectSupportdetermineAsyncExecutor getExecutorQualifier方法目的是获取Async注解上的value值,value值即线程池Bean的名称,如果获取到的targetExecutor不是Spring类型的线程池,则使用TaskExecutorAdapter进行适配,这也是为什么我们直接创建Executor类型的线程池Spring也是支持的原因。 从以上源码逻辑可看出如果我们使用Async注解时value值为空,Spring就会使用defaultExecutor,defaultExecutor是什么时候赋值的呢?上面内容已经有提及,在buildAdvice方法创建AnnotationAsyncExecutionInterceptor时调用了其configure方法,如下: org。springframework。aop。interceptor。AsyncExecutionAspectSupportconfigure 原来当defaultExecutor和exceptionHandler是当初从ProxyAsyncConfiguration中获取用户自定义的AsyncConfigurer实现类而来的,那么如果defaultExecutor不存在怎么办?从源码可看出,defaultExecutor其实是一个SingletonSupplier类型,如果调用get方法不存在,则使用默认值,默认值为:()getDefaultExecutor(this。beanFactory); org。springframework。aop。interceptor。AsyncExecutionAspectSupportgetDefaultExecutor 注意第一个红框的注释,此时Spring寻找默认的线程池Bean为指定Spring的TaskExecutor类型,并非Executor类型,如果Bean容器中没有找到TaskExecutor类型的Bean,则继续寻找默认为以下名称的Bean:publicstaticfinalStringDEFAULTTASKEXECUTORBEANNAMEtaskExecutor; 那么如果都没有找到怎么办呢?在这个方法直接返回null了,AsyncExecutionInterceptor类覆写了这个方法: org。springframework。aop。interceptor。AsyncExecutionInterceptorgetDefaultExecutor 如果没有找到,则直接创建一个SimpleAsyncTaskExecutor类作为Async注解底层使用的线程池。 从匹配线程池源码得知,如果你创建的线程池Bean非TaskExecutor类型并且没有使用实现AsyncConfigurer接口方式创建线程池,就需要主动指定线程池Bean名称,否则Spring会使用默认策略。总结 利用BeanPostProcessor机制在Bean初始化过程中创建一个AsyncAnnotationAdvisor切面,并且符合条件的Bean生成代理对象并将AsyncAnnotationAdvisor切面添加到代理中。 可以看出Spring的很多功能都是围绕着SpringIOC和AOP实现的。Spring默认线程池策略分析 有时候为了方便,我们不自定义创建线程池bean时,Spring默认会为我们提供什么样的线程池呢? 我们先来看下结果: 很奇怪,明明我们都没有在项目中自定义线程池Bean,按照以上源码的分析结果来看,此时Spring选择的是SimpleAsyncTaskExecutor才对,莫非是supergetDefaultExecutor方法找到了线程池Bean? 从以上截图确实是找到了,而且类型还是ThreadPoolTaskExecutor类型的,那可以推断出Spring一定是在某个地方创建了一个ThreadPoolTaskExecutor类型的Bean。 果然,在springbootautoconfigure2。1。3。RELEASE中,会在org。springframework。boot。autoconfigure。task。TaskExecutionAutoConfiguration中自动创建一个默认的ThreadPoolTaskExecutorbean,getDefaultExecutor方法会在容器中找到这个bean,并将其作为默认的Async注解的执行线程池。 这里我为什么要标注版本呢?因为某些低版本的springbootautoconfigure,是没有TaskExecutionAutoConfiguration的,此时Spring就会选择SimpleAsyncTaskExecutor。 org。springframework。boot。autoconfigure。task。TaskExecutionAutoConfiguration 从以上源码可以看出,默认的线程池的参数还可以手动在properties中配置,这意味着不需要主动创建线程池的情况下,也可以通过properties配置文件更改线程池相关参数。创建线程池Bean的几种方式 1、直接创建一个Bean的方式,这貌似是最多人使用的方式,可以创建多个线程池Bean,使用时指定线程池Bean名称:Bean(myTaskExecutor1)publicExecutorgetThreadPoolTaskExecutor1(){finalThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();set。。。returnexecutor;}Bean(myTaskExecutor2)publicExecutorgetThreadPoolTaskExecutor2(){finalThreadPoolExecutorexecutornewThreadPoolExecutor();set。。。returnexecutor;} 2、实现AsyncConfigurer接口方式:ComponentpublicclassAsyncConfigurerTestimplementsAsyncConfigurer{privatestaticfinalLoggerLOGGERLoggerFactory。getLogger(AsyncConfigurerTest。class);OverridepublicExecutorgetAsyncExecutor(){ThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();set。。。returnexecutor;}OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(ex,method,params){LOGGER。info(Exceptionmessage:{},ex。getMessage(),ex);LOGGER。info(Methodname:{},method。getName());for(Objectparam:params){LOGGER。info(Parametervalue:{},param);}};}} 这种方式可以方便定义异常处理的逻辑,不过从源码分析可以看出,项目中只能存在一个AsyncConfigurer的配置,意味着我们只能通过AsyncConfigurer配置一个自定义的线程池Bean。 3、利用springbootautoconfigure在properties配置线程池参数: 前面讲到了Spring默认线程池策略,这里利用springbootautoconfigure默认创建一个ThreadPoolTaskExecutor,通过properties自定义线程池相关参数。 这个方式的缺点就是类型固定为ThreadPoolTaskExecutor,且只能有一个线程池。总结了很多有关于java面试的资料,希望能够帮助正在学习java的小伙伴。由于资料过多不便发表文章,创作不易,望小伙伴们能够给我一些动力继续创建更好的java类学习资料文章, 请多多支持和关注小作,别忘了点赞评论转发。右上角私信我回复【03】即可领取免费学习资料谢谢啦! 作者:张乘辉 原文出处:https:mp。weixin。qq。comsmNDwZfPK90RoGmJGHCjKrw