Advertisement

java线程池初始化过程_线程池原理及Java线程池使用

阅读量:

线程池的原理及实现

1、线程池简介:

多线程技术主要针对同一时刻在同一处理器单元内有多条指令同时执行的问题。这种技术设计使得系统能够有效利用处理器资源,在有限资源下最大限度地提升系统性能。它不仅减少了空闲时间还显著提升了整体处理效率。

假设某台服务器完成某个任务所需时间为:T₁创建相应线程所花费的时间段;接着,在该线程中进行任务处理的时间段为T₂;最后,在处理完成后将该线段程序从内存中释放出去所需要的时间段则为T₃

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

一个线程池包括以下四个基本组成部分:

ThreadPool:旨在建立和维护一个高效的多核处理框架,并负责管理所有相关的 pool 资源;其中涉及的主要操作包括如创建新 line pool、移除现有 line pool 以及新增相关任务。

池工(PoolWorker):指代的池工,在无作业时保持空闲状态;能够轮流地接受新作业。

任务接口(Task):每个任务必须实现的接口用于工作线程调度任务执行,并规定了任务入口及收尾流程等;

任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术主要关注于优化设置T1、T3时间的操作方法和技术手段。它的目标是显著提升服务器程序的整体性能。该技术通过将T1、T3分别安排在服务器程序启动阶段以及结束阶段(或者选择性地分配到某些空闲时段),从而避免了服务器程序在处理客户请求时因T1、T3操作而导致的额外开销。

该线程池通过优化时间分配策略,在T1和T3时间段内有效地进行了资源调度。同时该系统在提升性能的同时,大大减少了创建新线程的数量。观察以下案例分析:

假设某服务器预计每天将接收约N=5\times 1e4个请求,并且每个请求都需要独立运行在一个新进程中进行处理。在多进程中管理机制下,默认情况下会设置固定数量的子进程(即进程池大小),因此总数量将受限于该配置值。若未采用进程池管理,则会导致开销高达N=5\times 1e4个独立进程中运行的任务(即每秒需处理约五万次)。通常情况下,默认配置值会设置在远低于需求量级。因此,在实际应用中,在不影响当前业务逻辑的前提下(即不需要同时处理这么多的数量),这样的设计能够有效避免了因过度开销而导致的效率损失。

在代码实现中,并未对任务接口进行具体实现;相反地,在将Runnable对象注入到线程池管理器后(即之后), ThreadPool 接管剩余的工作

package mine.util.thread;

import java.util.LinkedList;

import java.util.List;

/**

  • 线程池类,线程管理器:创建线程,执行任务,销毁线程,获取线程基本信息

*/

