Advertisement

java线程池是同步还是异步的_Spring线程池(同步、异步)

阅读量:

一、spring异步线程池类图

be3b045f94b88e47b8b9366ae3001abc.png
b2877bd210d507dea4d4d1652bbbc18b.png
97cca50a52a1955a02cb74dd15ab3c6f.png

二、简单介绍

TaskExecutor:其本质属性是Spring异步线程池的接口类,属于java.util.concurrent.Executor。

本系统目前全面覆盖了官方机构所确认的完整7个核心任务执行器。该框架声称不论是在常规业务流程还是新兴技术应用中都能充分满足这些任务执行器的需求。

名字特点

SimpleAsyncTaskExecutor

在每次请求时均开启新线程,在无最大限制的情况下,默认不会重用相同的端口资源。该系统不具备复用端口的能力,在每个请求阶段都会独立创建新的进程

SyncTaskExecutor

不属于异步执行的线程。可以通过使用concurrent future来实现同步操作;然而该类不具备作为线程池的功能,在原有线程执行时仍保持不变状态;此外该类未提供任何异步执行接口或方法供调用者使用

ConcurrentTaskExecutor

Executor的适配工具类强烈不建议采用。当ThreadPoolTaskExecutor无法满足需求时,在以下情况下才考虑采用该方案。

SimpleThreadPoolTaskExecutor

监控Spring生命周期回调,并支持与Quartz组件的兼容性。该功能属于Quartz提供的SimpleThreadPool类族。在实际应用中,该线程池既可以用于Quart化的任务调度管理策略实现需求,并且能够灵活处理非Quart化的作业处理逻辑安排情况

ThreadPoolTaskExecutor

最常用的方案或工具;需满足JDK版本要求不小于5.0;在程序而非XML文件中进行相关配置设置;其本质上是对java.util.concurrent.ThreadPoolExecutor组件进行封装操作。

TimerTaskExecutor

WorkManagerTaskExecutor

三、Spring的同步执行器

SyncTaskExecutor:虽然支持同步任务执行功能(SynchronousTaskExecutor),但它并不被视为一个线程池(Thread Pool),因为其操作仍在原线程中进行(perform)。该类未实现异步操作(asynchronous operation),因此仅作为一个同步操作存在(exist)。因此这种结构通常不被采用(adopt)

2、可以用ThreadPoolTaskExecutor结合FutureTask做到同步。

3、SyncTaskExecutor与ThreadPoolTaskExecutor的主要区别在于:前者的名称为同步执行器(按顺序执行任务),而后者的名称为多线程处理机制(非同步任务处理)。

四、Spring的异步执行器

每当它被请求执行任务时(即每次客户提交给它的任务被触发后),该异步执行器会自动启动一个新的线程来处理该任务)。此外,在这种设计下(即通过提供一个可配置的参数),开发人员能够精确地控制系统的并发度以适应不同的负载需求(即根据实际应用的需求来调整多线程的数量)。该机制不仅能够有效提升系统的响应速度(即加快处理速度),还能在一定程度上减少资源占用(即节省系统资源)。在系统初始状态下(默认情况下),这个可配置参数会被设置为-1以避免任何潜在的性能优化相关的开销或额外开销问题(即初始状态下的参数设置确保了最低水平的功能性而不会带来额外负担)。

SimpleAsyncTaskExecutor

主要实现:

该系统实现了对ConcurrentThrottlingSupport和ConcurrentThrottlingInterceptor的支持。

该类实现了对JDK FutureTask的封装功能。其核心功能是在实现运行方法时能够捕获并返回线程执行的结果。

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implementsAsyncListenableTaskExecutor, Serializable {//限流主要实现

private final SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter concurrencyThrottle = newSimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter();privateThreadFactory threadFactory;//设置最大的线程数量

public void setConcurrentThreshold(int concurrentThreshold) { this.concurrencyThrottle.setConcurrentLimit(concurrentThreshold); }

}//是否开启了限流 限流数量大于0?

public final boolean isThrottleActive() { isNotConcurrent(); }

