源码:java.util.concurrent.ForkJoinPool 源码解析

1. TODO


2. 脑图

  1. Xmind

  2. Edraw

  3. Hexo 地址
    👉 http://blog.wangjia.ink/2025/11/14/源码:java.util.concurrent.ForkJoinPool源码解析/


3. 基础部分

3.1. ForkJoinPool 概述

ForkJoinPool 是一个具体类,继承了 java.util.concurrent.AbstractExecutorService

ForkJoinPool 被设计用于高效执行那些可以被 ”递归拆分“ 的任务,其核心特性是:

  1. 分而治之
  2. 递归拆分
  3. 工作窃取

[!NOTE] 注意事项

  1. 详见源码:AbstractExecutorService
    1. obsidian 内部链接:
      1. 源码:java.util.concurrent.AbstractExecutorService源码解析
    2. Hexo 链接:
      1. http://blog.wangjia.ink/2025/11/03/源码:java.util.concurrent.AbstractExecutorService源码解析/
  2. 对于 ForkJoinPoolForkJoinTask<V>RecursiveActionRecursiveTask<V>,可以说是整个 JUC 中最复杂的一部分,Doug Lea 当初为它们投入了大量心血。对于我们而言,只需要掌握它们的使用方式即可,没必要对源码进行过度深究

3.2. ForkJoinPool 相关流程

我们知道:ThreadPoolExecutorScheduledThreadPoolExecutor 的线程执行流程大致为:本地线程 ➔ Thread#run ➔ Worker#run

ForkJoinPool 并没有使用 Worker,甚至没有使用原生的 Thread,而是直接使用了 ForkJoinWorkerThreadForkJoinWorkerThread 继承了 Thread

你可能会疑惑:为什么要让 ForkJoinWorkerThread 继承 Thread,而不是继续使用 Worker 或者在 Worker 的基础上 “魔改” 呢?原因很简单,因为我们要为每个线程单独维护一个任务队列

每个线程都会维护一个 ForkJoinPool.WorkQueue,这是一个双端队列。并且这些队列都会被注册到 ForkJoinPool->queues

当线程去执行一个大任务 A 时,可以把 A 拆分成多个子任务(例如 BCD),这些子任务会被投递到该本地线程维护的任务队列中。然后你是不是认为 “线程需要阻塞等待 BCD 的完成呢?”