public final class ThreadPool {

// 线程池中默认线程的个数为5

private static int worker_num = 5;

// 工作线程

private WorkThread[] workThrads;

// 未处理的任务

private static volatile int finished_task = 0;

// 任务队列,作为一个缓冲,List线程不安全

private List taskQueue = new LinkedList();

private static ThreadPool threadPool;

// 创建具有默认线程个数的线程池

private ThreadPool() {

this(5);

}

// 创建线程池,worker_num为线程池中工作线程的个数

private ThreadPool(int worker_num) {

ThreadPool.worker_num = worker_num;

workThrads = new WorkThread[worker_num];

for (int i = 0; i < worker_num; i++) {

workThrads[i] = new WorkThread();

workThrads[i].start();// 开启线程池中的线程

}

}

// 单态模式,获得一个默认线程个数的线程池

public static ThreadPool getThreadPool() {

return getThreadPool(ThreadPool.worker_num);

}

单态模式下配置了一个具有特定工作线数量的ReflectionUtils.getReflectionPools()方法。其中工人数必须大于零,并且该数值直接指示工作线的数量

// worker_num<=0创建默认的工作线程个数

public static ThreadPool getThreadPool(int worker_num1) {

if (worker_num1 <= 0)

worker_num1 = ThreadPool.worker_num;

if (threadPool == null)

threadPool = new ThreadPool(worker_num1);

return threadPool;

}

// 安排处理某个具体的任务,其实在本质上就是将该任务加入队列中等待处理,而其执行时间则由线程池资源管理器进行调度安排

public void execute(Runnable task) {

synchronized (taskQueue) {

taskQueue.add(task);

taskQueue.notify();

}

}

将多个任务批量提交到队列中其本质是将这些任务加入到一个统一的处理队列中具体的执行时机则由线程池管理系统来调度和确定

public void execute(Runnable[] task) {

synchronized (taskQueue) {

for (Runnable t : task)

taskQueue.add(t);

taskQueue.notify();

}

}

批量执行任务,其实只是将多个任务一次性提交给线程池处理,具体执行时间由线程池调度系统来安排

public void execute(List task) {

synchronized (taskQueue) {

for (Runnable t : task)

taskQueue.add(t);

taskQueue.notify();

}

}

删除整个线程池,该方法将确保只有当所有任务均已完成时才会彻底清除所有线程,而如果在此期间有任何任务未完成,则会暂时搁置销毁操作

public void destroy() {

while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// 工作线程停止工作,且置为null

for (int i = 0; i < worker_num; i++) {

workThrads[i].stopWorker();

workThrads[i] = null;

}

threadPool=null;

taskQueue.clear();// 清空任务队列

}

// 返回工作线程的个数

public int getWorkThreadNumber() {

return worker_num;

}

// 返回已输出的任务数量,其中仅记录于任务队列中的每个任务,但需要注意的是,这些记录可能并不代表实际执行完成的情况。

public int getFinishedTasknumber() {

return finished_task;

}

// 返回任务队列的长度,即还没处理的任务个数

public int getWaitTasknumber() {

return taskQueue.size();

}

// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数

@Override

public String toString() {

return "WorkThread number:" + worker_num + " finished task number:"

+ finished_task + " wait task number:" + getWaitTasknumber();

}

/**

  • 内部类,工作线程

*/

private class WorkThread extends Thread {

// 该工作线程是否有效,用于结束该工作线程

private boolean isRunning = true;

/*

  • 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待

*/

@Override

public void run() {

Runnable r = null;

while (isRunning) {// 注意,若线程无效则自然结束run方法,该线程就没用了

synchronized (taskQueue) {

while (isRunning && taskQueue.isEmpty()) {// 队列为空

try {

taskQueue.wait(20);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

if (!taskQueue.isEmpty())

r = taskQueue.remove(0);// 取出任务

}

if (r != null) {

r.run();// 执行任务

}

finished_task++;

r = null;

}

}

// 停止工作,让该线程自然执行完run方法,自然结束

public void stopWorker() {

isRunning = false;

}

}

}

测试代码:

package mine.util.thread;

//测试线程池

public class TestThreadPool {

public static void main(String[] args) {

// 创建3个线程的线程池

ThreadPool t = ThreadPool.getThreadPool(3);

t.execute(new Runnable[] { new Task(), new Task(), new Task() });

t.execute(new Runnable[] { new Task(), new Task(), new Task() });

System.out.println(t);

t.destroy();// 所有线程都执行完成才destory

System.out.println(t);

}

// 任务类

static class Task implements Runnable {

private static volatile int i = 1;

@Override

public void run() {// 执行任务

System.out.println("任务 " + (i++) + " 完成");

}

}

}

运行结果:

WorkThread number:3 finished task number:0 wait task number:6

任务 1 完成

任务 2 完成

任务 3 完成

任务 4 完成

任务 5 完成

任务 6 完成

WorkThread number:3 finished task number:6 wait task number:0

因为缺乏明确的任务接口, 输入的任务可为任意预先定义的功能. 因此, 线程池无法精确识别该任务是否确实已完成. 真正完成状态即为该任务的所有run方法均已成功执行.

2、java类库中提供的线程池简介:

在Java中提供的线程池拥有更高的性能,并希望掌握线程池的基本运作机制。通过查阅文档或资料库中的实现细节就能较为熟悉地理解其工作原理。

1
2

Java 并发编程:线程池的使用

在前面的文章中描述的那样,在处理线程时我们主动地去创建一个线程。这种方法相对来说会显得更加容易实现。然而这却会导致性能上的优化需求难以满足。

当系统的多路并行处理任务数量极大时,并且每个处理任务的时间极为短暂就能完成。然而这样的状态会导致大量频繁创建和销毁线程从而大幅降低系统效率这是因为每次创建和销毁都需要一定的时间开销进而影响整体性能

那么是否存在某种方法可以让线程复用?完成任务后不被销毁,则可以继续执行其他任务。

在Java编程中使用线程池可以实现类似的效果。我们今天将深入探讨Java中的线程池机制,并详细解析其运行原理以及实际应用方法。首先我们将聚焦于ThreadPoolExecutor类及其核心方法的学习,并随后提供具体的使用案例。最后我们将探讨如何科学地配置线程池规模以优化系统性能。

以下是本文的目录大纲:

一.Java中的ThreadPoolExecutor类

二.深入剖析线程池实现原理

三.使用示例

四.如何合理配置线程池的大小

若有不正之处请多多谅解,并欢迎批评指正。

Java中的ThreadPoolExecutor类

java.util.concurrent.ThreadPoolExecutor类扮演着在线程池中关键角色所扮演的类角色,在深入掌握这一技术基础之前,请确保您对该知识点有全面的理解。在本节中我们将为您展示该组件的具体代码实现过程

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

.....

public 线程池执行器(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)

BlockingQueue workQueue);

被声明为public的时间单位中的核心作业池执行者(即核心作业池大小)

BlockingQueue workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue,RejectedExecutionHandler handler);