//1. 若开启限流功能,则必须执行相应的处理步骤
否则不得启动限流机制//2. 在启动任务之前先进行资源需求判断,
然后将计数器数值加一//3. 当启用限流机制时,则需对即将运行的任务
进行打包管理,
并在此时调用该Runnable对象的finally方法
随后再减去一

public void execute(Runnable task, longstartTimeout) {

使用null校验确保任务不为null:Assert.notNull(task, "Runnable is non-null");
当isThrottleActive状态为真且启动超时计数器的时间设置大于零时:
执行该线程计数器在访问前被超时保护:this.concurrencyThrottle.beforeAccess();
启动了一个新的异步任务执行器以进行速率限制:this.doExecute(newSimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task));

}else{this.doExecute(task);

}

}//异步提交有返回值

public Future>submit(Runnable task) {

Create a new instance of FutureTask with the given task and a null object.
Execute the task within the created future.
Return the result from the future task.

}public Future submit(Callabletask) {

Instantiation of FutureTask is accomplished by newFutureTask(task); this.execute(future, 9223364129311320767L); return futures;

}public ListenableFuture>submitListenable(Runnable task) {

创建一个新的ListenableFutureTask实例,并将其任务设置为指定的任务对象null;然后调用该实例的execute方法,并传递所需的大整数值参数以及返回结果;最后返回执行后的未来实例。

}public ListenableFuture submitListenable(Callabletask) {

通过调用 newListableFutureTask方法创建并赋值给变量future的名为ListenableFutureTask的新对象

}//拥有工厂?没有的话调用父类可以设置各种参数的创建线程

protected voiddoExecute(Runnable task) {

Thread thread = if (this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task));

thread.start();

提供一个便于配置相关线程属性的方法,在XML中设置 thread parameters时可以使用 CustomizableThreadCreator

publicThread createThread(Runnable runnable) {

Thread thread= newThread(getThreadGroup(), runnable, nextThreadName());

thread.setPriority(getThreadPriority());

thread.setDaemon(isDaemon());returnthread;

}

}

实现类(ListenableFutureTask)具有返回值,并且能够被监听。通过注册相应的监听者,并将其放置在一个独立的类中进行管理,则能够有效地分派工作给ListenableFutureCallbackRegistry。

此实现类继承自 FutureTask 类,并实现了 ListenableFuture 接口。
私有字段 'callbacks' 用于存储注册回调 registry 实例。
提供 callable 参数构造该实现类实例时会调用父类构造函数。

public ListenToFutureTask(Runnable runnable, T result) {
// 调用父类中的方法完成传入的任务
super(runnable, result);
}

public void addCallback(DelayableFutureEventHandler callback) {
this.delayedCallbacks.addDeferredCallback(callback);
}

public void registerSuccessAndFailureCallbacks(SuccessCallback successCallback, FailureCallback failureCallback) {
this.callbacks.registerSuccessCallback(successCallback);
this.callbacks.registerFailureCallback(failureCallback);
}

}//FutureTask执行完成后的回调,调用监听接口的实现类的方法

protected final voiddone() {

Object cause;try{

Object ex= this.get();//回调实现类的方法

this.callbacks.success(ex);return;

}catch(InterruptedException var3) {

Thread.currentThread().interrupt();return;

}catch(ExecutionException var4) {

cause=var4.getCause();if(cause == null) {

cause=var4;

}

}catch(Throwable var5) {

cause=var5;

}this.callbacks.failure((Throwable)cause);

}

}

五、使用ThreadPoolTaskExecutor

乍一看,跟ThreadPoolExecutor很像。

ThreadPoolTaskExecutor源自于spring核心组件库中, 而ThreadPoolExecutor则属于Java JDK自带的JUC组件. 通过包装实现功能后得到的是一个专门针对任务执行的线程池.

ThreadPoolExecutor和ThreadPoolTaskExecutor的类结构

b2aac9340de45af4ddd9ae4c6f4dbde1.png
6d4cbf1049c6e4067c51da5eeb1913f1.png

