源码:java.util.concurrent.ThreadPoolExecutor 源码解析

1. TODO


2. 脑图

  1. Xmind

  2. Edraw

  3. Hexo 地址
    👉 http://blog.wangjia.ink/2025/09/03/源码:java.util.concurrent.ThreadPoolExecutor源码解析/


3. 基础部分

3.1. ThreadPoolExecutor 概述

ThreadPoolExecutor 是一个具体类,继承了 java.util.concurrent.AbstractExecutorService

[!NOTE] 注意事项

  1. 详见源码:AbstractExecutorService
    1. obsidian 内部链接:
      1. 源码:java.util.concurrent.AbstractExecutorService源码解析
    2. Hexo 链接:
      1. http://blog.wangjia.ink/2025/11/03/源码:java.util.concurrent.AbstractExecutorService源码解析/

3.2. ThreadPoolExecutor 相关状态

  1. TERMINATED011
    1. 表示线程池关闭
  2. TIDYING010
    1. 表示线程池终止前的过渡状态
    2. 只有任务队列为空、所有任务停止、工作线程的数量为 0
  3. STOP001
    1. 表示因为调用了 ExecutorService#shutdownNow 而进入的状态
    2. 正在运行的任务会被停止,并且返回还没开始执行的 Runnable 任务列表,线程池不会再接收新的任务
    3. 需要注意的是:
      1. 如果任务本身并没有响应中断,那么正在执行任务的线程可能不会立刻停下,直到执行完这个任务
  4. SHUTDOWN000
    1. 表示因为调用了 ExecutorService#shutdown 而进入的状态
    2. 已提交的任务会继续执行,但线程池不会再接收新的任务
  5. RUNNING111
    1. 表示线程池的初始状态
    2. 已提交的任务会继续执行,线程池也接收新的任务

[!NOTE] 注意事项

  1. ThreadPoolExecutor 的状态,我们更常把他叫做 “线程池的状态”
  2. 线程池的状态,根据严重程度来看:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
  3. 取消任务和停止任务还是有所区别的:取消任务是指取消那些尚未执行的任务。而停止任务则是任务正在执行时,你通过中断去让它停下来。如果任务能够响应中断,那就算是成功停止。但问题在于,如果任务没有响应中断,那么即使你发出了中断请求,它仍然会继续执行。所以 “停止任务” 并不一定能让它 “停止下来”

4. 内部类

4.1. Worker

详见源码:ThreadPoolExecutor.Worker

  1. obsidian 内部链接:
    1. 源码:java.util.concurrent.ThreadPoolExecutor.Worker源码解析
  2. Hexo 链接:
    1. http://blog.wangjia.ink/2025/11/04/源码:java.util.concurrent.ThreadPoolExecutor.Worker源码解析/

AbortPolicy

AbortPolicyThreadPoolExecutor 内置的拒绝策略,用于直接抛出 RejectedExecutionException 非受检异常

1
2
3
4
5
6
7
8
9
10
11
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}

}

DiscardPolicy

DiscardPolicyThreadPoolExecutor 内置的拒绝策略,用于直接丢弃该任务

1
2
3
4
5
6
7
8
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

}

CallerRunsPolicy

CallerRunsPolicyThreadPoolExecutor 内置的拒绝策略,用于让调用 Executor#execute 的线程自己去执行这个任务。简单来说就是:谁派来的活,谁自己去干

1
2
3
4
5
6
7
8
9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}

}

DiscardOldestPolicy

DiscardOldestPolicyThreadPoolExecutor 内置的拒绝策略,用于直接丢弃任务队列中排队最久的任务,然后尝试把这个任务投递到任务队列

1
2
3
4
5
6
7
8
9
10
11
12
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}

}

5. 核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
* ============================================
* 属性:private static final int COUNT_MASK
* --------------------------------------------
* 1. 访问修饰符
* 1. private
* 1. 独守秘密(类内部)
* 2. 由于是别人封装好的,只能尝试类的内部方法
*
* 2. 非访问修饰符
* 1. final
* 1. 不能修改该属性的值
* 2. 必须在定义时直接赋值,或者在构造函数中保证赋值
*
* 3. 作用
* 1. 表示 ThreadPoolExecutor 的状态和工作线程的个数
* 2. ThreadPoolExecutor 的状态,我们更常把他叫做 “线程池的状态”
*
* 4. 初始值
* 1. 线程池的状态为 RUNNING,工作线程为 0
*
* 5. 详细介绍
*
* ctl 是一个 int 类型的变量,共有 32 bit,其中高 3 bit 表示线程池的状态,低 29 bit 表示工作线程的个数
*
* ============================================
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 表示保存线程池个数的位数(低 bit)
private static final int COUNT_BITS = Integer.SIZE - 3;

