Last updated on 2025-11-23T21:13:46+08:00
1. TODO
2. 脑图
Xmind
Edraw
Hexo 地址 👉 http://blog.wangjia.ink/2025/09/03/源码:java.util.concurrent.ThreadPoolExecutor源码解析/
3. 基础部分 3.1. ThreadPoolExecutor 概述 ThreadPoolExecutor 是一个具体类,继承了 java.util.concurrent.AbstractExecutorService
[!NOTE] 注意事项
详见源码:AbstractExecutorService
obsidian 内部链接:
源码:java.util.concurrent.AbstractExecutorService源码解析
Hexo 链接:
http://blog.wangjia.ink/2025/11/03/源码:java.util.concurrent.AbstractExecutorService源码解析/
3.2. ThreadPoolExecutor 相关状态
TERMINATED(011)
表示线程池关闭
TIDYING(010)
表示线程池终止前的过渡状态
只有任务队列为空、所有任务停止、工作线程的数量为 0
STOP(001)
表示因为调用了 ExecutorService#shutdownNow 而进入的状态
正在运行的任务会被停止,并且返回还没开始执行的 Runnable 任务列表,线程池不会再接收新的任务
需要注意的是:
如果任务本身并没有响应中断,那么正在执行任务的线程可能不会立刻停下,直到执行完这个任务
SHUTDOWN(000)
表示因为调用了 ExecutorService#shutdown 而进入的状态
已提交的任务会继续执行,但线程池不会再接收新的任务
RUNNING(111)
表示线程池的初始状态
已提交的任务会继续执行,线程池也接收新的任务
[!NOTE] 注意事项
ThreadPoolExecutor 的状态,我们更常把他叫做 “线程池的状态”
线程池的状态,根据严重程度来看:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
取消任务和停止任务还是有所区别的:取消任务是指取消那些尚未执行的任务。而停止任务则是任务正在执行时,你通过中断去让它停下来。如果任务能够响应中断,那就算是成功停止。但问题在于,如果任务没有响应中断,那么即使你发出了中断请求,它仍然会继续执行。所以 “停止任务” 并不一定能让它 “停止下来”
4. 内部类 4.1. Worker 详见源码:ThreadPoolExecutor.Worker
obsidian 内部链接:
源码:java.util.concurrent.ThreadPoolExecutor.Worker源码解析
Hexo 链接:
http://blog.wangjia.ink/2025/11/04/源码:java.util.concurrent.ThreadPoolExecutor.Worker源码解析/
AbortPolicy AbortPolicy 是 ThreadPoolExecutor 内置的拒绝策略,用于直接抛出 RejectedExecutionException 非受检异常
1 2 3 4 5 6 7 8 9 10 11 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy DiscardPolicy 是 ThreadPoolExecutor 内置的拒绝策略,用于直接丢弃该任务
1 2 3 4 5 6 7 8 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
CallerRunsPolicy CallerRunsPolicy 是 ThreadPoolExecutor 内置的拒绝策略,用于让调用 Executor#execute 的线程自己去执行这个任务。简单来说就是:谁派来的活,谁自己去干
1 2 3 4 5 6 7 8 9 10 11 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy DiscardOldestPolicy 是 ThreadPoolExecutor 内置的拒绝策略,用于直接丢弃任务队列中排队最久的任务,然后尝试把这个任务投递到任务队列
1 2 3 4 5 6 7 8 9 10 11 12 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
5. 核心属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;private static final RejectedExecutionHandler defaultHandler = new AbortPolicy ();private static final RuntimePermission shutdownPerm = new RuntimePermission ("modifyThread" );
6. 构造方法 6.1. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ThreadPoolExecutor 虽然提供了 4 个构造方法,但是本质上都是调用这个构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
7. 实例方法 7.1. 实例具体方法 7.1.1. 具体方法(普通) 7.1.1.1. boolean addWorker(Runnable firstTask, boolean core) 该方法用于创建并启动一个 Worder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false ; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null )) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException (); workers.add(w); workerAdded = true ; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
7.1.1.2. void runWorker(Worker w) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null ); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
7.1.2. 具体方法(实现) 7.1.2.1. Executor 中接口方法的实现 7.1.2.1.1. void execute(Runnable command) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
7.1.2.2. ExecutorService 中接口方法的实现 7.1.2.2.1. void shutdown() 1 2 3 4 5 6 7 8 9 10 11 12 13 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
7.1.2.2.2. List<Runnable> shutdownNow() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
7.1.2.2.3. boolean awaitTermination(long timeout, TimeUnit unit) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { while (runStateLessThan(ctl.get(), TERMINATED)) { if (nanos <= 0L ) return false ; nanos = termination.awaitNanos(nanos); } return true ; } finally { mainLock.unlock(); } }