沧澜的博客

芝兰生于幽谷,不以无人而不芳


  • 首页

  • 归档

  • 分类

  • 标签

  • 搜索
软件思想 SpringBoot 领域驱动设计 算法 中间件 计算机网络 MySQL 数据库 javascript 极客时间 分布式架构 Jenkins JVM 多线程 Java基础 CentOS安装 编译OpenJDK 持续集成 杂谈

抽象资源同步器框架AQS原理(三)——条件队列

发表于 2020-12-24 | 分类于 多线程 | 0 | 阅读次数 332

条件队列是什么

背景

了解这个问题之前,我们先来了解下为什么需要wait与notify?

网上有一个经典的栗子:图书馆有一本《Java并发编程艺术》,张三首先借走了(持有锁),李四也想借这个本书,但是不能每天都去图书馆看一下(轮训)张三是否归还,张三也不是每天都来说我还要看几天,这时候就需要一个等待(wait)通知(notify)机制,只要张三看完了归还了再通知李四去借

可以简单说,等待通知机制就是避免轮询造成的性能影响,若轮训造成的性能影响大于线程上下文切换和调度延时(等待过程),使用wait与notify更佳,否则直接轮训即可(加锁、释放锁也会导致比较多的上下文切换和调度延时),举个极端栗子:假如上文中 张三 一天就看完了这本书李四也不需要等待通知自然在第二天就能借到这本书

CAS无锁模式就是一种极端设计,因为在大部分场景下,加锁执行的独占逻辑很快并且轮训消耗比等待唤醒更小,需要根据业务分析应该选择哪一种模式

而条件队列和wait与notify都是一样的思想,并且前者改进并新增了一些方法,支持特定多条件单独唤醒等,下面我们借着小栗子来学习下条件队列


条件队列

条件队列是AbstractQueuedSynchronizer的内部类ConditionObject实现的,同步队列和条件队列都是成储存等待状态的线程的队列,从前文也可以知道同步队列唤醒后是可以抢占资源的,而条件队列是不能直接去获取,而是从条件队列转换为同步队列后排队获取。同步队列的唤醒后是线程去尝试获取锁,而条件队列的唤醒后则是把线程从条件队列移到同步队列中

image.png

当然一个线程要么是在同步队列中,要么是在条件队列中,不可能同时存在这两个队列里面,在进入条件队列前,是一个已占有资源状态,而从条件队列出来时才会进入同步队列中,这里我们继续借用ReentrantLock来了解条件队列的执行流程


场景分析

