专栏原创出处:github-源笔记文件 (opens new window)github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。

Java 并发编程专栏系列笔记,系统性学习可访问个人复盘笔记-技术博客 Java 并发编程 (opens new window)

# Condition 是什么

Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition 对象关联的锁。

Condition 对象是由 Lock 对象 (调用 Lock 对象的 newCondition() 方法) 创建出来的,Condition 是依赖 Lock 对象的。

# Condition 提供方法说明

public interface Condition {

    // 当前线程进入等待。
    // 其他线程调用该 Condition 的 signal/signalAll 方法是被唤醒
    // 其他线程调用 interrupt 方法中断当前线程
    // 如果当前等待线程从 await 返回,表示该线程已经获取了 Condition 对象所在的锁
    void await() throws InterruptedException;
    
    // 在 await 方法基础上取消了响应中断的处理
    void awaitUninterruptibly();
    
    // 在 await 方法基础上支持超时返回
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    
    // 在 await 方法基础上支持超时返回
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    
    // 在 await 方法基础上支持超过截止时间返回
    boolean awaitUntil(Date deadline) throws InterruptedException;


    // 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁
    void signal();
    
    // 唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁
    void signalAll();
}

# Condition 实现分析

# Condition 实现分析-等待队列

  • 同步队列:同步队列是 AQS 中等待获取同步状态的队列。请阅读参考《锁-AbstractQueuedSynchronizer 介绍》及原理两篇内容。
  • 等待队列:等待队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程。

Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。 如果一个线程调用了 Condition.await 方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

    public class ConditionObject implements Condition, java.io.Serializable {
        /** 首节点. */
        private transient Node firstWaiter;
        /** 尾节点. */
        private transient Node lastWaiter;
        ...
    }

与监视器(wait/notify)的队列区别:

  • 在 Object 的监视器(wait/notify)模型上,一个对象拥有一个同步队列和等待队列
  • 并发包中的 Lock(确切地说是同步器 AQS) 拥有一个同步队列和多个等待队列

# Condition 实现分析-等待

调用 Condition 的 await 方法 (或以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。
当从 await 方法返回时,当前线程一定获取了 Condition 相关联的锁。

如果从队列的角度看 await 方法,当调用 await 方法时,相当于同步队列的首节点 (获取了锁的节点) 移动到 Condition 的等待队列中。

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); // 当前线程加入等待队列
        int savedState = fullyRelease(node); // 释放同步状态,也就是释放锁
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this); // 进入等待状态
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }

        // 唤醒节点的线程开始尝试获取同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

主要逻辑为:

  • 调用 await 方法的线程已经成功获取了锁的线程,也就是同步队列中的首节点。
  • 该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
  • 当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。
  • 如果不是通过其他线程调用 Condition.signal 方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException。

# Condition 实现分析-通知

调用 Condition 的 signal 方法,将会唤醒在等待队列中等待时间最长的节点 (首节点),在唤醒节点之前,会将节点移到同步队列中。

    public final void signal() {
        if (!isHeldExclusively()) //  当前线程必须是获取了锁的线程
           throw new IllegalMonitorStateException();
        Node first = firstWaiter; // 获取等待队列的首节点
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
    }    
    
    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        Node p = enq(node); // 等待队列中的头节点线程安全地移动到同步队列
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 节点状态更改为待唤醒状态
            LockSupport.unpark(node.thread); // 唤醒节点
        return true;
    }    

主要逻辑为:

  • 调用 signal 方法条件是当前线程必须获取了锁,因此做了 isHeldExclusively 检查。
  • 调用同步器的 enq 方法,等待队列中的头节点线程安全地移动到同步队列。
  • 当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。
  • 被唤醒后的线程,将从 await 方法中的 while 循环中退出(isOnSyncQueue 方法返回 true,节点已经在同步队列中),进而调用同步器的 acquireQueued 方法加入到获取同步状态的竞争中。
  • 成功获取同步状态 (或者说锁) 之后,被唤醒的线程将从先前调用的 await 方法返回后继续执行后续的代码。

Condition 的 signalAll 方法,相当于对等待队列中的每个节点均执行一次 signal 方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

# 实现总结

正常情况下,一块并发执行的代码,某个线程拿到锁后开始执行,全部执行完成后释放锁。如果没有等待通知机制我们只能中断线程来让他退出,但是中断的条件比较难以判断。而且中断后,剩余的逻辑无法执行。

类似于(wait/notify)的经典范式。锁机制聚合实现 AQS 提供了 Condition 条件,我们可以像(wait/notify)机制一样,在某个条件下等待,条件满足后唤醒,线程之间就可以通信交流了。

# 总结思考

  • 为什么要设计 Condition,Object 的监视器(wait/notify)不能直接用在 Lock 上吗 ?

    • wait/notify 通过配合 synchronized 的 Monitor 对象实现同步队列等待队列的操作。
    • Condition 通过 AQS 内部实现的同步队列等待队列的操作。
  • 一个 Lock 返回多个 Condition 有什么用(lock.newCondition 每次创建一个 Condition 实例)?

    • 多个 Condition 条件可以互相配合执行,我们把每个条件抽象为一个等待队列,这些等待队列什么时候加入同步队列是由其他的条件决定的。
    • e.g. 生产和消费是两个条件,生产力过剩时,生产需要等待,生产完成后通知消费者继续消费。消费品为空时,消费需要等待,消费完成后通知生产者继续生产。
  • Condition/ReentrantLock 实现的多生产者消费者 (opens new window)

# 参考

  • 有关线程等待操作相关操作区别,参考本专栏《线程等待操作(sleep、wait、park、Condition)区别》
  • 并发编程的艺术
最后修改时间: 2/17/2020, 4:43:04 AM