java线程(四) 线程池原理
线程池
作用:限制线程数,管理线程、避免频繁创建和销毁线程造成性能损耗
ThreadPoolExecutor
作用
封装线程池的一系列逻辑,通过该类可创建线程池。
构造参数
- coreResourceNumber 核心资源数量,在运行任务前才会生成资源;一般情况下这些资源不会自动失效;除非调用
allowCoreThreadTimeOut方法设置为true,则允许长时间不使用而失效 - keepAliveTimeout 存活时间设置;当空闲时间超出指定阈值后将触发超时;默认情况下只有生成的核心资源数量对应的资源会在此情形下终止
- TimeUnit 指定等待超时的时间单位;支持7种不同的时间段选择
- workQueue 处理队列;当当前所有核心资源均被占用后新请求将被分配到该处理队列中等待处理;不同类型的处理队列可能具备不同的进队和出队逻辑
- maximumResourceNumber 最大可用资源数量;当阻塞状态下的处理请求达到该数值上限之后系统将尝试生成新的资源;如果当前总资源数目已经超越了最大限制,则该请求将会受到拒绝
- RejectHandler 拒绝处理机制;当阻塞状态下的请求数目已经超过了最大限制并且现有所有核心资源均被占用的情况下新请求将被直接拒绝;采用不同类型的拒绝策略会导致具体的处理执行流程有所差异
execute方法分析
works变量: 工作线程集合,存储处于运行状态的线程。
执行流程如下
- 首先获取当前运行中的线程数量。如果该数量少于
核心线程数corePoolSize值,则将当前任务封装为worker类实例,并通过创建一个Worker类实例来生成新的线程。该新线程会被自动注册到运行集合中,并立即启动以执行任务。 - 当
start方法被调用时会执行Worker类的run方法。在该方法中会首先等待并处理当前任务。当当前任务完成后,在阻塞队列workqueue中取出下一个等待的任务进行处理。若系统允许超时或当前可用的工人数低于核心阈值,则会在指定的时间间隔keepAliveTime内继续等待新任务的到来;但如果在规定时间内没有新的任务进入阻塞队列,则系统将停止处理。 - 当工作状态下的可用工人数少于核心数目时,在尝试增加至核心数目后仍不满足需求的情况下(即 CAS 操作失败),系统将在现有基础上不再增加工人数并停止处理;而若可用工人数多于核心数目,则会将新任务添加至阻塞队列中等待处理。
- 在无法将新任务成功添加至阻塞队列的情况下(即 CAS 操作失败),系统将会检查是否有可用的工作进程可供使用;如果有,则尝试将其添加至运行集合中以生成新的子进程;否则系统将会转而采用拒绝执行机制来处理请求。
- 当阻塞队列已满(即超过最大可容纳数目)时,则使用拒绝执行机制来处理请求。
工作队列
所有阻塞队列均基于BlockingQueue接口实现。 介绍常见阻塞队列的实现方式。 当一个队列被指定时间触发时,则通过Condition进行阻塞。 offer后通过唤醒机制来释放资源。 condition : ReentrantLock::newCondition获取的是一个ReentrantCondition对象。
所有阻塞队列均基于BlockingQueue接口实现。 介绍常见阻塞队列的实现方式。 当一个队列被指定时间触发时,则通过Condition进行阻塞。 offer后通过唤醒机制来释放资源。 condition : ReentrantLock::newCondition获取的是一个ReentrantCondition对象。
ArrayBlockingQueue
- 有限容量的阻塞队列(也称为有界队列),其特点在于具有限定容量,并采用FIFO顺序进行操作。进/出队操作均需使用同一个ReentrantLock进行加锁以确保互斥。
- 该算法采用基于数组的设计模式,在初始化阶段需预先设定一个固定容量。
LinkedBlockingQueue
- 有限大小的队列,在无需预先设定容量时,默认采用整型变量的最大值作为其存储空间上限。该结构中进队与出队操作均遵循先进先出原则,并共享一个同步机制来保证并发安全。
- 该结构内部使用双向链表形式实现数据存储机制,并支持预定义容量设置以满足特定应用场景需求。与基于数组实现的块式队列不同的是... 通过维护一个计数器来跟踪当前占用的空间资源,并且该设计允许设置最大允许容量限制以进一步优化资源利用效率。
PriorityBlockingQueue
- 优先级队列的工作原理是通过使用Comparator来比较和排序候选任务。
- 同一同步机制确保了所有进队和出队操作的安全一致性。
- 在基于最小堆的数据结构实现中使用动态数组作为存储空间,并支持无限容量扩展策略,在现有空间不足时自动增长至现有容量的一半。较小的任务具有较高的优先级。
SynchronousQueue
- 同步队列。每个进队都要等待另一个线程调用移除操作,否则一直阻塞。线程池调用的offer方法则不会去等待,直接存放后就返回。该队列底层使用
链表,通过cas和生产者消费者匹配模式避免锁操作,节点分为生产者和消费者两种.假设来了一个生产者,如果头部当前为生产者则入队,如果为消费者则通过cas与队列中的消费者cas匹配,无锁操作吞吐量高。 支持公平队列TransferQueue和非公平队列TransferStack,默认非公平。公平队列通过维护头尾节点实现,从头部开始匹配,发现是同个类型则把当前节点从尾节点插入。非公从头部开始匹配,发现是同个类型则头节点插入。- poll的时候,如果与头部相同类型,把当前线程封装成
SNode加入队列,通过LockSupport::park进入阻塞。当有offer进来匹配到时,通过SNode包含的Thread进行unpark唤醒这个线程。新进来的节点与头节点匹配时,把头节点的SNode的match设置成当前节点,被唤醒的。 - 带有效时间阻塞,通过park阻塞指定时间,醒来时会判断是否返回的匹配节点与阻塞时的节点是同一个,同一个是代表没有匹配成功。
- 按照这个队列的实现,链表上除了完成匹配的节点,要么为空,要么都是生产者节点,要么都是消费者节点。
- 队列分为三种状态
REQUEST拉去数据、DATA添加数据、FULFILLING匹配成功。poll方法对应REQUEST,offer方法对应DATA。进队时判断队列头与当前状态一样则将队列头替换为当前数据,旧的队列头为当前数据的next。通过将队列头cas为FULFILLING状态进行匹配,cas成功后则将当前队列头出队。
DelayQueue
延时式队列与无限制容量的前排系统结合使用,则该系统中的元素将根据其指定顺序进行存储与出排操作。实现getDelay()方法用于获取元素所需延迟时间;当返回值为零时不需额外等待;若数值大于零则需相应延长等待时间。
拒绝策略
当线程池达到最大处理能力时,新提交的任务将无法被提交至RejectedExecutionHandler
AbortPolicy 默认的拒绝策略,直接抛出异常
DiscardPolicy 该策略不做任何处理,也不抛出异常,相当于抛弃了任务
DiscardOldestPolicy 将会移除阻塞队列中的 oldest 任务,并通过 poll 方法从队列中弹出元素。即从头部弹出元素以释放资源。在这种情况下,则是最高优先度而非 oldest 的情况;之后将执行 execute() 任务
CallerRunsPolicy 如果执行失败则由主线程执行。
停止线程池执行
该线程池将被关闭以实现停止功能,在此操作下会暂停后续请求并等待待处理队列中的所有现有请求完成后再进行关闭
ForkJoinPool
自Java 7版本起推出的全新的线程池实现中包含了一些重要的改进功能。其中 ThreadPoolExecutor 线程池在处理单个耗时的任务时会存在不足之处:若某一任务运行时间过长,则会持续占用同一执行线程;其余所有线程则处于闲置状态。为此ForkJoinPool采用了分治法策略:将待执行的任务划分为若干子任务块;当子任务块划分至预设阈值后则由各个独立的执行单元分别处理。该机制允许用户根据实际需求指定最大运行线程数目:若未指定,则系统会自动采用当前处理器的核心数量作为最大线程数目。
该提交方法用于将任务封装为一个特定的对象类型——ForkJoinTask对象。此方法会在以下情况下执行:当接收的任务尚未被预先处理时(即尚未被预先处理),它会将其打包成指定类型的对象;而如果接收的任务已经是这种类型或其子类实例,则无需进行额外的打包操作以避免重复处理。
其父类为ForkJoinTask(抽象类),而RecursiveAction则负责处理无返回值的任务类型。为了使其功能得以实现,在继承了父类的基础上需重新编写compute方法以完成对计算过程以及将计算过程分解成若干小步骤的具体操作——即实现对计算过程进行细分与执行的过程。具体而言,在计算过程中若当前未达到预先设定的最大分割规模,则需将其分割为若干子进程以进一步处理;而后将这些子进程依次独立地投入运行中,在运行过程中采用的方式可划分为fork、join或invokeAll三种主要类型
RecursiveTask
A subclass of ForkJoinTask and an abstract class akin to RecursiveAction. Its main responsibility is to implement the compute method, which has a return value.
ForkJoinTask 执行子任务
该线程将多个子任务加入队列,并非立即等待这些任务的执行结果。这些未完成的任务将在后续操作中被提取或由其他线程主动获取后进行处理。
调用该方法类似于Thread类中的join方法,在此过程中线程会阻塞直至获得所有相关任务的执行结果。
通过该机制将当前子任务加入队列中,并由其他线程或后续代码从中提取并执行。
workQueues[]
该阻塞队列数组被设计成每个线程均独立拥有。当执行任务时会将其切分为子任务并通过随机数确定其所属的队列。这些子任务会被推送到对应的阻塞队列中。这些阻塞队列内部封装了一个用于管理工作的数组(即通过数组实现队列功能)。其中push操作仅限于当前线程完成(即该进程才会调用push方法),而pop操作则可由其他所有进程访问(即其他进程可以调用pop方法)。当某个进程的阻塞队列为空时系统会主动将其资源释放以便供其他进程使用(即这种资源释放的方式采用双端队列表现)。这种资源释放的方式采用双端队列表现并确保了高效且无竞争性的工作分配机制被称为work-stealing算法。
执行任务的三个方法
启动异步执行
提交异步执行,并获取ForkJoinTask;如果需要结果,则调用task.get()
调用同步等待以获取结果
在首次执行submit方法时, 将其封装为ForkJoinTask类, 并初始化一个workQueues数组(其大小设定为当前核心线程数最接近2的幂次方值的两倍)。随后初始化工作队列, 根据当前线程生成一个随机整数, 并将其映射至工作队列的位置索引, 将该workQueue对象按顺序添加至工作队列数组中, 同时也将具体的任务分配到该工作队列中。接着启动一个专门负责从队列中拉取任务的工作线程。当该线程从某一线程对应的队列中拉取完所有任务后, 会切换到其他对应的队列继续拉取任务, 直到目标工作队列中的数据耗尽为止。
Executors
Executors 提供了多种线程池,并支持便捷地建立这些线程池。通常采用 ThreadPoolExecutor 创建主线程池时会配置一些默认参数。
newCachedThreadPool
缓存机制实现的后台队列。该系统组件通过配置中未分配核心线程的方式设计,默认设置的最大进程数量可达计算机内存极限,并允许消息在网络状态下的60秒超时处理。采用同步机制的SynchronousQueue作为消息存储结构,并支持默认消息队列的消息路由策略。在配置中未分配任何核心线程的情况下,默认新请求将直接加入到默认消息队列中进行处理,并选择非公平的消息入队策略以确保响应及时性
- 第一个线程进入队列时,因为线程池调用的队列的不指定超时时间offer方法,此方法当队列头没数据时是直接返回的,由于最大线程数是Integer的最大值,所以会直接创建一个线程执行任务,执行结束后会带
超时时间keepAliveTime调用poll方法获取任务,此时会进队等待匹配。 - 此时来第二个任务时,该任务与该线程匹配到,此时会将队列头替换成下一个元素(又变成空),返回该任务,该线程执行该任务。
- 此时如果第二任务线程还没执行,那么第三个任务调用poll方法又是返回空,所以会再去创建线程。
- 如果两个线程都执行完,都带超时时间进队,那么都会进队,变成head=线程1,线程.next=线程2。
基于此分析可知,在每一个新到来的任务中都会执行以下操作:首先检查是否存在空闲的主线程;如果不存在,则生成一个新的子线程;如果存在,则会复用现有线程;并且最高允许开销的子线程数量被设定为整型的最大值即Integer.MAX_VALUE
newFixedThreadPool
固定式多核并行计算框架。通过配置参数中指定所需的并行处理核数创建该对象,并确保系统内核心计算单元的数量与最大开启的子任务数量保持一致。队列采用的是阻塞队列机制,默认情况下任务处理不超时且会持续占用主线程资源以完成所有计算任务。允许的最大queued任务数量设为整型数据类型的上限(即int的最大值),当当前已有的queued任务数量超过core worker的数量时会直接被加入到队列中等待处理;如果此时queue已经满了,则后续的新请求将被直接拒绝而不再进入queue。
新型单一线程池工具实现了高效的资源管理。该系统遵循固定的资源分配策略,并将所有作业统一调度至一个专门的队列中运行。
为实现定时与延时任务执行提供线程池支持。通过继承自ThreadPoolExecutor类创建ScheduledThreadPoolExecutor类的方式实现了该线程池。
- 配置指定的核心线程数目,并将最大数量设置为整型的最大值;同时将keepAliveTime设为0;该 ThreadPool 的执行方法仅限于当运行中的线程数量未超过核心数目时才会创建新线程去队列中拉取任务;否则不会生成新线程参与拉取工作;因此最大数量和保持存活时间参数在此场景下实际上不起作用;此外需要注意的是,在任何情况下都不会让总运行线程数目超过核心数目。
- 队列由类内部封装实现的一个延迟队列(DelayedWorkQueue)。该队列基于数组实现,在内存不足时会按1.5倍的比例进行扩展(小顶堆)。在使用 ScheduledThreadPoolExecutor 的 schedule 方法执行延迟任务时,系统会将每个任务封装成一个 ScheduledFutureTask 实例(包含任务本身以及从当前时间开始所需要的时间作为延时时间)。这个过程会在完成之后对所有等待执行的任务按照其启动执行的时间进行排序:时间较短的任务会被安排在前面;如果有多个任务具有相同的启动时间,则较早被加入队列的任务会被优先处理。
- 在每次新旧交替的任务被创建并完成之后,在没有超出核心数目限制的前提下系统会自动创建新的运行线程去从队首位置依次拉取并执行这些待办事项。
- 如果是按照固定速率运行的任务,则会在每次任务完成后计算间隔并将该间隔值与上一个任务开始的时间相加得到下一个任务应该启动的时间值;随后将其加入到当前的工作队列中以便后续处理。
newWorkStealingPool
