Advertisement

Java多线程-什么是线程池

阅读量:

问题

  • 如何维持Worker?
  • 在run方法中存在一个while循环,在getTask操作时Worker被阻塞于阻塞队列的消费端。
  • 如何导致Worker失效?
  • 在getTask过程中,核心池中的线程耗尽后将返回null值;随后在run方法中的while循环将被终止并退出该流程。

ThreadPoolExecutor

构造函数
复制代码
 public ThreadPoolExecutor(int corePoolSize,

    
                           int maximumPoolSize,
    
                           long keepAliveTime,
    
                           TimeUnit unit,
    
                           BlockingQueue<Runnable> workQueue,
    
                           ThreadFactory threadFactory,
    
                           RejectedExecutionHandler handler) {
    
     if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
    
     throw new IllegalArgumentException();
    
     if (workQueue == null || threadFactory == null || handler == null)
    
     throw new NullPointerException();
    
     this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    
     this.corePoolSize = corePoolSize;
    
     this.maximumPoolSize = maximumPoolSize;
    
     this.workQueue = workQueue;
    
     this.keepAliveTime = unit.toNanos(keepAliveTime);
    
     this.threadFactory = threadFactory;
    
     this.handler = handler;
    
 }
状态参数
复制代码
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    
 private static final int COUNT_BITS = Integer.SIZE - 3;
    
 private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 29个1
    
  
    
 // runState is stored in the high-order bits
    
 private static final int RUNNING    = -1 << COUNT_BITS; // 111 + 29个0
    
 private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 + 29个0
    
 private static final int STOP       =  1 << COUNT_BITS; // 001 + 29个0
    
 private static final int TIDYING    =  2 << COUNT_BITS; // 010 + 29个0
    
 private static final int TERMINATED =  3 << COUNT_BITS; // 011 + 29个0
    
  
    
 // Packing and unpacking ctl
    
 private static int runStateOf(int c)     { return c & ~CAPACITY; } // 取c前3位
    
 private static int workerCountOf(int c)  { return c & CAPACITY; }  // 取c后29位
    
 private static int ctlOf(int rs, int wc) { return rs | wc; }
  • ctl初始值:111 + 29个0
execute
复制代码
 public void execute(Runnable command) {

    
     if (command == null)
    
     throw new NullPointerException();
    
     int c = ctl.get();
    
     if (workerCountOf(c) < corePoolSize) { // 第1次时workerCountOf(c)为0
    
     if (addWorker(command, true))
    
         return;
    
     c = ctl.get();
    
     }
    
     if (isRunning(c) && workQueue.offer(command)) {
    
     int recheck = ctl.get();
    
     if (!isRunning(recheck) && remove(command))
    
         reject(command);
    
     else if (workerCountOf(recheck) == 0)
    
         addWorker(null, false);
    
     }
    
     else if (!addWorker(command, false))
    
     reject(command);
    
 }
    
  
    
 // Worker是个AQS,也是个Runnable
    
 private boolean addWorker(Runnable firstTask, boolean core) {
    
     retry:
    
     for (;;) {
    
     int c = ctl.get();
    
     int rs = runStateOf(c);
    
  
    
     // 大于SHUTDOWN || 等于SHUTDOWN && firstTask不为空 || 等于SHUTDOWN && workQueue为空,返回false
    
     // 含义是:仅在等于SHUTDOWN时,会接着消耗workQueue
    
     if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
    
         return false;
    
  
    
     for (;;) {
    
         int wc = workerCountOf(c);
    
         if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
    
             return false; // 超CAPACITY || 超size,返回false
    
         if (compareAndIncrementWorkerCount(c)) // 111 + 29个0 -> 111 + 28个0 + 1
    
             break retry; // +1成功,直接break
    
         c = ctl.get();  // Re-read ctl
    
         if (runStateOf(c) != rs)
    
             continue retry;
    
         // else CAS failed due to workerCount change; retry inner loop
    
     }
    
     }
    
  
    
     boolean workerStarted = false;
    
     boolean workerAdded = false;
    
     Worker w = null;
    
     try {
    
     w = new Worker(firstTask); // Worker extends AQS implements Runnable
    
     final Thread t = w.thread;
    
     if (t != null) {
    
         final ReentrantLock mainLock = this.mainLock; // 非公平ReentrantLock
    
         mainLock.lock();
    
         try {
    
             // Recheck while holding lock.
    
             // Back out on ThreadFactory failure or if shut down before lock acquired.
    
             int rs = runStateOf(ctl.get());
    
  
    
             if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
    
                 if (t.isAlive()) // precheck that t is startable
    
                     throw new IllegalThreadStateException();
    
                 workers.add(w); // workers:HashSet<Worker>
    
                 int s = workers.size();
    
                 if (s > largestPoolSize) // int largestPoolSize
    
                     largestPoolSize = s;
    
                 workerAdded = true;
    
             }
    
         } finally {
    
             mainLock.unlock();
    
         }
    
         if (workerAdded) {
    
             t.start();
    
             workerStarted = true;
    
         }
    
     }
    
     } finally {
    
     if (! workerStarted)
    
         addWorkerFailed(w);
    
     }
    
     return workerStarted;
    
 }
    
  
    
 Worker(Runnable firstTask) {
    
     setState(-1); // inhibit interrupts until runWorker
    
     this.firstTask = firstTask;
    
     this.thread = getThreadFactory().newThread(this);
    
 }
  • 通过最后定义Worker构造函数的方式可以看出,在thread参数中传递了与Worker相关的数据,并且其核心任务即为firstTask属性的处理
    • 可以确定,在t.start()处所执行的操作实际上是该线程的run方法运行过程