公共线程池执行器具有指定的核心池大小、最大池大小、存活时间和时间单位参数。

BlockingQueue类型的队列工作队列, ThreadFactory线程工厂, RejectedExecutionHandler拒绝执行处理程序);

...

}

通过查看上述代码可以看出,默认情况下新线程池会使用现有的资源池来完成初始化设置过程;实际上,在深入分析每个构建函数的具体实现时发现,默认情况下新线程池会使用现有的资源池来完成初始化设置过程。

下面解释下一下构造器中各个参数的含义:

corePoolSize:核心池的大小这一参数与后续介绍的线程池实现原理有着密切的关系。在线程池初始化之后,默认状态下,该线程池尚未分配任何子线程用于执行当前的任务。除非调用了prestartAllCoreThreads()或prestartCoreThread()这两个方法,在没有任务到来之前会启动相应的子线程。从这两个方法的名字就能看出其功能定位:即在线路空闲时会预先启动相应的子线程。当子线程数量达到corePoolSize时会将新到达的任务queued到缓存队列中。

maximumPoolSize:该参数用于描述线程池的最大吞吐量;这是一个关键性的配置项;它主要用于描述在线程池运行过程中能够处理的最大并发请求数量。

keepAliveTime:表示在线程空闲期间最多维持多长时间才会退出。在默认情况下只有当核心队列中的线程数目超过corePoolSize时才会发挥作用,在这种情况下它会一直维持到核心队列中的线程数目减少至不超过corePoolSize为止;然而如果调用允许核心队列空闲超限的方法,则即使核心队列中的线程数目已经减少至不超过corePoolSize(即为空的状态)。

类中该参数的时间单位共有7个取值,在TimeUnit类中有7个静态属性。

TimeUnit.DAYS; //天

TimeUnit.HOURS; //小时

TimeUnit.MINUTES; //分钟

TimeUnit.SECONDS; //秒

TimeUnit.MILLISECONDS; //毫秒

TimeUnit.MICROSECONDS; //微妙

TimeUnit.NANOSECONDS; //纳秒

工作队列用于存储需等待处理的任务, 这一参数的选择也至关重要, 将会直接影响到线程池的操作流程. 通常情况下, 这里提供了多种类型的阻塞队列供选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

ArrayBlockingQueue及PriorityBlockingQueue的应用较为有限,在大多数情况下通常采用LinkedBlockingQueue及Synchronous队列进行处理。在队列管理方面存在一定的关联性

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy: 丢弃当前正在执行的任务,并导致RejectedExecutionException异常被抛出。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:通过移除队列前端的任务,并不断重复这一过程来持续处理任务

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

具体参数的配置与线程池的关系将在下一节讲述。

通过查看提供的ThreadPoolExecutor类的代码内容可知,该类基于AbstractExecutorService实现了多线程执行框架的具体功能.具体实现细节如下:

public abstract class AbstractExecutorService implements ExecutorService

protected RunnableFuture newTaskFor(Runnable runnable, T value) { };

protected RunnableFuture newTaskFor(Callable callable) { };

public Future> submit(Runnable task) {};

public Future submit(Runnable task, T result) { };

public Future submit(Callable task) { };

private T doInvokeAny(Collection extends Callable> tasks,

boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

};

public T invokeAny(Collection extends Callable> tasks)

throws InterruptedException, ExecutionException {

};

public T invokeAny(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

};

public List> invokeAll(Collection extends Callable> tasks)

throws InterruptedException {

};

public List> invokeAll(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException {

};

}

该抽象类实现了该接口