/**
* ============================================
* 属性:private static final int COUNT_MASK
* --------------------------------------------
* 1. 访问修饰符
* 1. private
* 1. 独守秘密(类内部)
* 2. 由于是别人封装好的,只能尝试类的内部方法
*
* 2. 非访问修饰符
* 1. final
* 1. 不能修改该属性的值
* 2. 必须在定义时直接赋值,或者在构造函数中保证赋值
* 1. static
* 1. 可以通过类名直接访问该属性
*
* 3. 作用
* 1. 表示保存线程池状态的位数(高 bit)
*
* 4. 初始值
* 1. (1 << COUNT_BITS) - 1
* 2. 即 00011111 11111111 11111111 11111111
*
* 5. 详细介绍
*
* 1 的二进制:
* 00000000 00000000 00000000 00000001
*
* 1 << 29:
* 把二进制数整体往左挪 29 位,左边溢出的丢掉,右边空出来的补零
* 00100000 00000000 00000000 00000000
*
* (1 << 29) - 1:
* 00011111 11111111 11111111 11111111
*
* ============================================
*/
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

/**
* ============================================
* 属性:private static final int RUNNING
* --------------------------------------------
* 1. 访问修饰符
* 1. private
* 1. 独守秘密(类内部)
* 2. 由于是别人封装好的,只能尝试类的内部方法
*
* 2. 非访问修饰符
* 1. final
* 1. 不能修改该属性的值
* 2. 必须在定义时直接赋值,或者在构造函数中保证赋值
* 1. static
* 1. 可以通过类名直接访问该属性
*
* 3. 作用
* 1. 用于界定线程池的 RUNNING 状态
*
* 4. 初始值
* 1. -1 << COUNT_BITS
* 2. 即 11100000 00000000 00000000 00000000
*
* 5. 详细介绍
*
* -1 的二进制:
* 11111111 11111111 11111111 11111111
*
* -1 << 29:
* 11100000 00000000 00000000 00000000
*
* ============================================
*/
private static final int RUNNING = -1 << COUNT_BITS;

// 用于界定线程池的 SHUTDOWN 状态(000)
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 用于界定线程池的 STOP 状态(001)
private static final int STOP = 1 << COUNT_BITS;

// 用于界定线程池的 TIDYING 状态(010)
private static final int TIDYING = 2 << COUNT_BITS;

// 用于界定线程池的 TERMINATED 状态(011)
private static final int TERMINATED = 3 << COUNT_BITS;

// 表示 TPE 的任务队列(单向队列),如果任务不能作为核心工作线程的 "启动任务",则被投递到该队列
private final BlockingQueue<Runnable> workQueue;

private final ReentrantLock mainLock = new ReentrantLock();

// 表示 TPE 的 Worker 集合,所有 Worker 都会被投递到该集合
private final HashSet<Worker> workers = new HashSet<>();

// 表示 TPE 使用 AQS 创建的条件队列(双向),所有因为调用 ExecutorService#awaitTermination 而阻塞的 Thread 实例会被投递到该队列
private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

private volatile long keepAliveTime;

private volatile boolean allowCoreThreadTimeOut;

private volatile int corePoolSize;

private volatile int maximumPoolSize;

private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");

6. 构造方法

6.1. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

