源码:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject 源码解析

1. TODO


2. 脑图

  1. Xmind

  2. Edraw

  3. Hexo 地址
    👉 http://blog.wangjia.ink/2025/08/31/源码:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject源码解析/


3. 基础部分

3.1. AbstractQueuedSynchronizer.ConditionObject 概述

ConditionObject 是一个具体类,是 java.util.concurrent.locks.AbstractQueuedSynchronizer. 的普通内部类,实现了 java.util.concurrent.locks.Conditionjava.io.Serializable

ConditionObject 维护了一个由 AbstractQueuedSynchronizer.ConditionNode 构建的,基于单向链表的队列。所谓的 “AQS 的条件队列” 其实就是在说 ConditionObject 维护的这条件队列

[!NOTE] 注意事项

  1. 详见源码:AbstractQueuedSynchronizer
    1. obsidian 内部链接:
      1. 源码:java.util.concurrent.locks.AbstractQueuedSynchronizer源码解析
    2. Hexo 链接:
      1. http://blog.wangjia.ink/2025/09/01/源码:java.util.concurrent.locks.AbstractQueuedSynchronizer源码解析/
  2. 详见源码:Condition
    1. obsidian 内部链接:
      1. 源码:java.util.concurrent.locks.Condition源码解析
    2. Hexo 链接:
      1. http://blog.wangjia.ink/2025/08/31/源码:java.util.concurrent.locks.Condition源码解析/
  3. 详见源码:Serializable
    1. obsidian 内部链接:
      1. 源码:java.io.Serializable源码解析
    2. Hexo 链接:
      1. http://blog.wangjia.ink/2025/10/28/源码:java.io.Serializable源码解析/

4. 核心属性

1
2
3
4
5
// 表示 AQS 的条件队列的首哨兵节点
private transient ConditionNode firstWaiter;

// 表示 AQS 的条件队列的尾哨兵节点
private transient ConditionNode lastWaiter;

5. 具体方法

5.1. 具体方法(无)

5.1.1. private int enableWait(ConditionNode node)

private int enableWait(ConditionNode node) 用于 把当前线程包装成一个 ConditionNode,然后将该节点链接到条件队列(单向链表)中,并释放锁资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* ============================================
* 1. 访问修饰符
* 1. private
* 1. 独守秘密(类内部)
* 2. 由于是别人封装好的,类内部是做不到了,访问和操作是做不到了
*
* 2. 非访问修饰符
*
* 3. 方法参数
* 1. ConditionNode node
* 1. 一个空的 ConditionNode,当前线程就是包装在这个 Node 中
*
* 4. 返回值
* 1. 在进入 await() 之前,同步器(AQS)的 state 值
*
* 5. 抛出异常
* 1. IllegalMonitorStateException
*
* 6. 使用示例
*
* ============================================
*/
private int enableWait(ConditionNode node) {

// 如果独占锁是否被当前线程持有(Condition 是专门用于独占锁的)
if (isHeldExclusively()) {

// 将当前线程封装到 Node 的 waiter 中
node.waiter = Thread.currentThread();

// 将 node 的状态设置为 COND(条件队列) + 等待中
node.setStatusRelaxed(COND | WAITING);

// 将 Node 链接到条件队列中(单向链表)
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;

// 记录当前 state
int savedState = getState();

// 让 state 清零,彻底释放
if (release(savedState))
return savedState;
}
// 如果独占锁不是被当前线程持有,则将该 Node 的状态设置为 CANCELLED 并抛出 throw new IllegalMonitorStateException();
node.status = CANCELLED;
throw new IllegalMonitorStateException();

}

5.1.2. private boolean canReacquire(ConditionNode node)

private boolean canReacquire(ConditionNode node) 用于 判断某个在 Condition 上等待的节点,被 Conditionsignal 后,是否已经链接到同步队列中

需要注意的是,线程在被 Condition 的 signal() 唤醒后,按设计一定会转移到同步队列中去竞争锁。但在高并发环境下,节点的入队过程并非一步到位,可能出现链表指针尚未完全修复或节点已被取消等等 “中间态”。因此,需要通过这个方法进行校验,确保节点确实已经稳定地进入同步队列,才算真正具备重新竞争锁的资格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private boolean canReacquire(ConditionNode node) {
Node p;
/**
* node != null 是最基础的检查,传入 Node 不能为 null
*
* (p = node.prev) != null 用于检查这个节点是否有前驱,
* 因为在 AQS 的同步队列中,除了头节点,其他节点都必须有 prev
* 如果 prev 为 null,说明它不在同步队列里
* 不要猜测该节点是否是队头节点,因为同步队列一定会存在一个头节点,而且是一个哑节点
*
* (p.next == node || isEnqueued(node)) 中的 p.next == node
* 是看这个节点的前驱节点的 next 是否正确指向自己,如果是,说明这个节点确实在同步队列中
* 但是因为在高并发下,节点刚被加进队列,可能还没来得及完全修复 p.next,这种情况下,单靠这个判断会出错
* 所以通过 isEnqueued(node) 来遍历确认它确实在同步队列中
*
*/
return node != null && (p = node.prev) != null &&
(p.next == node || isEnqueued(node));
}