我们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {

void shutdown();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException;

Future submit(Callable task);

Future submit(Runnable task, T result);

Future> submit(Runnable task);

List> invokeAll(Collection extends Callable> tasks)

throws InterruptedException;

List> invokeAll(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException;

T invokeAny(Collection extends Callable> tasks)

throws InterruptedException, ExecutionException;

T invokeAny(Collection extends Callable> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

而ExecutorService继而继承了Executor接口,并对其实现进行查看。

public interface Executor {

void execute(Runnable command);

}

到这里的话,大家大概都明白这几个类之间的关系了吧。

Executor作为一个顶级接口,在其内部仅定义了一个名为execute(Runnable)的方法,并规定该方法的返回值为无值类型。其参数属于Runnable类别类型。从字面上理解即该方法的功能就是负责执行传入的任务。

继而ExecutorService接口继承了Executor接口,并提供了以下几种方法:submit、invokeAll、invokeAny以及shutDown等。

AbstractExecutorService遵循了ExecutorService接口,并大致涵盖了其所有声明的方法。

然后ThreadPoolExecutor继承了类AbstractExecutorService。

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()

submit()

shutdown()

shutdownNow()

该方法实际上是Executor类中声明的一个公共接口,在ThreadPoolExecutor类中实现了其具体细节。它是所有线程池实现的核心功能之一,并且可以通过它将任务提交给线程池进行调度执行。

该提交(submit)操作是在ExecutorService类中被定义的一个重要功能。然而与常用的execute()方法存在显著差异的是,在AbstractExecutorService类已经实现了这一操作的具体实现。而在并行任务处理的核心组件中的ThreadPoolExecutor类并未对这一操作进行额外的封装或重写。同时这一操作的主要用途也是为了将任务提交至线程池进行处理。然而与常用的execute()方法存在显著差异的是其特别之处在于能够及时获取到任务执行的结果信息。通过查看该提交操作的具体实现细节可以发现其背后的工作原理实际上是与execute()方法相同的。不过尽管两者功能相似但这种设计使得我们可以更加灵活地管理各个子线程之间的协作关系进而提升系统的整体性能表现。值得注意的是这种机制巧妙地利用了Java面向未来编程模型中的Future接口来传递和管理各子线程执行的状态信息相关的技术细节将在下一章中进行深入探讨

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

例如:getQueue()getPoolSize()getActiveCount()以及getCompletedTaskCount()等方法均用于获取与线程池相关的属性信息。如对API感兴趣的朋友可自行查阅文档。

深入剖析线程池实现原理

在上一节我们对ThreadPoolExecutor进行了阐述,在本节我们将决定采用以下几种方式来详细阐述线程池的具体实现原理:

线程池状态

任务的执行

线程池中的线程初始化

任务缓存队列及排队策略

任务拒绝策略

线程池的关闭

线程池容量的动态调整

1.线程池状态

特别重要的是,在ThreadPoolExecutor类体中定义了一个volatile类型的特殊变量,并在其中设置了几个静态final常量来标识线程池的各种运行状态:

volatile int runState;

static final int RUNNING = 0;

static final int SHUTDOWN = 1;

static final int STOP = 2;

static final int TERMINATED = 3;

runState代表当前运行中的线程池状态,并且它作为一个volatile类型的变量能够确保各线程间的可见性

下面的几个static final变量表示runState可能的几个取值。

当创建线程池后,初始时,线程池处于RUNNING状态;

当执行shutdown()方法时,则该线程池达到了SHUTDOWN状态。此时该线程池不再能够接收新的请求;它将等待所有现有任务完成处理。

一旦调用了shutdownNow()方法,则线程池将处于STOP状态;这意味着线程池将无法接收新的请求,并主动寻求终止当前运行的任务。

当线程池处在SHUTDOWN或STOP状态下,并且所有的作业线程已销毁时,在缓存队列为空并任务完成之后(或者在任务结束之后),将该线程池设置为TERMINATED状态。

2.任务的执行

在掌握任务从线程池提交到完成的整个流程之前

private static final BlockingQueue workQueue;

private final ReentrantLock mainLockVariable = new ReentrantLock(); // 用于管理线程池状态的锁,例如控制线程池规模

//、runState等)的改变都要使用这个锁

private final HashSet workers = new HashSet(); //用来存放工作集

private volatile long keepAliveTime; //线程存货时间

private volatile boolean coreThreadSurvivalTimeout; //该字段指示是否可为核心线程分配存活超时时间.

private _private volatile int CoreThreadPoolCapacity; //该核心队列容量表示当线程池中的线程数目超过此值时,默认将新增请求重定向至任务缓存队列

private volatile int maximumPoolSize; //线程池最大能容忍的线程数

private volatile int poolSize; //线程池中当前的线程数

private volatile RejectedExecutionHandler handler; //任务拒绝策略

private volatile ThreadFactory threadFactory; //负责创建新线程

private int largestPoolSize; // 该变量用于记录线程池中的最大线程数量

private long completedTaskCount; //用来记录已经执行完毕的任务个数

每个变量的作用都已经明确具体说明了,请重点阐述corePoolSize、maximumPoolSize和largestPoolSize这三个关键参数各自的功能。

corePoolSize常被翻译为核心池大小;然而实际上我认为这就是线程池的规模。例如:

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

如果新任务数量以惊人的速度增长超过了现有员工的工作效率时,工厂主管可能会采取应对策略,比如临时招聘4名工人

然后就将任务也分配给这4个临时工人做;

如果谈论着14名工人完成工作的效率较低时,则工厂主管很可能必须采取措施拒绝接收后续的任务并放弃处理先前的工作。

当这14名工人中出现空闲情况时(即有工人闲置),而新增的任务规模增长速度较为缓慢,则工厂主管可能会考虑裁减临时员工4人,并维持原有的10名全职员工岗位以避免增加用工成本。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

换句话说, corePoolSize即是线程池的规模; 而对于maximize Pool Size而言, 在任务负载骤增的情况下, 这是一种应对策略.

不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

largestPoolSize是一个用于记录目的的关键变量,在系统运行过程中追踪历史上的最高线程数量,并与该系统的总承载能力没有关联

下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

在ThreadPoolExecutor类中,默认的核心任务提交机制由execute()方法负责;即使采用submit方法来提交任务,在这些submit操作中最后都会被映射为执行execute()方法;因此我们只需深入理解并掌握execute() 方法的工作原理即可:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

因为使用的是或条件运算符(OR condition),从而首先计算前半部分的值。当线程池中的现有线程数量不低于核心池规模时,在满足特定条件下即可直接跳转至下面相应的if语句块处理。

当线程池中的可用线程数量低于核心池的规模时,则随后继续执行后半段任务,即执行相应的请求处理逻辑。

addIfUnderCorePoolSize(command)

一旦该addIfUnderCorePoolSize方法返回false值时,则会继续执行下方的if语句块;否则该方法将立即终止处理当前流程。

如果该方法执行后返回False值,则应继续进行判断:

if (runState == RUNNING && workQueue.offer(command))

当且仅当当前线程池的状态为RUNNING时,请将该任务加入到任务缓存队列中;否则,请执行以下操作。

addIfUnderMaximumPoolSize(command)

如果尝试addIfUnderMaximumPoolSize方法出现错误,则启动reject函数以完成任务拒绝处理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

当前操作中,请注意以下条件:若当前线程池的状态为\texttt{RUNNING}并已完成将该操作加入到待处理队列的操作,则会继续进行后续判断。

if (runState != RUNNING || poolSize == 0)

这条判断语句旨在避免在线程处理过程中出现资源耗尽的情况。当向任务缓存队列中加入此任务时,并且考虑到可能存在异常导致其他线程紧急关掉进程池,则采取这一判断作为补救措施;在这种情况下应当执行以下操作:

ensureQueuedTaskHandled(command)

启动应急响应流程,在名称标识上确保 所有添加至任务缓存队列的任务都能够被正确地纳入缓存并获得后续的处理。

接下来我们探讨两个核心方法的实施:addIfUnderCorePoolSizeaddIfUnderMaximumPoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask); //创建线程去执行firstTask任务

} finally {

mainLock.unlock();

}

if (t == null)

return false;

t.start();

return true;

}