下面代码中是两个线程交替输出数字和字母的例子(下面叫他们两个线程分别为字母线程和数字线程),因为是交替输出,所以它们对应两个Condition条件:字母输出、数字输出,每次由数字输出后,就需要阻塞当前线程然后交给字母线程输出,直到字母线程输出后唤醒数字线程,数字线程继续输出...如此交替唤醒,直到数字线程输出完成后,数字线程结束,且此时不再需要阻塞字母线程,直到字母线程也输出完毕,流程结束。

    public static void main(String[] args) {
        String number = "1234567890";
        String letter = "abcdefghijklmnopqrstuvwxyz";
        char[] numberChars = number.toCharArray();
        char[] letterChars = letter.toCharArray();

        ReentrantLock lock = new ReentrantLock();
        Condition nCondition = lock.newCondition();
        Condition lCondition = lock.newCondition();

        new Thread(() -> {
	    // 循环输出数字
            for (int i = 0; i < numberChars.length; i++) {
                lock.lock();
                try {
                    System.out.println("==== number : " + numberChars[i] + " =====");
                    lCondition.signal();
                    nCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }).start();

        new Thread(() -> {
            // 等待一个start的信号 TODO
	    // 循环输出字母
            for (int i = 0; i < letterChars.length; i++) {
                lock.lock();
                try {
                    System.out.println("==== letter : " + letterChars[i] + " =====");
                    nCondition.signal();
                    if (i < numberChars.length - 1) {
                        lCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }).start();
    }

上面代码已经小节过一次,这里不再赘述,字母线程和数字线程分别会拿到资源进行对应的输出,并唤醒对方

在未调用Condition.await()方法时,完全是独占锁逻辑,我们已经很熟悉不再赘述,此时AQS的内存结构如下

image.png

源码解析

JDK版本 8u251

构造函数
    // 调用sync对象的newCondition方法
    public Condition newCondition() {
        return sync.newCondition();
    }
    // 每调用一次newCondition就返回一个新的ConditionObject对象	
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    public class ConditionObject implements Condition, java.io.Serializable {
        /**
         * First node of condition queue.
         */
        private transient Node firstWaiter;
        /**
         * Last node of condition queue.
         */
        private transient Node lastWaiter;
    }

在ReentrantLock总每次调用newCondition就会返回新的ConditionObject对象,ConditionObject是唯一 一个实现了Condition的实现类,firstWaiter和lastWaiter分别代表条件队列的队头和队尾。这里我们从await方法看起

await方法
    public final void await() throws InterruptedException {
        // 如果线程已经中断,则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 包装当前线程为Node节点 放到队列末尾
        Node node = addConditionWaiter();
        // 折起 下面看...
        ...
    }

上面逻辑主要有这么几步:

  1. 判断当前线程是否中断,如果是中断则抛出中断异常(同理,awaitUninterruptibly是不响应中断的)
  2. 调用addConditionWaiter将当前节点包装成Node节点,并放入条件队列末尾,如下:
    // 初始化Condition队列,包装当前线程成Node节点并放到尾结点 返回当前节点
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 如果尾结点不为CONDITION状态 需要进行清理
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            // 上面方法可能删除了部分节点 包括尾节点 更新lastWaiter 节点
            t = lastWaiter;
        }
        /**
         * 包装当前线程为一个Node节点,并且节点状态为CONDITION
         *  t是尾结点,如果为null说明未进行初始化 将first指向当前节点代表初始化
         *  如果不为空 则将当前节点 赋值为 next
         */
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

addConditionWaiter方法首先会判断条件队列尾结点是否是CONDITION状态,在条件队列中节点初始化状态就为CONDITION,如果尾结点不为CONDITION说明此时有现成取消,unlinkCancelledWaiters方法就是删除条件队列中已经取消的节点;随后将当前线程包装成一个Node节点,并且节点的状态为CONDITION(-2),并添加到条件队列的尾节点返回

    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            /**
             * 从头开始遍历 trail用来保存上一个节点CONDITION状态的节点
             *  如果当前Node不为CONDITION
             *   则将上一个节点的nextWaiter指向当前节点的next 即删除当前节点
             */
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }

unlinkCancelledWaiters方法如果代码中注释所说:从头开始遍历条件队列,删除所有不为CONDITION状态的节点,而trail用来保存上一个节点CONDITION状态的节点

随后返回到await方法中:

    public final void await() throws InterruptedException {
        // 如果线程已经中断,则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 包装当前线程为Node节点 放到队列末尾
        Node node = addConditionWaiter();
        // 调用release方法释放当前线程资源,会唤醒其他线程抢占资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 判断当前Node是否在同步队列上
        while (!isOnSyncQueue(node)) {
            // 如果不在同步队列上 就直接挂起
            LockSupport.park(this);
            /**
             * 这里会有一个判断是否中断 因为如果中断,是会直接从park中返回
             *  checkInterruptWhileWaiting 检测是否中断
             *   0 返回没有被中断过
             *   THROW_IE 在signal流程前取消 需要抛出异常
             *   REINTERRUPT 当signal流程发生在中断流程之前时 此时不需要 需要对节点状态取消
             */
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 下面的代码是其他线程唤醒后 执行 折叠 等下看
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

接上面步骤: 3. 调用fullyRelease方法释放当前所有线程的所有资源,fullyRelease会调用release方法释放当前线程的所有资源,返回线程占有的资源数,代码如下:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            /**
             * 获取当前state数 调用release释放资源 并返回线程占有的资源数
             */
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            // 如果释放中抛出异常,则会将当前节点置为CANCELLED
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease方法中没有特别特殊的逻辑,对release方法可以查看第一篇独占锁部分解读,这里我们回到await方法:

  1. isOnSyncQueue方法会判断当前线程是否在同步队列中,第一次进来当然不在同步队列会直接进入park方法挂起,因为一旦其他线程调用signal拯救当前线程后,条件队列的头节点就会进入同步队列,并且不需要挂起在本方法这里

下面我们看一下isOnSyncQueue方法

    final boolean isOnSyncQueue(Node node) {
        /**
         * 如果第一次进入 肯定是CONDITION状态 直接返回false的
         *  因为 fullyRelease 会取消所有占用资源
         * node.prev 为空说明不在同步队列 因为同步队列创建节点前会先设置前驱节点
         */
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        /**
         * 后续其他线程调用signal时 会将此线程的Node节点入同步队列
         *  如果node后继不为空,说明已经在sync queue
         */
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        /**
         * 现在发现node处于一个状态:前驱不为空,但后继为空
         *  如果node是当前队尾肯定也是这种状态 enq进队尾时CAS设置tail失败时
         * 这里就从头检查同步队列中是否包含当前节点
         */
        return findNodeFromTail(node);
    }

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            /**
             * 从尾遍历 寻找传入节点是否存在
             */
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

isOnSyncQueue方法逻辑比较清晰,上面两个if条件主要是快速判断当前节点是否存在于同步队列中,最下面这个方法主要是因为尾节点加入时非原子性,这里倒序遍历和独占锁唤醒有效线程逻辑一致;如果存在在同步队列中则返回true


我们继续回到await方法上:

    public final void await() throws InterruptedException {
	// 上面折叠... 
	...
        int interruptMode = 0;
        // 判断当前Node是否在同步队列上
        while (!isOnSyncQueue(node)) {
            // 如果不在同步队列上 就直接挂起
            LockSupport.park(this);
            /**
             * 这里会有一个判断是否中断 因为如果中断,是会直接从park中返回
             *  checkInterruptWhileWaiting 检测是否中断
             *   0 返回没有被中断过
             *   THROW_IE 在signal流程前取消 需要抛出异常
             *   REINTERRUPT 当signal流程发生在中断流程之前时 此时不需要 需要对节点状态取消
             */
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 下面的代码是其他线程唤醒后 执行 折叠 等下看
  	...
    }

接上await方法分析:

  1. isOnSyncQueue判断返回false代表不在同步队列上,就直接挂起,也是线程退出挂起模式的唯一条件;第一次线程进来肯定就会直接挂在这里了,需要其他线程调用signal救场;当然还有一种情况是当前挂起线程被其他线程调用了中断方法,会让当前线程直接从park中结束,而checkInterruptWhileWaiting方法就是检测当前线程是否被中断
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    final boolean transferAfterCancelledWait(Node node) {
        /**
         * 线程是否通过signal的标志是状态是否为0 如果为CONDITION
         *  说明线程是在await中中断的,那么需要入队,交给同步队列acquireQueued方法去处理
         * 否则是在signal之后中断的
         */
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // 走到这里说明是其他线程调用了signal方法,那么自旋等待该节点入队完成
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

如果线程是中断返回,那么则需要判断是在其他线程调用signal之前还是之后取消,因为如果是在调用signal之后被取消,那么此时有其他线程会执行enq(node)将其入队同步队列,如果是signal之前那么需要在这里自己调用一次enq(node)入队,交给acquireQueued处理中断响应逻辑


经过await方法的LockSupport.park(this)方法后,本线程就会直接阻塞在这里,比如案例中,我们的数字线程调用await方法后,就会直接阻塞在这里,一直等待字母线程调用该条件的signal方法,我们画张图来理解下:

image.png

此时会换成字母线程输出,输出之后就会调用Condition.signal方法,继续源码解析


signal方法
    public final void signal() {
        /**
         * signal 方法 和await一样 调用前会判断当前线程是否持有锁
         *  如果不是持有锁状态 抛出IllegalMonitorStateException异常
         *
         * 随后就会 signal 将条件队列第一个节点加入同步队列
         */
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

signal方法就和notify方法功能类似,来通知被阻塞线程,当然,这里需要判断此线程调用signal方法时是获独占锁状态的,否则抛出IllegalMonitorStateException异常;随后就会拿到条件队列中的firstWaiter,如果不为空将唤醒它

    private void doSignal(Node first) {
        do {
            /**
             * 将first的next指向当前firstWaiter 如果next为空
             *  说明队列也为空,将lastWaiter更新为空
             * 如果不为空,那么将first的nextWaiter设置为空
             *  且调用transferForSignal将当前线程节点加入同步队列 可能唤醒头结点线程
             * 唤醒后也是进入acquireQueued方法去抢占资源了 交给了独占锁逻辑处理
             */
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
            /**
             *  transferForSignal 主要 加入同步队列 (释放信号) 可能唤醒线程
             *
             *  如果transferForSignal更新失败 说明有其他线程取消了
             *   返回false 那么此时拿到最新的firstWaiter 继续循环
             */
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }

doSignal方法是一个循环,主要逻辑有:

  1. 判断first节点是否还有后继节点,如果没有说明将队列将更新为空队列(因为当前first要出队了),如果有则将first的nextWaiter赋值给firstWaiter,即删除头结点
  2. 调用transferForSignal方法将当前节点加入到同步队列,并可能会唤醒当前节点的线程。如果加入失败,说明此时有其他线程并发操作取消了,重新拿取firstWaiter进行重试
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        /**
         * 如果在等待队列中 并且没有被其他线程取消的话就是CONDITION状态
         *  这里使用CAS的方式操作就是防止其他线程取消操作 如果其他线程已经更新为CANCEL
         *  此时就不在尝试 返回false
         * 这里不存在并发场景 因为调用transferForSignal之前会判断独占锁状态
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        /**
         * 这里 enq方法返回的是 node的前驱节点
         *  如果前驱节点的waitStatus大于0说明前驱节点取消了
         *   那么直接唤醒当前线程
         *  如果 ws > 0 不满足 即  ws <= 0 那么 就需要将前驱节点更新为SIGNAL
         *  为什么要这么做呢?
         *   因为acquireQueued方法保证了每个入队的节点都保证前驱为SIGNAL来标识后续节点处于等待状态
         *   这里compareAndSetWaitStatus(p, ws, Node.SIGNAL)和LockSupport.unpark(node.thread)
         *    都是为了间接保证当前入队节点的前驱是一个SIGNAL状态
         *   因为唤醒当前线程后 在await方法中 acquireQueued方法就会执行抢占资源和更新前驱节点状态的逻辑
         * 如果是当前线程更新首节点状态成功,入同步队列 那么就返回true
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

transferForSignal方法比较长,但实际上就两步:

  1. 将当前节点的状态从CONDITION更新为0,如果失败就代表其他线程调用了取消,操作失败返回false,重新获取头结点
  2. 上面操作成功,代表只有当前线程操作doSignal方法,调用enq(node)方法会将当前线程加入到同步队列,并返回前驱节点,如果前驱节点状态大于0说明前驱已经取消,则调用LockSupport.unpark(node.thread)唤醒当前线程去进入acquireQueued方法删除取消节点并尝试抢占资源;如果前驱节点未取消会尝试使用CAS的方式更新前驱节点的状态为SIGNAL,失败则会唤醒节点线程进入acquireQueued方法更新前驱节点的状态,保证入队的节点前驱节点必定为SIGNAL(因为前驱节点唤醒后置节点的条件是节点状态小于等于0);(虽然会引起不必要的唤醒,但是acquireQueued方法是兼容这种情况的)

一旦线程从条件队列入队到同步队列,那么其他线程在释放资源后就会唤醒同步队列的第一个有效节点线程,并且回到await中LockSupport.park(this)处醒来:

    public final void await() throws InterruptedException {
        // 如果线程已经中断,则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 包装当前线程为Node节点 放到队列末尾
        Node node = addConditionWaiter();
        // 调用release方法释放当前线程资源,会唤醒其他线程抢占资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 判断当前Node是否在同步队列上
        while (!isOnSyncQueue(node)) {
            // 如果不在同步队列上 就直接挂起
            LockSupport.park(this);
            /**
             * 这里会有一个判断是否中断 因为如果中断,是会直接从park中返回
             *  checkInterruptWhileWaiting 检测是否中断
             *   0 返回没有被中断过
             *   THROW_IE 在signal流程前取消 需要抛出异常
             *   REINTERRUPT 当signal流程发生在中断流程之前时 此时不需要 需要对节点状态取消
             * 并且会调用transferAfterCancelledWait方法将中断线程加入同步队列
             */
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        /**
         * 从上面循环跳出有两种情况
         *  1. 线程中断 此时需要检测是在什么时候发生的 checkInterruptWhileWaiting 进行相应的处理
         *  2. 被其他线程调用signal方法加入同步队列唤醒,此时已经在同步队列中 isOnSyncQueue 返回true
         *  上面两个情况都会在同步队列中 那么进入下个acquireQueued方法中都会兼容处理上面两种情况
         *  并且将fullyRelease释放的资源 重新获取回来 interruptMode是之前被中断的模式
         *
         *  acquireQueued 返回true的条件是在acquireQueued方法中获取锁失败并且进入了同步队列
         *   方法中发生了中断 那么就需要判断interruptMode 除了THROW_IE 只有0 和 REINTERRUPT
         *   即这里为了处理acquireQueued方法中发生的中断 之前interruptMode为0 但acquireQueued却中断了
         *   如果在acquireQueued方法中发生了中断 就把interruptMode升级为REINTERRUPT
         *   会在reportInterruptAfterWait中重新设置标志位
         */
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        /**
         * signal 将节点加入同步队列时会调用 first.nextWaiter = null
         *  这里是兼容中断线程的逻辑
         */
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 最后会根据不同的interruptMode来做出不同的响应
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

在park处返回后,就会进入checkInterruptWhileWaiting方法判断是否中断过,如果没有中断,会直接break跳出循环;如果有中断,那么在isOnSyncQueue判断中也会返回true,退出循环。不同的是,在下面会对中断线程进行处理:

  1. 理论上此线程有两种方式被唤醒:
    1. 线程中断 此时需要检测是在什么时候发生的 checkInterruptWhileWaiting 进行相应的处理,并调用transferAfterCancelledWait方法将线程节点加入同步队列中
    2. 被其他线程调用signal方法加入同步队列唤醒,此时已经在同步队列中 isOnSyncQueue 返回true
  2. 上面两种模式,都会进入到acquireQueued方法中处理,如果是中断被唤醒,await本方法是需要检测在await时中断后抛出异常的(与之对应的还有awaitUninterruptibly不响应中断),其他情况只需要设置中断状态,调用acquireQueued方法不会响应中断,但也会记录acquireQueued方法中发生的中断,作为返回值返回
  3. 调用acquireQueued方法会将之前fullyRelease方法释放的资源数传入,即在这里会重新获取回来
  4. 如果在acquireQueued方法执行时发生了中断,会返回中断状态会记录成REINTERRUPT在reportInterruptAfterWait中处理
  5. 如果是中断退出循环等待状态,那么此时节点的nextWaiter不为空,因为signal 和 signalAll都会将nextWaiter设置为空,这里需要清除一下取消节点
  6. 最后判断interruptMode != 0,调用reportInterruptAfterWait方法对中断模式进行处理
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        /**
         * 在await方法中 其他线程执行signal前进行了中断
         *  那么需要对中断进行响应 抛出InterruptedException异常
         * 其他模式中断 设置标志位
         */
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
为什么要重新设置标志位呢?

LockSupport.park()会响应中断,但同样会重置中断标志位,acquireQueued方法可能会让线程再次挂起(调用LockSupport.park()方法,会丢失之前的中断标志位),这里会判断之前是否中断,如果已经发生过中断就需要在设置一次中断标志位,附github测试演示


调用signal方法的流程分析到此就结束了,signalAll方法与此同理,不过会加入所有节点到同步队列:

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            /**
             * 与doSignal方法不同的是 会循环所有的节点
             *  并调用transferForSignal将其加入到同步队列
             */
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

回到栗子,我们的数字线程已经被阻塞在await方法上,等待字母线程调用signal方法,假如字母线程调用signal方法,那么此时的情况应该是:

image.png

此时线程还未切换,紧接着字母线程调用await方法:

image.png

字母线程调用await方法后,会释放占用锁状态并唤醒同步队列等待的数字线程,将自己封装成Node节点加入条件队列,直到数字线程调用signal那时...如果交替唤醒,交替输出,直到程序运行结束


小节

条件队列是平行存在同步队列存在于AbstractQueuedSynchronizer类中,一个同步队列对应多个条件队列,好似一个锁对应多个条件唤醒互不干扰。每当调用一次await方法就会释放当前线程的所有资源并且加入条件队列中,每当调用一次signal方法就会将条件队列中的头节点加入到同步队列中


小疑问

为什么Java要求wait与notify(await和signal)一定要在同步代码块中呢?

个人理解:需要保证等待和通知的顺序性,回到开篇问题,如果通知进行了提前,李四被提前通知(消费者被通知消费notify)来图书馆取书,但张三正准备而且在去图书馆路上,并且到了图书馆才会归还(生产者完成态时调用wait),那么此时的李四该怎么通知张三我已经到了呢?万一张三说今天下雨不归还了(线程切换)?

使用同步代码块的原因就是需要保证wait方法是优先调用时,持有资源可以保证是独占且优先的(因为抢占锁的总是第一个),且是提前于notify的,即避免lost wake up问题


参考

JDK 源码
AQS深入理解系列(四)Condition接口的实现
一文带你快速掌握AQS
wait为什么要在同步块中使用? 为什么sleep就不用再同步块中?

  • 本文作者: 沧澜
  • 本文链接: https://www.meetxiyu.cn/archives/抽象资源同步器框架AQS原理(三)——条件队列
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 软件思想 # SpringBoot # 领域驱动设计 # 算法 # 中间件 # 计算机网络 # MySQL # 数据库 # javascript # 极客时间 # 分布式架构 # Jenkins # JVM # 多线程 # Java基础 # CentOS安装 # 编译OpenJDK # 持续集成 # 杂谈
抽象资源同步器框架AQS原理(二)——共享模式
谈谈我的2020
  • 文章目录
  • 站点概览
沧澜

沧澜

芝兰生于幽谷,不以无人而不芳
君子修身养德,不以穷困而改志

74 日志
19 分类
19 标签
RSS
Creative Commons
0%
© 2019 — 2026 蜀ICP备19039166号
由 Halo 强力驱动
|
主题 - NexT.Mist v5.1.4