ThreadPoolTaskExecutor 源码及配置参数

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implementsSchedulingTaskExecutor {private final Object poolSizeMonitor = newObject();private int corePoolSize = 1;private int maxPoolSize = 2147483647;private int keepAliveSeconds = 60;private boolean allowCoreThreadTimeOut = false;private int queueCapacity = 2147483647;private ThreadPoolExecutor threadPoolExecutor; //这里就用到了ThreadPoolExecutor

该类是通过调用 threadPoolExecutor 来完成初始化任务的程序模块。 BlockingQueue 作为一个阻塞队列,在当前阶段我们暂不关注其具体细节。然而其背后的实现完全依赖于 threadPoolExecutor 进行操作,请确保 threadPoolExecutor 的相关配置参数设置正确以保证系统的正常运行。

public ThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime,)

TimeUnit unit,

BlockingQueueworkQueue,

ThreadFactory threadFactory,

当核心池大小小于零或者最大池大小小于等于零时,请确保最大池大小不小于核心池大小的同时也要保证最大池存活时间非负数。若工作队列为空或者线程工厂为空则视为无效配置请立即抛出NullPointerException错误信息。随后请将核心池大小最大池大小工作队列以及存活时间参数赋值给相应的类字段并完成线程工厂的初始化设置以确保系统的正常运行状态

}

配置参数:

corePoolSize:线程池维护线程的最小数量。

maximumPoolSize:线程池维护线程的最大数量。

keepAliveTime:空闲线程的存活时间。

TimeUnit unit:时间单位,现有纳秒,微秒,毫秒,秒枚举值。

BlockingQueue workQueue:持有等待执行的任务队列。

RejectedExecutionHandler handler:负责阻止任务执行;存在两种不同的场景会导致这种情况发生。

在execute方法中,在addIfUnderMaximumPoolSize(command)返回false时(即线程池已达到饱和),则触发此逻辑。

当调用execute方法时,在其中检查runState是否为RUNNING状态且池大小不为零的情况下(如果系统已处于关闭状态),则会将该命令提交给队列任务处理服务以处理;其中可能触发拒绝此命令的行为。

ThreadPoolExecutor池子的处理流程如下:

当池子大小小于corePoolSize就新建线程,并处理请求。

当容器规模达到设定值时(即corePoolSize),所有新请求将被分配到工作队列中,并由空闲线程从该队列中依次获取并执行相应的任务。

当工作队列处于已满负荷状态时(即无法继续接收新任务),将启动新的线程加入队列中,并依次处理相应的请求;若此时队列的最大容量达到最大池子大小,则采用RejectedExecutionHandler进行拒绝处理。

此外,在池子中的线程数量超过corePoolSize时(即核心队列容量),那些多余的线程会在keepAliveTime时间内保持空闲状态;在无法处理任何请求的情况下将被终止。

其主要会优先创建CorePoolSize线程。在不断增加线程数量时,这些新线程会被暂时存入队列中。当CorePoolSize和队列都已满载时,系统会自动启动新的线程资源池。在达到最大池大小限MaxPoolSize时,将会触发指定的异常错误org.springframework.core.task.TaskRejectedException。

另外,在MaxPoolSize的配置中如果其值超过了系统支持的线程数量,则会导致java.lang.OutOfMemoryError: unable to create new native thread异常发生

Reject策略预定义有四种:

ThreadPoolExecutor中的AbortPolicy策略被定义为预设方案;当处理程序遭到拒绝时,该行为将触发运行时异常RejectedExecutionException.

ThreadPoolExecutor中的CallerRunsPolicy策略下,调用者所在的线程将执行相应的任务;一旦 ThreadPoolExecutor 被关闭,则该任务将被丢弃。

ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃。

ThreadPoolExecutor.DiscardOldestPolicy 策略通过移除工作队列头部未关闭的任务来维持负载均衡,并在任务再次失败时重复此操作。

线程池创建示例

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置最大线程池规模为指定值,该参数表示在同一个时间段内最多可启动的独立子任务数量

executor的核心作业池规模被配置为2;当缓冲区满载时(即缓冲队列已满),系统会动态申请更多作业以处理超出当前核心资源的情况