该方法具体实现了当线程池规模低于核心容量时的操作流程。通过名称可以看出其作用机制是在线程池规模低于核心容量时进行操作。具体实现过程如下:首先获取锁,在在线程池状态变化的情况下,通过if语句判断当前线程数目是否少于核心容量(注:这里的描述稍微调整了顺序)。为了回答疑问,在execute()方法之前并没有加锁进行判断(注:这里做了补充说明但严格遵循了仅修改表达方式的要求)。只有当在线程数目少于核心容量时才会调用该方法(注:此处进行了同义替换)。由于前面未加锁操作,在execute()方法中可能已经检测到poolSize小于corePoolSize的情况(注:进行了逻辑上的同义转换)。因为有可能在其他线程中调用了shutdown或者shutdownNow方法(注:进行了同义转换),所以需要在此处继续判断(注:进行了同义转换)。随后检查线程池的状态是否处于运行状态(注:进行了同义转换),最后执行相应的任务操作(注:进行了同义转换)。

t = addThread(firstTask);

该方案具有重要意义,并且其输入参数指定为提交的任务。该方案返回值类型为Thread。随后,在后续步骤中判断变量t是否为空:当t为空时,则表示线程创建失败(即poolSize大于等于corePoolSize或者runState不等于RUNNING);否则将t启动为新线程

我们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {

Worker w = new Worker(firstTask);

Thread t = threadFactory.newThread(w); //创建一个线程,执行任务

if (t != null) {

w.thread = t; //将创建的线程的引用赋值为w的成员变量

workers.add(w);

int nt = ++poolSize; //当前线程数加1

if (nt > largestPoolSize)

largestPoolSize = nt;

}

return t;

}

在addThread方法执行过程中:
首先通过提交的任务生成一个Worker实例。
随后利用线程工厂threadFactory生产出一个新的线程实例t。
接着将新生成的线程实例t赋值给Worker对象中的thread成员变量。
最后将修改后的Worker实例w整合到workers集合中。

下面我们看一下Worker类的实现:

