源码:java.util.concurrent.ForkJoinPool 源码解析
1. TODO
2. 脑图
Xmind
Edraw
Hexo 地址
👉 http://blog.wangjia.ink/2025/11/14/源码:java.util.concurrent.ForkJoinPool源码解析/
3. 基础部分
3.1. ForkJoinPool 概述
ForkJoinPool 是一个具体类,继承了 java.util.concurrent.AbstractExecutorService
ForkJoinPool 被设计用于高效执行那些可以被 ”递归拆分“ 的任务,其核心特性是:
- 分而治之
- 递归拆分
- 工作窃取
[!NOTE] 注意事项
- 详见源码:
AbstractExecutorService- 对于
ForkJoinPool、ForkJoinTask<V>、RecursiveAction、RecursiveTask<V>,可以说是整个JUC中最复杂的一部分,Doug Lea当初为它们投入了大量心血。对于我们而言,只需要掌握它们的使用方式即可,没必要对源码进行过度深究
3.2. ForkJoinPool 相关流程
我们知道:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的线程执行流程大致为:本地线程 ➔ Thread#run ➔ Worker#run
而 ForkJoinPool 并没有使用 Worker,甚至没有使用原生的 Thread,而是直接使用了 ForkJoinWorkerThread(ForkJoinWorkerThread 继承了 Thread)
你可能会疑惑:为什么要让 ForkJoinWorkerThread 继承 Thread,而不是继续使用 Worker 或者在 Worker 的基础上 “魔改” 呢?原因很简单,因为我们要为每个线程单独维护一个任务队列
每个线程都会维护一个 ForkJoinPool.WorkQueue,这是一个双端队列。并且这些队列都会被注册到 ForkJoinPool->queues 中
当线程去执行一个大任务 A 时,可以把 A 拆分成多个子任务(例如 B、C、D),这些子任务会被投递到该本地线程维护的任务队列中。然后你是不是认为 “线程需要阻塞等待 B、C、D 的完成呢?”
接下来的一部分,是 ForkJoinPool 设计最精妙的地方:
- 当一个任务
A被拆分成子任务B、C、D,并被投递到任务队列后,按理说当前线程应该阻塞并等待B、C、D的完成。然而并非如此:当前线程确实在等待,但并没有阻塞。相反,它会继续执行当前线程维护的任务队列中的其他任务 - 假如执行到了
B,在执行B的过程中,B又可以进一步拆分成更小的子任务E、F、G,并被投递到任务队列后,同样在等待子任务的结果的同时,当前线程并不会立即阻塞,而是会继续执行当前线程维护的任务队列中的其他任务。以此类推…直到任务被拆分到足够小 - 当子任务完成后,它的结果会返回给父任务。父任务在收到所有子任务结果后,再将汇总结果返回给自己的父任务。以此类推…直到最初的大任务最终得到完整的计算结果
- 需要注意的是:
- 如果当前线程维护的任务队列中没有其他任务了,它会根据
ForkJoinPool->queues随机选择一个其他线程维护的任务队列,从队尾 “窃取” 一个任务来执行 - 如果当前线程维护的任务队列中没有其他任务了,也 “窃取” 不到任务了,就会有以下两种情况:
- 如果当前线程还在等待任务
- 本地线程进入阻塞状态,
Thread实例进入WAITING状态,并被投递到(XXXXX,肯定是 ForkJOinPoolTask 的队列啊)Task的某队列,等待被唤醒(LockSupport.unpark)、被中断 - 本地线程进入阻塞状态,
Thread实例进入WAITING状态后,能响应中断。线程被唤醒,重新获得CPU时间片后,会抛出InterruptedException异常,并清除Thread实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由ForkJoinPool处理好了。一旦发生中断,为了不污染我们的代码,ForkJoinPool会抛出CancellationException非受检异常
- 本地线程进入阻塞状态,
- 如果当前线程未在等待任务(“空闲”)
- 如果线程池中的线程的数量 ≤
corePoolSize- 本地线程进入阻塞状态,
Thread实例进入WAITING状态,并被投递到 ForkJoinPool 的某队列,等待被唤醒(LockSupport.unpark)、被中断(Thread#interrupt) - 本地线程进入阻塞状态,
Thread实例进入WAITING状态后,能响应中断。线程被唤醒,重新获得CPU时间片后,会抛出InterruptedException异常,并清除Thread实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由ForkJoinPool处理好了。一旦发生中断,我们不会感知到任何异常
- 本地线程进入阻塞状态,
- 如果线程池中的线程的数量 >
corePoolSize- 本地线程进入阻塞状态,
Thread实例进入TIMED_WAITING状态,并被投递到ForkJoinPool的某队列,等待被唤醒(LockSupport.unpark)、被中断(Thread#interrupt)、阻塞超时- 如果超时,
ForkJoinPool会销毁该线程
- 如果超时,
- 本地线程进入阻塞状态,
Thread实例进入TIMED_WAITING状态后,能响应中断。线程被唤醒,重新获得CPU时间片后,会抛出InterruptedException异常,并清除Thread实例的中断状态(异常退出或正常退出(发生异常),要看我们是否对该异常进行捕获并处理)。不过我们无需对此过多关注,因为相关逻辑已由ForkJoinPool处理好了。一旦发生中断,我们不会感知到任何异常
- 本地线程进入阻塞状态,
- 如果线程池中的线程的数量 ≤
- 如果当前线程还在等待任务
- 如果当前线程维护的任务队列中没有其他任务了,它会根据
我们以 1 累加到 10000 为例,来看一看究竟是怎么执行的:
[!NOTE] 注意事项
- 猴哥的烦恼箱
(。•́︿•̀。):
- 为什么我们不能采取
ForkJoinPool的 “一个任务尚未完成,但是可以去执行其他任务” 思想,用于同步阻塞IO工作模式下的服务端,从而避免 “伪异步 + 异步编程回调”?- 为什么不 “魔改”
Worker,在Worker中添加任务队列呢?- 为什么当前线程维护的任务队列中没有其他任务了,也 “窃取” 不到任务了,就要阻塞?
- 防止 CPU 空转
4. 构造方法
4.1. public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime,TimeUnit unit)
1 | |