接下来的一部分,是 ForkJoinPool 设计最精妙的地方:

  1. 当一个任务 A 被拆分成子任务 BCD,并被投递到任务队列后,按理说当前线程应该阻塞并等待 BCD 的完成。然而并非如此:当前线程确实在等待,但并没有阻塞。相反,它会继续执行当前线程维护的任务队列中的其他任务
  2. 假如执行到了 B,在执行 B 的过程中,B 又可以进一步拆分成更小的子任务 EFG,并被投递到任务队列后,同样在等待子任务的结果的同时,当前线程并不会立即阻塞,而是会继续执行当前线程维护的任务队列中的其他任务。以此类推…直到任务被拆分到足够小
  3. 当子任务完成后,它的结果会返回给父任务。父任务在收到所有子任务结果后,再将汇总结果返回给自己的父任务。以此类推…直到最初的大任务最终得到完整的计算结果
  4. 需要注意的是:
    1. 如果当前线程维护的任务队列中没有其他任务了,它会根据 ForkJoinPool->queues 随机选择一个其他线程维护的任务队列,从队尾 “窃取” 一个任务来执行
    2. 如果当前线程维护的任务队列中没有其他任务了,也 “窃取” 不到任务了,就会有以下两种情况:
      1. 如果当前线程还在等待任务
        1. 本地线程进入阻塞状态,Thread 实例进入 WAITING 状态,并被投递到(XXXXX,肯定是 ForkJOinPoolTask 的队列啊) Task 的某队列,等待被唤醒(LockSupport.unpark)、被中断
        2. 本地线程进入阻塞状态,Thread 实例进入 WAITING 状态后,能响应中断。线程被唤醒,重新获得 CPU 时间片后,会抛出 InterruptedException 异常,并清除 Thread 实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由 ForkJoinPool 处理好了。一旦发生中断,为了不污染我们的代码,ForkJoinPool 会抛出 CancellationException 非受检异常
      2. 如果当前线程未在等待任务(“空闲”)
        1. 如果线程池中的线程的数量 ≤ corePoolSize
          1. 本地线程进入阻塞状态,Thread 实例进入 WAITING 状态,并被投递到 ForkJoinPool 的某队列,等待被唤醒(LockSupport.unpark)、被中断(Thread#interrupt
          2. 本地线程进入阻塞状态,Thread 实例进入 WAITING 状态后,能响应中断。线程被唤醒,重新获得 CPU 时间片后,会抛出 InterruptedException 异常,并清除 Thread 实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由 ForkJoinPool 处理好了。一旦发生中断,我们不会感知到任何异常
        2. 如果线程池中的线程的数量 >corePoolSize
          1. 本地线程进入阻塞状态,Thread 实例进入 TIMED_WAITING 状态,并被投递到 ForkJoinPool 的某队列,等待被唤醒(LockSupport.unpark)、被中断(Thread#interrupt)、阻塞超时
            1. 如果超时,ForkJoinPool 会销毁该线程
          2. 本地线程进入阻塞状态,Thread 实例进入 TIMED_WAITING 状态后,能响应中断。线程被唤醒,重新获得 CPU 时间片后,会抛出 InterruptedException 异常,并清除 Thread 实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由 ForkJoinPool 处理好了。一旦发生中断,我们不会感知到任何异常

我们以 1 累加到 10000 为例,来看一看究竟是怎么执行的:

[!NOTE] 注意事项

  1. 猴哥的烦恼箱 (。•́︿•̀。)
    1. 为什么我们不能采取 ForkJoinPool 的 “一个任务尚未完成,但是可以去执行其他任务” 思想,用于同步阻塞 IO 工作模式下的服务端,从而避免 “伪异步 + 异步编程回调”?
    2. 为什么不 “魔改” Worker,在 Worker 中添加任务队列呢?
    3. 为什么当前线程维护的任务队列中没有其他任务了,也 “窃取” 不到任务了,就要阻塞?
      1. 防止 CPU 空转

4. 构造方法

4.1. public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime,TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* ============================================
* 构造方法:
* public ForkJoinPool(int parallelism,
* ForkJoinWorkerThreadFactory factory,
* UncaughtExceptionHandler handler,
* boolean asyncMode,
* int corePoolSize,
* int maximumPoolSize,
* int minimumRunnable,
* Predicate<? super ForkJoinPool> saturate,
* long keepAliveTime,
* TimeUnit unit)
* --------------------------------------------
* 1. 访问修饰符
* 1. public
* 1. 天下皆公(EveryWhere)
*
* 2. 非访问修饰符
*
* 3. 方法参数
* 1. int parallelism
* 1. 指定我们期望线程池中 ”活跃“ 线程的线程数量
* 2. 所谓的 “活跃” 是指:正在执行任务或正在 ”窃取“ 任务的线程的线程数量
* 3. ForkJoinPool 会尽力维持这个数量,但不能保证
* 2. ForkJoinWorkerThreadFactory factory
* 1. 指定创建 ForkJoinWorkerThread 实例的线程工厂
* 3. UncaughtExceptionHandler handler
* 1. 指定未捕获异常的处理器
* 2. 当一个本地线程在执行任务时因未捕获的异常而终止(异常退出)时,会调用未捕获异常的处理器
* 4. boolean asyncMode
* 1. 指定本地线程从其维护的任务队列获取任务的顺序
* 2. 可选参数包括:
* 1. true
* 1. FIFO
* 2. false
* 1. LIFO
* 5. int corePoolSize
* 1. 指定线程池中 “存活” 的线程被允许的最小数量
* 2. 当某本地线程 “空闲” 的时间超过 keepAliveTime unit,会在销毁前检查该参数,然后再决定是否销毁
* 3. 需要注意的是:该参数可以为 0
* 6. int maximumPoolSize
* 1. 指定线程池中的线程被允许的最大数量
* 7. int minimumRunnable
* 1. 指定线程池中没有调用 ForkJoinTask#join 的线程被允许的最小数量
*
* 1. 指定可运行的工作线程被允许的最小数量
* 2. 如果低于,将创建新的工作线程,直到达到 maximumPoolSize
* 8. Predicate<? super ForkJoinPool> saturate
* 1. 指定饱和策略
* 2. 当没有调用 ForkJoinTask#join 的线程低于 minimumRunnable,但是也达到了 maximumPoolSize 时,会调用饱和策略
* 9. long keepAliveTime
* 1. 指定线程 “空闲”被允许的最长时间为:keepAliveTime unit
* 2. 所谓的 “空闲” 是指:某本地线程维护的任务队列没有任务了,并且也 “窃取” 不到任务了
* 3. 当某本地线程 “空闲” 的时间超过 keepAliveTime unit,
* 10. TimeUnit unit
* 1. 指定线程 “空闲”被允许的最长时间为:keepAliveTime unit
*
* 4. 抛出异常
*
* 5. 构造示例
*
* ============================================
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode,
int corePoolSize,
int maximumPoolSize,
int minimumRunnable,
Predicate<? super ForkJoinPool> saturate,
long keepAliveTime,
TimeUnit unit) {

checkPermission();
int p = parallelism;
if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
throw new IllegalArgumentException();
if (factory == null || unit == null)
throw new NullPointerException();
this.factory = factory;
this.ueh = handler;
this.saturate = saturate;
this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
this.mode = p | (asyncMode ? FIFO : 0);
this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
this.registrationLock = new ReentrantLock();
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";

}

源码:java.util.concurrent.ForkJoinPool 源码解析
https://wangjia5289.github.io/2025/11/14/源码:java.util.concurrent.ForkJoinPool源码解析/
Author
咸阳猴🐒
Posted on
November 14, 2025
Licensed under