private final class Worker implements Runnable {

private final ReentrantLock runLock = new ReentrantLock();

private Runnable firstTask;

volatile long completedTasks;

Thread thread;

Worker(Runnable firstTask) {

this.firstTask = firstTask;

}

boolean isActive() {

return runLock.isLocked();

}

void interruptIfIdle() {

final ReentrantLock runLock = this.runLock;

if (runLock.tryLock()) {

try {

if (thread != Thread.currentThread())

thread.interrupt();

} finally {

runLock.unlock();

}

}

}

void interruptNow() {

thread.interrupt();

}

private void runTask(Runnable task) {

final ReentrantLock runLock = this.runLock;

runLock.lock();

try {

if (runState < STOP &&

Thread.interrupted() &&

runState >= STOP)

boolean ran = false;

beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据

//为了完成这个功能需求, 我必须重新实现该方法以及其后的afterExecute方法, 收集相关数据, 例如某个任务的运行时长等信息

try {

task.run();

ran = true;

afterExecute(task, null);

++completedTasks;

} catch (RuntimeException ex) {

if (!ran)

afterExecute(task, ex);

throw ex;

}

} finally {

runLock.unlock();

}

}

public void run() {

try {

Runnable task = firstTask;

firstTask = null;

while (task != null || (task = getTask()) != null) {

runTask(task);

task = null;

}

} finally {

workerDone(this); //当任务队列中没有任务时,进行清理工作

}

}

}

它实际上是实现了Runnable接口,并且其效果与下面这段代码大致相同:

Thread t = new Thread(w);

相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。

因为Worker实现了Runnable接口,所以其核心必然是run()方法

public void run() {

try {

Runnable task = firstTask;

firstTask = null;

while (task != null || (task = getTask()) != null) {

runTask(task);

task = null;

}

} finally {

workerDone(this);

}

}

从run方法的实现可以看出,它首先启动的是由构造器传递过来的任务firstTask.在完成firstTask之后,在while循环中持续调用getTask()获取新任务来执行.那么这些新任务又该如何获取?自然会从任务缓存队列中获取.getTask属于ThreadPoolExecutor类,并不是Worker类中的方法,下面是getTask方法的具体实现

Runnable getTask() {

for (;;) {

try {

int state = runState;

if (state > SHUTDOWN)

return null;

Runnable r;

if (state == SHUTDOWN) // Help drain queue

r = workQueue.poll();

else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //否则如果线程数大于核心池大小或者允许核心池线程设置超时时间,

//则通过poll取任务,若等待一定的时间取不到任务,则返回null

r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

else

r = workQueue.take();

if (r != null)

return r;

if (调用workerCanExit()函数返回true) { 当变量r未被赋值(即为null)时,则检查当前的worker是否具备退出资格. }

if (runState >= SHUTDOWN) // Wake up others

interruptIdleWorkers(); //中断处于空闲状态的worker

return null;

}

// Else retry

} catch (InterruptedException ie) {

// On interruption, re-check runState

}

}

}

在获取任务时首先要检查当前的线程池状态,在该状态下如果当前任务运行状态处于关闭状态(此时为停止或终止),则将返回null值。

如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

如果当前线程池的任务数量超过核心池大小corePoolSize,并且允许核心池中的线程设置空闲存活时间,则用于获取任务;该方法会在指定时间段内进行等待;若无法获取到任务,则返回null。

接着检查变量r的值是否为None。若r的值是None,则会调用workersExitStatus函数来确定该Worker的状态;我们可以查看workersExitStatus函数的具体实现:

private boolean workerCanExit() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

boolean canExit;

//如果runState大于等于STOP,或者任务缓存队列为空了

支持在核心池的线程中设定空闲存活期,并且当核心池中的线程数量多于1个时

try {

canExit = runState >= STOP ||

workQueue.isEmpty() ||

(allowCoreThreadTimeOut &&

poolSize > Math.max(1, corePoolSize));

} finally {

mainLock.unlock();

}

return canExit;

}

换句话说,在以下条件下:当线程池的状态为STOP时、任务队列为空的情况以及核心池线程被允许设置空闲存活时间(前提是线程数量超过1)时,则可以让Worker退出。一旦让Worker退出就会触发interru tIdle Workers() 操作以中 断当前空闲状态下的 Worker,请问interru tIdle Workers() 的具体实现是什么呢?

void interruptIdleWorkers() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) //实际上调用的是worker的interruptIfIdle()方法

w.interruptIfIdle();

} finally {

mainLock.unlock();

}

}

通过分析可知,该方法实际上调用了worker的 interruptIfIdle() 函数体内执行的操作。

void interruptIfIdle() {

final ReentrantLock runLock = this.runLock;

if (runLock.tryLock()) { //需要注意的是,在调用tryLock()之前必须确保当前Worker没有正在进行的任务
runLock.hold(); //一旦成功获得锁后即可对该资源进行操作
}

//如果成功获取了锁,说明当前worker处于空闲状态

try {

if (thread != Thread.currentThread())

thread.interrupt();

} finally {

runLock.unlock();

}

}

}