Executor的最大线程池大小被指定为4,并在启动任务之前用于存储临时作业请求。

executor.setQueueCapacity(1000); // 配置存活期为 1,00秒。一旦核心线程之外的线程存活期超出该值,则会自动终止

executor.setKeepAliveSeconds(1);//设置线程名称前缀

executor.setThreadNamePrefix("xxxx-");//设置拒绝策略

设置拒绝执行处理器为新创建的一个基于停止策略的线程池执行器

executor.initialize();

六、关于Spring的Async

自Spring框架版本3.0.0引入了@Async注解,并可直接用于标记方法以便实现异步调用。当调用者在执行时立即返回后,其实际操作将被提交到Spring TaskExecutor的任务队列中,并由指定线程池中的线程处理完成。

在项目应用中,@Async调用线程池,推荐使用自定义线程池的模式。

前提是要添加@EnableAsync注解开启异步调用。

@Async应用默认线程池

Spring应用默认采用线程池运行,在@Async注解的应用中,默认情况下未设置线程池名称。通过查阅源码发现,默认情况下@Async使用的是SimpleAsyncTaskExecutor类。

无返回值调用

基于@Async无返回值的方法,在类中直接调用该方法(建议在其上添加注解)。若需要捕获并抛出异常,则手动创建一个异常并将该异常抛出。

/*** 带参数的异步调用 异步方法可以传入参数

  • 对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉

@params/@Asyncpublic voidasyncInvokeWithException(String s) {

log.info("asyncInvokeWithParameter, parameter={}", s); throw new IllegalArgumentException(s);

}

有返回值Future调用

/*** 异常调用返回Future

对于返回值为Future的情况而言,在这种情况下AsyncUncaughtExceptionHandler不会进行处理。因此,在这种情况下必须在方法体内主动捕获可能发生的异常情况以确保程序能够正常运行。

  • 或者在调用方在调用Futrue.get时捕获异常进行处理

*@parami

*@return

*/@Asyncpublic Future asyncInvokeReturnFuture(inti) {

log.info("asyncInvokeReturnFuture, parementer={}", i);

Futurefuture;try{

Thread.sleep(1000 * 1);

future被赋值为一个AsyncResult对象,
该对象包含字符串"success:"加上i;
然后抛出一个新的IllegalArgumentException,
其信息为字符串"a";

}catch(InterruptedException e) {

future= new AsyncResult("error");

}catch(IllegalArgumentException e){

future= new AsyncResult("error-IllegalArgumentException");

}returnfuture;

}

有返回值CompletableFuture调用

CompletableFuture不采用@Async注解能够实现利用系统提供的线程池资源完成业务处理所需的任务

JDK 5新增了Future接口这一新特性,其主要用于描述异步计算的结果。尽管Future及其相关使用方法为实现异步执行任务提供了便利,但对结果进行获取却显得颇为不便,仅能采用阻塞或轮询等方式以获得任务的结果。其中,阻塞的方式显然与我们期望的异步编程理念背道而驰,相比之下,轮询的方法不仅造成了不必要的CPU资源消耗,还无法及时获取计算结果

CompletionStage代表异步计算过程中的某个阶段,在执行完后可能会引发另一个阶段的启动

一个阶段的计算执行可能是Function、Consumer或Runnable。例如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())

一个阶段可能在其后续阶段完成时触发执行或者由多个并行的任务共同触发:支持Java 8及以后版本的CompletableFuture则提供了极其强大的Future扩展功能能够显著简化异步编程中的复杂性还具备函数式编程的支持能力允许通过回调机制处理计算结果并提供转换与组合CompletableFuture的方法

它可能表示为一个完整地实现了一个Future对象或者是一个阶段(CompletionStage)。该阶段提供当计算完成后触发某些函数或执行某些动作的功能。

它实现了Future和CompletionStage接口

/*** 数据查询线程池*/

静态私有常量POOL_THREAD_POOL被赋值为新ThreadPoolExecutor实例,
其参数设置为:
暖 periods为5秒,
timeout period设定为2分钟,
garbage collection threshold采用秒作为单位