run
复制代码
 public void run() {

    
     runWorker(this); // this是Worker
    
 }
    
  
    
 final void runWorker(Worker w) {
    
     Thread wt = Thread.currentThread();
    
     Runnable task = w.firstTask;
    
     w.firstTask = null;
    
     w.unlock(); // allow interrupts
    
     boolean completedAbruptly = true;
    
     try {
    
     while (task != null || (task = getTask()) != null) { // firstTask不为空,或者workQueue不为空,轮询执行任务
    
         w.lock();
    
         if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
    
             wt.interrupt();
    
         try {
    
             beforeExecute(wt, task);
    
             Throwable thrown = null;
    
             try {
    
                 task.run();
    
             } catch (RuntimeException x) {
    
                 thrown = x; throw x;
    
             } catch (Error x) {
    
                 thrown = x; throw x;
    
             } catch (Throwable x) {
    
                 thrown = x; throw new Error(x);
    
             } finally {
    
                 afterExecute(task, thrown);
    
             }
    
         } finally {
    
             task = null;
    
             w.completedTasks++;
    
             w.unlock();
    
         }
    
     }
    
     completedAbruptly = false;
    
     } finally {
    
     processWorkerExit(w, completedAbruptly);
    
     }
    
 }
  • 执行流程如下:运行 worker 生成新的 Worker(编号为 ctl+1),随后启动 worker 程序(t.start),该程序会调用 run 函数,并依次执行 runWorker 和 task 的相关操作。
  • 当 corePoolSize 未达到最大值时(即 corePoolSize 未满),新加入的任务会被分配至新增 Worker 上,并且这些 Worker 都会被分配到 workQueue 队列中等待处理。
  • 当 corePoolSize 已达到最大值(即 corePoolSize 满)时,则新加入的任务会被分配至 workQueue 队列中等待处理。
  • 如果 workQueue 队列已经满了,则会触发 maximumPoolSize 的设置以应对超出容量的情况。
  • 超出 corePoolSize 的工人们,在完成工作队列中的所有任务后会进入 processWorkerExit 过程,并将自己从 workers 列表中移除(即自失效)。
ThreadFactory
复制代码
 // default

    
 Executors.defaultThreadFactory()
    
  
    
 static class DefaultThreadFactory implements ThreadFactory {
    
     private static final AtomicInteger poolNumber = new AtomicInteger(1);
    
     private final ThreadGroup group;
    
     private final AtomicInteger threadNumber = new AtomicInteger(1);
    
     private final String namePrefix;
    
  
    
     DefaultThreadFactory() {
    
     SecurityManager s = System.getSecurityManager();
    
     group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    
     namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    
     }
    
  
    
     public Thread newThread(Runnable r) {
    
     Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
    
     if (t.isDaemon())
    
         t.setDaemon(false);
    
     if (t.getPriority() != Thread.NORM_PRIORITY)
    
         t.setPriority(Thread.NORM_PRIORITY);
    
     return t;
    
     }
    
 }
  • 创建新的线段。
    • 配置为非daemon模式。
    • 指定为普通优先级。
    • 在该线程池中运行时,默认情况下会使用由线程工厂生成并返回的线段。
reject策略
  • 当maximumPoolSize依旧被耗光时
    • 内置4种类型,默认设置为AbortPolicy
    • AbortPolicy - 以简单粗暴的方式抛异常
复制代码
 public static class AbortPolicy implements RejectedExecutionHandler {

    
     public AbortPolicy() {}
    
  
    
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    
     throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    
     }
    
 }
  • DiscardPolicy - 丢弃任务,啥也不干
复制代码
 public static class DiscardPolicy implements RejectedExecutionHandler {

    
     public DiscardPolicy() {}
    
  
    
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    
     }
    
 }

DiscardOldestPolicy - 该策略被设计用于移除工作队列中最先加入的所有命令,并处理所有后续加入的工作

复制代码
 public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    
     public DiscardOldestPolicy() {}
    
  
    
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    
     if (!e.isShutdown()) {
    
         e.getQueue().poll();
    
         e.execute(r);
    
     }
    
     }
    
 }
  • CallerRunsPolicy - 调用线程池方法的线程同步执行run方法
复制代码
 public static class CallerRunsPolicy implements RejectedExecutionHandler {

    
     public CallerRunsPolicy() {}
    
  
    
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    
     if (!e.isShutdown()) {
    
         r.run();
    
     }
    
     }
    
 }
线程池状态
  • RUNNING
    • 创建后的初始状态
    • 可接收新任务
    • 可处理workQueue任务
  • SHUTDOWN
    • RUNNING -> shutdown() -> SHUTDOWN
    • 不接收新任务
    • 可处理workQueue任务
  • STOP
    • RUNNING | SHUTDOWN -> shutdownNow() -> STOP
    • 不接收新任务
    • 不处理workQueue任务
    • 中断正在处理的任务
  • TIDYING
    • SHUTDOWN -> workQueue 和 执行中变空 -> TIDYING
    • STOP -> 执行中变空 -> TIDYING
    • 标识:workerCountOf(ctl) == 0
    • 会执行terminated()方法做后续处理,默认为空
  • TERMINATED
    • TIDYING -> terminated() -> TERMINATED
    • 线程池彻底终止

全部评论 (0)

还没有任何评论哟~