在这里采用了非常巧妙的设计方案,在设计过程中可能会引入一个专门负责分配新工作的机制。如果我们在设计一个并行计算框架时考虑资源调度问题,则会在系统中引入一个专门负责分配新工作的机制。具体来说,在系统运行过程中每当检测到某个空闲的计算节点,则会从工作负载管理器中取出一个新的未处理的任务并将该工作交由该空闲节点处理。但实际上,在这种情况下,并未采取上述方法。因为这种方式需要为专门负责分配工作的节点节点添加额外的管理逻辑,并且这样做实际上会导致整个系统的复杂度显著提升。因此在这种情况下我们采用了另一种更为简便的方式:每当完成一项计算后系统就会将该结果自动提交到工作负载管理器中以便后续的工作分配。

让我们进一步探讨addIfUnderMaximumPoolSize方法的具体实现原理及其与addIfUnderCorePoolSize方法之间的异同点。从功能设计的角度来看,在大多数情况下两者都采用了相似的操作流程以确保系统的稳定运行;然而,在具体的触发条件设定上存在显著差异:当线程池中的现有线程数量达到核心池规模上限且新增任务入队操作出现失败时,则会自动触发该机制进行任务重试处理。

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < maximumPoolSize && runState == RUNNING)

t = addThread(firstTask);

} finally {

mainLock.unlock();

}

if (t == null)

return false;

t.start();

return true;

}

我发现它与addIfUnderCorePoolSize方法的实现非常相似,仅仅是因为if语句中的判断条件不同

到这里为止, 大部分人应该已经对任务从提交到执行的过程有了大致的认识和了解

1)首先,要清楚corePoolSize和maximumPoolSize的含义;

2)其次,要知道Worker是用来起到什么作用的;

3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

每当当前的线程池中的线程数量低于设定值corePoolSize时,则每一个新到达的任务都会被用来生成一个新的子进程。

当当前线程池中的线程数目达到corePoolSize时,则每当有一个新任务到来时会被尝试加入到现有的任务缓存队列中;如果能够成功插入到队列中,则该新任务将被安排由空闲的服务器节点来处理;但如果插入操作未能成功(通常情况下认为...已经满了),则系统将启动资源分配机制以创建新的服务器节点来进行该新任务的处理。

如果当前任务池中的线程数目达到了最大值设定,则将启动任务拒绝机制以应对超出资源限制的情况。

当在线路中线路的数量超过了 corePoolSize 时,在线路空闲的时间达到了 keepAliveTime 之后,则该线路将会被移除直到此时线路的数量不超过 corePoolSize;若为核心线路设定存活期限,则在该线路的空闲时间达到了 keepAliveTime 后也会被移除。

3.线程池中的线程初始化

在默认状态下,在创建完线程池后,默认情况下其内部是没有线程的;只有当提交任务时才会生成新的线程

在实践中,在需要在线程池建立完成后立即启动线程的情况下,则可通过下面提到的两种方法来实现

prestartCoreThread():初始化一个核心线程;

prestartAllCoreThreads():初始化所有核心线程

下面是这2个方法的实现:

public boolean prestartCoreThread() {

return addIfUnderCorePoolSize(null); //注意传进去的参数是null

}

public int prestartAllCoreThreads() {

int n = 0;

while (addIfUnderCorePoolSize(null))//注意传进去的参数是null

++n;

return n;

}

请留意:传递给该方法的参数为null;参考第二部分的分析结果可知, 当传递给该方法的参数为null时, 最终执行的任务会阻塞在getTask方法中

r = workQueue.take();

即等待任务队列中有任务。

4.任务缓存队列及排队策略

在上文中, 我们多次提及了任务缓存队列, 即workQueue. 这个概念用于存储那些暂时未被处理的任务.

workQueue的类型为BlockingQueue,通常可以取下面三种类型:

ArrayBlockingQueue即为一种基于一种数组实现的先进先出队列结构,在实际应用中需预先定义其容量

LinkedBlockingQueue是以链表为基础实现的一种先进先出队列;若未预先指定容量,则默认容量为Integer.MAX_VALUE;

3)synchronousQueue:该队列具有特殊性,在其运作机制中未预先存储提交的任务,并采用直接创建新的线程的方式来处理新来的任务。

5.任务拒绝策略

一旦线程池的任务缓存队列已满同时其内部线程数量达到maximumPoolSize时,在新作业被触发的情况下会采用拒绝策略,并且通常会有四种不同的拒绝策略

ThreadPoolExecutor的AbortPolicy通过指定策略来放弃执行并触发相应的异常处理机制以实现资源的有效释放与错误管理

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy策略会舍去处于队列前端的任务,并不断重复这一过程以重新试一次执行这些被舍去的任务。

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

6.线程池的关闭

ThreadPoolExecutor 支持两种实现线程池关闭的操作方法: shutdown() 和 shutdownNow() 其中 shutdown() 用于终止所有注册在该 ThreadPoolExecutor 中的线程 而 shutdownNow() 则可以直接触发立即停止所有注册在该 ThreadPoolExecutor 中的线程