timeUnit.milliseconds,创建了一个容量为1024的新LinkedBlockingQueue对象,并配置了一个自定义的线程池命名格式"selectThreadPoolExecutor-%d"后完成构建;该tradeMapper.countTradeLog(tradeSearchBean)方法用于获取交易对数目,返回值为整数值;该操作旨在统计所有交易记录总数

CompletableFuture countFuture =CompletableFuture

call tradeMapper.countTradeLog(tradeSearchBean)到SELECT_POOL_EXECUTOR中执行;//同步阻塞行为

CompletableFuture.allOf(countFuture).join();//获取结果

int count = countFuture.get();

默认线程池的弊端

遵循《阿里巴巴Java开发规范》的相关规定,在实际应用中应避免以下两种线程池配置方式:一是直接调用Executors类提供的接口创建线程池;二是采用系统默认配置下的自动启动生成式线程池配置模式。建议采用并行计算框架中的ThreadPoolExecutor实现。这种设计有助于帮助开发人员更好地理解并遵循线程池的最佳实践,并有效降低资源耗尽的可能性。深入探讨其各个成员方法的具体缺陷及其适用场景将是本节的重点内容。

newFixedThreadPool和newSingleThreadExecutor:它们的主要缺点是由于大量未处理的请求队列而导致占用过多的内存资源,并可能导致OutOfMemoryError(OOM)。

新缓存与定时线程池:核心问题是将最大线程池规模设为Integer.MAX_VALUE可能导致创建大量线程,并可能引发OOM问题

采用默认异步配置的@Async组件基于SimpleAsyncTaskExecutor实现。该组件在默认模式下为每个任务分配独立的一个线程。如果系统中持续地为新任务分配线程这将导致系统内存使用量急剧上升从而引发OutOfMemoryError异常。为了控制资源消耗和避免过载SimpleAsyncTaskExecutor通过配置项concurrencyLimit实现了资源管理。当设置的concurrencyLimit值大于等于零时启用限流机制而当concurrencyLimit被设置为-1时则表示禁用了限流功能此时该组件会在资源受限的情况下自动调整当前的任务负载以维持系统的稳定性基于以上描述我们可以得出结论即基于DefaultConfigurationSimpleAsyncTaskExecutor并非严格意义上的多核并行计算框架而是采用了单体式的工作模式以保证最大的可用性与灵活性

@Async应用自定义线程池

该系统提供了一种定制化的线程池管理方案。该方案可对系统中的细粒度线程池进行精细控制,并提供灵活调节线程池规模与结构设置的能力。该方案还具备完整的异常处理机制,在替换默认线程 pool 时,默认生成的自定义 line pool 类型必须唯一(不允许使用多个继承自 AsyncConfigureManager 的类)。支持以下几种具体的配置模式:

重新实现接口AsyncConfigurer

继承AsyncConfigurerSupport

配置由自定义的TaskExecutor替代内置的任务执行器

在Spring源码中查找@Async默认调用规则时,优先查找实现了AsyncConfigurer接口的具体类别,并发现该类名为AsyncConfigurerSupport。然而,默认情况下,默认配置未指定线程池和异步处理机制因此,在继承或重新实现此接口时,必须明确指定一个线程池配置。此外,在重写的getAsyncExecutor()方法中也需在重写的getAsyncExecutor()方法中明确定义 executor 实例或相关逻辑。

实现接口AsyncConfigurer

@Configuration
public async class AsyncConfiguration implements AsyncConfigurer {

@Bean("kingAsyncExecutor")publicThreadPoolTaskExecutor executor() {

通过新创建一个线程池任务执行器实例化类对象,并遵循其默认配置初始化。
指定核心线程池大小为10个。

executor.setCorePoolSize(corePoolSize);int maxPoolSize = 50;

executor.setMaxPoolSize(maxPoolSize);int queueCapacity = 10;

executor.setQueueCapacity(queueCapacity);

executor.setRejectionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

String threadNamePrefix= "kingDeeAsyncExecutor-";

executor.setThreadNamePrefix(threadNamePrefix);

设置执行器等待任务在关闭时完成标志为true;

使用 RequestContext Thread Factory 创建了一个默认的 threadFactory 实例;

执行器指定用于执行任务的线程工厂为threadFactory;
设置等待终止秒数为5秒;

executor.setAwaitTerminationSeconds(awaitTerminationSeconds);

executor.initialize();returnexecutor;

}

@OverridepublicExecutor getAsyncExecutor() {returnexecutor();

}

@OverridepublicAsynchronousUncaughtExceptionHandler getAsynchronousUncaughtExceptionHandler() {return (ex, method, params) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);

}

}