ThreadPoolExecutor 虽然提供了 4 个构造方法,但是本质上都是调用这个构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* ============================================
* 构造方法:
* public ThreadPoolExecutor(
* int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue<Runnable> workQueue,
* ThreadFactory threadFactory,
* RejectedExecutionHandler handler
* )
* --------------------------------------------
* 1. 访问修饰符
* 1. public
* 1. 天下皆公(EveryWhere)
*
* 2. 非访问修饰符
*
* 3. 方法参数
* 1. int corePoolSize
* 1. 指定线程池中的核心工作线程被允许的最大数量
* 2. 需要注意的是:
* 1. 该参数可以为 0
* 2. int maximumPoolSize
* 1. 指定线程池中的工作线程被允许的最大数量
* 2. 需要注意的是:
* 1. 工作线程的数量 = 核心工作线程的数量 + 非核心工作线程的数量
* 3. long keepAliveTime
* 1. 指定非核心工作线程 “空闲” 被允许的最长时间为 keepAliveTime unit
* 2. 需要注意的是:
* 1. 非核心线程 “空闲” 超时后会被销毁
* 2. 核心线程 “空闲” 超时后不会被销毁
* 4. TimeUnit unit
* 1. 指定非核心工作线程 “空闲” 被允许的最长时间为 keepAliveTime unit
* 5. BlockingQueue<Runnable> workQueue
* 1. 任务在没有核心工作线程处理时,先被投递到这个队列中
* 2. 需要注意的是,是没有核心工作线程处理,而不是工作线程处理
* 6. ThreadFactory threadFactory
* 1. 指定创建 Thread 实例的线程工厂
* 7. RejectedExecutionHandler handler
* 1. 指定拒绝策略
* 2. 当任务队列已满并且线程池中的工作线程数量已经达到了 maximumPoolSize,这时候再投递任务,就需要执行拒绝策略
* 3. 需要注意的是:ThreadPoolExecutor 内置了 4 个拒绝策略,分别是
* 1. ThreadPoolExecutor.AbortPolicy
* 2. ThreadPoolExecutor.DiscardPolicy
* 3. ThreadPoolExecutor.CallerRunsPolicy
* 4. ThreadPoolExecutor.DiscardOldestPolicy
*
* 5. 抛出异常
*
* 6. 构造实例
*
* ThreadPoolExecutor executor = new ThreadPoolExecutor(
* 2,
* 4,
* 10,
* TimeUnit.SECONDS,
* new ArrayBlockingQueue<>(2),
* new ThreadFactory() {
* private int count = 1;
* public Thread newThread(Runnable r) {
* return new Thread(r, "Worker-" + count++);
* }
* },
* new ThreadPoolExecutor.AbortPolicy()
* );
*
* ============================================
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

7. 实例方法

7.1. 实例具体方法

7.1.1. 具体方法(普通)

7.1.1.1. boolean addWorker(Runnable firstTask, boolean core)

该方法用于创建并启动一个 Worder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
private boolean addWorker(Runnable firstTask, boolean core) {

// 为循环起一个 Label
retry:

// 不断调用 AtomicInteger#get 获取 ctl 属性
for (int c = ctl.get();;) {

// 如果
// 1. 调用 ThreadPoolExecutor#runStateAtLeast,发现线程池的状态至少为 SHUTDOWN(即 TERMINATED、TIDYING、STOP、SHUTDOWN)
// 2. 并且
// 1. 要么调用 ThreadPoolExecutor#runStateAtLeast,发现线程池的状态至少为 STOP(即 TERMINATED、TIDYING、STOP)
// 2. 要么 firstTask 方法参数为 null
// 3. 要么调用 Collection#isEmpty,发现任务队列为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || firstTask != null|| workQueue.isEmpty()))
return false;

for (;;) {

// 如果调用 ThreadPoolExecutor#workerCountOf,发现线程池中的工作线程的数量 > 本次检查的线程数量上限
//
// (core ? corePoolSize : maximumPoolSize) 是指:如果 core 为 true,本次检查的线程数量上限为核心工作线程被允许的最大数量。如果 core 为 false,本次检查的线程数量上限为工作线程被允许的最大数量
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;

// 如果调用 ThreadPoolExecutor#compareAndIncrementWorkerCount,成功将工作线程 + 1
if (compareAndIncrementWorkerCount(c))
break retry;

// 调用 AtomicInteger#get 获取 ctl 属性
c = ctl.get();

// 如果调用 ThreadPoolExecutor#runStateAtLeast,发现线程池的状态至少为 SHUTDOWN(即 TERMINATED、TIDYING、STOP、SHUTDOWN)
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 创建 Worker 实例
w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {

if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();

// 调用 HashSet#add,向 ThreadPoolExecutor->workers 添加新创建的 Worker 实例
workers.add(w);

workerAdded = true;

int s = workers.size();

if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {

mainLock.unlock();
}
if (workerAdded) {

// 启动 Thread 实例
t.start();

workerStarted = true;
}
}
} finally {

if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

}

7.1.1.2. void runWorker(Worker w)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock();

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}

}

7.1.2. 具体方法(实现)

7.1.2.1. Executor 中接口方法的实现
7.1.2.1.1. void execute(Runnable command)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {

if (command == null)
throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);

}

7.1.2.2. ExecutorService 中接口方法的实现
7.1.2.2.1. void shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

7.1.2.2.2. List<Runnable> shutdownNow()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

7.1.2.2.3. boolean awaitTermination(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}


源码:java.util.concurrent.ThreadPoolExecutor 源码解析
https://wangjia5289.github.io/2025/09/03/源码:java.util.concurrent.ThreadPoolExecutor源码解析/
Author
咸阳猴🐒
Posted on
September 3, 2025
Licensed under