5.1.3. private void doSignal(ConditionNode first, boolean all)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
// 把 firstWaiter 指向第二个节点
if ((firstWaiter = next) == null)
lastWaiter = null;
/**
* 把 first 节点上的 COND 标志清掉,并返回旧值
*
* 如果返回 COND,说明 first 节点还处于条件等待队列,进行 if 逻辑
*
* 进行这个检查的原因是,节点可能在 signal 之前就被取消了(超时/中断),就没资格进入同步队列了
*/
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 将 first 节点从条件队列转移到同步队列
enqueue(first);
// 如果不是 signalAll,不需要唤醒其他节点,退出 while 循环
if (!all)
break;
}
// 如果是 signalAll,将 first = next,继续 while 循环
first = next;
}
}

5.2. 具体方法(实现)

5.2.1. Condition 中的方法实现

5.2.1.1. public final void await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this);
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait();
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}

5.2.1.2. public final void awaitUninterruptibly()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public final void awaitUninterruptibly() {

// 把当前线程包装成一个 ConditionNode,然后将该节点链接到条件队列(单向链表)中,并释放锁资源
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);

// 把 “阻塞原因” 对象(arg)写到当前 Thread 对象的内部字段 parkBlocker 上
LockSupport.setCurrentBlocker(this);

boolean interrupted = false, rejected = false;

// 如果某个在 Condition 上等待的节点,还没有被链接到同步队列中
while (!canReacquire(node)) {

// 如果被中断,设置中断标志位
if (Thread.interrupted())
interrupted = true;

// 如果 Node 的 Status 还有 COND,说明确实还在条件队列中,进入循环
else if ((node.status & COND) != 0) {
try {
// 如果不是使用 ForkJoinPool 只能控制何时阻塞,则立即进行阻塞
if (rejected)
node.block();
// 否则说明使用了 ForkJoinPool,使用 ForkJoinPool 的 managedBlock 智能控制何时阻塞
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
/**
* 如果 Node 的 status 不含 COND,可能是已移出条件队列进入同步队列了(因为这个流程可能非常快,上一刻还在条件队列,下一刻就在同步队列了)
* 在这种临界状态,调用 Thread.onSpinWait() 提示 CPU 进入高效自旋,再次调用 canReacquire(node) 看是否已经在同步队列了
*/
} else
Thread.onSpinWait();
}
// 如果某个在 Condition 上等待的节点,已经被链接到同步队列中,继续执行
// 把当前 Thread 对象的内部字段 parkBlocker 上的 “阻塞原因” 对象置为 null
LockSupport.setCurrentBlocker(null);

// 清除 Node 的 Status
node.clearStatus();

// 不断重试获取锁
acquire(node, savedState, false, false, false, 0L);

// 在方法执行过程中,可能导致中断标志位被清除,这里恢复中断标志位
if (interrupted)
Thread.currentThread().interrupt();
}

5.2.1.3. public final long awaitNanos(long nanosTimeout)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
long deadline = System.nanoTime() + nanos;
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
(nanos = deadline - System.nanoTime()) <= 0L) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else
LockSupport.parkNanos(this, nanos);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted)
throw new InterruptedException();
} else if (interrupted)
Thread.currentThread().interrupt();
long remaining = deadline - System.nanoTime(); // avoid overflow
return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}

5.2.1.4. public final boolean await(long time, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
long deadline = System.nanoTime() + nanos;
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
(nanos = deadline - System.nanoTime()) <= 0L) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else
LockSupport.parkNanos(this, nanos);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted)
throw new InterruptedException();
} else if (interrupted)
Thread.currentThread().interrupt();
return !cancelled;
}

5.2.1.5. public final boolean awaitUntil(Date deadline)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
System.currentTimeMillis() >= abstime) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else
LockSupport.parkUntil(this, abstime);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted)
throw new InterruptedException();
} else if (interrupted)
Thread.currentThread().interrupt();
return !cancelled;
}

5.2.1.6. public final void signal()
1
2
3
4
5
6
7
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}

5.2.1.7. public final void signalAll()
1
2
3
4
5
6
7
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}


源码:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject 源码解析
https://wangjia5289.github.io/2025/08/31/源码:java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject源码解析/
Author
咸阳猴🐒
Posted on
August 31, 2025
Licensed under