继承AsyncConfigurerSupport

@Configuration

@EnableAsynchronousClass SpringAsyncConfigurer extends AsyncConfigureSupport {

@BeanpublicThreadPoolTaskExecutor asyncExecutor() {

ThreadPoolTaskExecutor threadPool= newThreadPoolTaskExecutor();

threadPool.setCorePoolSize(3);

threadPool.setMaxPoolSize(3);

threadPool.setWaitForTasksToCompleteOnShutdown(true);

threadPool.setAwaitTerminationSeconds(60 * 15);

threadPool.initialize();returnthreadPool;

}

@OverridepublicExecutor getAsyncExecutor() {returnasyncExecutor;

}

@Override
public static AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (params, method) -> ErrorLogger.getInstance().log(String.format("执行异步任务'%s'", method), ex);
}

}

}

配置自定义的TaskExecutor

因为AsyncConfigurer在源码中的默认线程池被设置为空状态,在运行时系统会自动调用beanFactory.getBean(TaskExecutor.class)来获取相应的资源。当Spring框架在未配置具体参数的情况下,默认会先检查是否存在名为TaskExecutor的具体Bean配置文件。如果仍然找不到,则会同时还会尝试获取基于TaskExecutor.class类本身的默认行为配置以完成任务执行功能。因此,在项目开发过程中为了实现自定义化的任务调度机制建议可以在项目中手动配置一个名称为TaskExecutor的具体Bean,并指定其中包含足够的资源以支持任务执行需求;或者也可以选择直接声明一个基于TaskExecutor类的任务执行 pools,默认情况下就会利用该类提供的资源实现任务处理功能。

@EnableAsync

@Configurationpublic classTaskPoolConfig {

@Bean(name=AsyncExecutionAspectSupport.DEFAULTTASK_EXECUTOR_BEAN_NAME)publicexecutor taskexecutor()) {}

ThreadPoolTaskExecutor executor = 实现为新线程池任务执行器();//核心队列的最大处理线程数

executor.setCorePoolSize(10);//最大线程数

executor.setMaxPoolSize(20);//队列容量

executor.setQueueCapacity(200);//活跃时间

executor.setKeepAliveSeconds(60);//线程名字前缀

executor.setThreadNamePrefix("taskExecutor-");

设置拒绝对执行的处理机制为新线程池执行策略;当前的执行器将被返回。

}

@Bean(name= "new_task")publicExecutor taskExecutor() {

executor = new ThreadPoolTaskExecutor();//设置核心线程池的规模参数为

executor.setCorePoolSize(10);//最大线程数

executor.setMaxPoolSize(20);//队列容量

executor.setQueueCapacity(200);//活跃时间

executor.setKeepAliveSeconds(60);//线程名字前缀

executor.setThreadNamePrefix("taskExecutor-");

Assign the RejectedExecutionHandler to use the CallerRunsPolicy; return the executor instance.

}

}

该异步执行的任务标记(可选), 可以配置为系统默认或自定义的线程池(替代于默认线程池)。在项目开发中, 可设置多个任务队列, 在执行异步操作时, 指定所需使用的队列名称, 例如 @Async("new_task")。

注意事项:

@Async异步任务需要添加事务支持步骤

@Async与@Transaction注解分别实现各自的功能模块,并在@Async注解的方法内部分别进行调用以确保满足特定的需求。

在调用拥有@Async注解的方法的父方法需要添加@Transaction注解。

参考:

全部评论 (0)

还没有任何评论哟~