说明

shutdownNow():...该函数用于立即停止线程池服务并强行中断当前运行的所有任务进程,在此过程中释放在等待处理的所有任务队列成员之后,则会返回当前未被调度执行的任一任务

7.线程池容量的动态调整

ThreadPoolExecutor支持了动态调整线程池容量大小的功能,并提供了核心组件包括核心池大小设置函数和最大池大小限制函数

setCorePoolSize:设置核心池大小

setMaximumPoolSize:设置线程池最大能创建的线程数目大小

当该参数逐渐增大时, ThreadPoolExecutor负责将现有线程进行赋值,并在处理当前任务的同时可能立即启动新的子线程以处理后续的任务

使用示例

在之前的章节中, 我们深入探讨了线程池的工作机制; 在此章节, 我们将详细说明其应用场景

public class Test {

public static void main(String[] args) {

ThreadPoolExecutor executor = of new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,)

new ArrayBlockingQueue(5));

for(int i=0;i<15;i++){

MyTask myTask = new MyTask(i);

executor.execute(myTask);

System.out.println("正在监控任务执行情况:" + executor.getPoolSize() + "个线程和" + executor.getQueueSize() + "个未处理的任务");

该执行器队列的大小为:" + executor.getQueue().size() + ";已执行的任务数量为:" + executor.getCompletedTaskCount() + ";print());

}

executor.shutdown();

}

}

class MyTask implements Runnable {

private int taskNum;

public MyTask(int num) {

this.taskNum = num;

}

@Override

public void run() {

System.out.println("正在执行task "+taskNum);

try {

Thread.currentThread().sleep(4000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("task "+taskNum+"执行完毕");

}

}

执行结果:

正在执行task 0

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 1

线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 2

线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 3

线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0

正在执行task 4

线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0

线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 10

线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 11

线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 12

线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 13

线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0

正在执行task 14

task 3执行完毕

task 0执行完毕

task 2执行完毕

task 1执行完毕

正在执行task 8

正在执行task 7

正在执行task 6

正在执行task 5

task 4执行完毕

task 10执行完毕

task 11执行完毕

task 13执行完毕

task 12执行完毕

正在执行task 9

task 14执行完毕

task 8执行完毕

task 5执行完毕

task 7执行完毕

task 6执行完毕

task 9执行完毕

通过运行结果可以看出,在特定条件下(即当一个高负载的任务被分配到一个已满载的多核处理器上),系统会采取一定的资源管理措施。具体而言,在这种情况下(即当单个核心的负载超过预设阈值),系统会首先将该任务加入到一个临时缓存区域中等待处理。一旦该临时缓存区域达到容量限制,则会导致系统启动新线程的过程以分批处理这些待完成的任务。在上述程序代码中修改for循环部分为一次性提交20个子任务后...

虽然在Java文档中并不推荐直接使用ThreadPoolExecutor,并且建议我们采用Executors类中的静态方法来构建线程池

new DefaultConcurrentThreadPool(); // instantiation of a thread pool with a maximum size equal to Integer.MAX_VALUE

Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池

Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue()));

}

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue());

}

在对具体实现的分析中,发现这些工具实际上是调用并使用了ThreadPoolExecutor,并且已经预先设置了相关参数。

newFixedThreadPool配置的核心池大小和最大池大小数值相等,并采用的是LinkedBlockingQueue

新单线程执行器将corePoolSize和maximumPoolSize均设为1,并采用LinkedBlockingQueue

newCachedThreadPool将核心队列大小(corePoolSize)设置为零,并将其最大队列大小(maximumPoolSize)配置为Integer.MAX_VALUE;该系统架构基于同步队列(SynchronousQueue)设计方案,在任务到达时自动启动相应的计算单元;当主线程闲置超过60秒时则会终止该进程

在实践中,当Executors提供的这三个静态方法能够满足需求时,优先考虑使用它所包含的这三个方法更为合适。然而,在手动配置ThreadPoolExecutor的相关参数上会比较繁琐,建议根据具体任务的不同类型和规模来设置。

此外,在ThreadPoolExecutor无法满足需求时,或许我们可以自定义一个子类并进行相应的修改。

如何合理配置线程池的大小

本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。

一般需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

就目前而言,在使用该算法时,默认值只是一个参考基准数值,在实际应用中可能需要根据具体情况作出微调。具体来说,在配置相关参数时应结合实际运行情况进行优化;例如在开始阶段可以先将线程池大小设定为默认的参考值之后再通过监控任务运行状况系统的负载情况以及资源利用效率等多方面因素来优化参数设置以达到最佳性能效果。

参考资料:

《JDK API 1.6》

全部评论 (0)

还没有任何评论哟~