Java并发专题——AQS ConditionObject源码剖析

举报
容器小A 发表于 2017/09/29 16:44:37 2017/09/29
【摘要】 ConditionObject ConditionObject在并发编程中是非常有用的,它实现了Condition接口,往往和Lock结合起来用。

ConditionObject

ConditionObject在并发编程中是非常有用的,它实现了Condition接口,往往和Lock结合起来用。

Condition接口源码如下

Java 代码
01public interface Condition {
02    /** Causes the current thread to wait until it is signalled or interrupted */
03    void await() throws InterruptedException;
04    void awaitUninterruptibly();
05    long awaitNanos(long nanosTimeout) throws InterruptedException;
06    boolean await(long time, TimeUnit unit) throws InterruptedException;
07    boolean awaitUntil(Date deadline) throws InterruptedException;
08 
09    /** Wakes up one waiting thread */
10    void signal();
11 
12    /** Wakes up all waiting threads */
13    void signalAll();
14}

主要就是await和signal方法

看到这里,就似曾相识了。

- synchronized和wait, notify, notifyAll

- ReentrantLock和Condition的await, signal, signalAll

他们几乎实现了相同的功能。

在之前的文章中,介绍过synchronized, wait, notify, notifyAll,详细见:

http://3ms.huawei.com/hi/blog/714989_2131651.html?h=h

在那篇文章中,最后给出了一个用synchronized, wait, notifyAll实现阻塞队列的demo,然而JDK源码中的ArrayBlockingQueue是用ReentrantLock结合Condition来实现的。

我们结合ArrayBlockingQueue的源码分析一下,使用ReentrantLock和Condition实现的优势。

Java 代码
01public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
02    final Object[] items;
03    final ReentrantLock lock;
04    private final Condition notEmpty;
05    private final Condition notFull;
06    int takeIndex;
07    int putIndex;
08    int count;
09 
10    public ArrayBlockingQueue(int capacity) {
11        this(capacity, false);
12    }
13    public ArrayBlockingQueue(int capacity, boolean fair) {
14        this.items = new Object[capacity];
15        lock = new ReentrantLock(fair);
16        notEmpty = lock.newCondition();
17        notFull = lock.newCondition();
18    }
19    public void put(E e) throws InterruptedException {
20        checkNotNull(e);
21        final ReentrantLock lock = this.lock;
22        lock.lockInterruptibly();
23        try {
24            while (count == items.length)
25                notFull.await();
26            enqueue(e);
27        } finally {
28            lock.unlock();
29        }
30    }
31    public E take() throws InterruptedException {
32        final ReentrantLock lock = this.lock;
33        lock.lockInterruptibly();
34        try {
35            while (count == 0)
36                notEmpty.await();
37            return dequeue();
38        } finally {
39            lock.unlock();
40        }
41    }
42    private void enqueue(E x) {
43        final Object[] items = this.items;
44        items[putIndex] = x;
45        if (++putIndex == items.length)
46            putIndex = 0;
47        count++;
48        notEmpty.signal();
49    }
50    private void dequeue() {
51        final Object[] items = this.items;
52        E x = items[takeIndex];
53        items[takeIndex] = null;
54        if (++takeIndex == items.length)
55            takeIndex = 0;
56        count--;
57        if (itrs != null)
58            itrs.elementDequeued();
59        notFull.signal();
60        return x;
61    }
62}

以上是ArrayBlockingQueue的部分源码

1)put方法在队列满的时候,会阻塞调用线程,等待notFull条件;未满的时候,会唤醒等待notEmpty条件的线程

2)take方法在队列为空时,会阻塞调用线程,等待notEmpty条件;不为空时,会唤醒等待notFull条件的线程

形象点解释

1)生产者线程每次生产后(put方法),只唤醒被阻塞的消费者线程

2)消费者线程每次消费后(take方法),只唤醒被阻塞的生产者线程

很明显,这种唤醒方式效率更加高效。

看一下lock.newCondition()方法

Java 代码
01public class ReentrantLock implements Lock {
02    public Condition newCondition() {
03        return sync.newCondition();
04    }
05     
06    abstract static class Sync extends AbstractQueuedSynchronizer {
07        final ConditionObject newCondition() {
08            return new ConditionObject();
09        }
10    }
11}
12 
13public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
14    public class ConditionObject implements Condition {
15        /** First node of condition queue. */
16        private transient Node firstWaiter;
17         
18        /** Last node of condition queue. */
19        private transient Node lastWaiter;
20         
21        /** Mode meaning to reinterrupt on exit from wait */
22        private static final int REINTERRUPT = 1;
23        /** Mode meaning to throw InterruptedException on exit from wait */
24        private static final int THROW_IE = -1;
25         
26        public ConditionObject() { }
27         
28        // ......
29    }
30}

看到最终ConditionObject是在AQS中定义的。

在分析await和signal方法之前,我们需要明确一下,在AQS中的锁等待队列和ConditionObject中的队列是不一样的,这是两个队列,不能混为一谈。

接下来重点分析ConditionObject中的await和signal方法

await

需要先理解await方法究竟干了什么

首先有个前提,能够调用await方法的线程已经获得了锁(AQS acquire成功了),在此基础上,调用await方法主要有以下功能

(1)将当前线程加入到等待队列中,这个队列不是AQS的队列,而是指定ConditionObject的队列(条件队列)

(2)释放当前线程占有的锁,让出许可

(3)如果当前线程不在AQS的等待队列中,则阻塞当前线程

看下await方法的源码

Java 代码
01public final void await() throws InterruptedException {
02    if (Thread.interrupted())
03        throw new InterruptedException();
04    Node node = addConditionWaiter();
05    int savedState = fullyRelease(node);
06    int interruptMode = 0;
07    while (!isOnSyncQueue(node)) {
08        LockSupport.park(this);
09        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
10            break;
11    }
12    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13        interruptMode = REINTERRUPT;
14    if (node.nextWaiter != null) // clean up if cancelled
15        unlinkCancelledWaiters();
16    if (interruptMode != 0)
17        reportInterruptAfterWait(interruptMode);
18}

其中addConditionWaiter就是将当前线程加入到条件队列

Java 代码
01/**
02 * Adds a new waiter to wait queue.
03 * @return its new wait node
04 */
05private Node addConditionWaiter() {
06    Node t = lastWaiter;
07    // If lastWaiter is cancelled, clean out.
08    if (t != null && t.waitStatus != Node.CONDITION) {
09        unlinkCancelledWaiters();
10        t = lastWaiter;
11    }
12    Node node = new Node(Thread.currentThread(), Node.CONDITION);
13    if (t == null)
14        firstWaiter = node;
15    else
16        t.nextWaiter = node;
17    lastWaiter = node;
18    return node;
19}

将节点加入到条件队列的尾部,其中多了个判断和unlinkCancelledWaiters,这个后面再介绍。

fullyRelease就是释放当前线程占有的锁,让出许可

Java 代码
01/**
02 * Invokes release with current state value; returns saved state.
03 * Cancels node and throws exception on failure.
04 * @param node the condition node for this wait
05 * @return previous sync state
06 */
07final int fullyRelease(Node node) {
08    boolean failed = true;
09    try {
10        int savedState = getState();
11        if (release(savedState)) {
12            failed = false;
13            return savedState;
14        } else {
15            throw new IllegalMonitorStateException();
16        }
17    } finally {
18        if (failed)
19            node.waitStatus = Node.CANCELLED;
20    }
21}

fullyRelease没太多可说的,就是调用release方法释放了许可。

isOnSyncQueue判断当前线程是否在AQS的等待队列中

Java 代码
01/**
02 * Returns true if a node, always one that was initially placed on
03 * a condition queue, is now waiting to reacquire on sync queue.
04 * @param node the node
05 * @return true if is reacquiring
06 */
07final boolean isOnSyncQueue(Node node) {
08    if (node.waitStatus == Node.CONDITION || node.prev == null)
09        return false;
10    if (node.next != null) // If has successor, it must be on queue
11        return true;
12    /*
13     * node.prev can be non-null, but not yet on queue because
14     * the CAS to place it on queue can fail. So we have to
15     * traverse from tail to make sure it actually made it.  It
16     * will always be near the tail in calls to this method, and
17     * unless the CAS failed (which is unlikely), it will be
18     * there, so we hardly ever traverse much.
19     */
20    return findNodeFromTail(node);
21}
22 
23/**
24 * Returns true if node is on sync queue by searching backwards from tail.
25 * Called only when needed by isOnSyncQueue.
26 * @return true if present
27 */
28private boolean findNodeFromTail(Node node) {
29    Node t = tail;
30    for (;;) {
31        if (t == node)
32            return true;
33        if (t == null)
34            return false;
35        t = t.prev;
36    }
37}

没太多可说的,注意waitStatus为CONDITION这种结点,只属于条件队列,不在AQS锁等待队列中。

可以看到,await源码中除了实现了上述的3个功能外,还有额外的操作。而这些操作,就是线程被唤醒后需要执行的操作。主要3个操作:acquireQueued, unlinkCancelledWaiters, reportInterruptAfterWait

acquireQueued

被唤醒后,线程需要重新竞争锁,如果竞争不到还是会将自己阻塞,等待被唤醒重新竞争。这个方法具体实现在之前的文章中分析过。

unlinkCancelledWaiters

从条件队列中取消不是在CONDITION的线程节点

Java 代码
01private void unlinkCancelledWaiters() {
02    Node t = firstWaiter;
03    Node trail = null;
04    while (t != null) {
05        Node next = t.nextWaiter;
06        if (t.waitStatus != Node.CONDITION) {
07            t.nextWaiter = null;
08            if (trail == null)
09                firstWaiter = next;
10            else
11                trail.nextWaiter = next;
12            if (next == null)
13                lastWaiter = trail;
14        }
15        else
16            trail = t;
17        t = next;
18    }
19}

从条件队列中删除那些取消的节点

reportInterruptAfterWait

根据中断模式的不同,抛出InterruptedException或者重新中断当前线程。这个不详述。

很明显,线程被唤醒后开始还在await方法中,而得以继续执行下去的条件是:要么被中断了,要么isOnSyncQueue返回了true

由此可以想到,当线程被唤醒后,被加入到了AQS的等待队列中,因此signal操作中必然会有这一步。

signal

Java 代码
01/**
02 * Moves the longest-waiting thread, if one exists,
03 * from the wait queue for this condition to the wait queue for the owning lock.
04 */
05public final void signal() {
06    if (!isHeldExclusively())
07        throw new IllegalMonitorStateException();
08    Node first = firstWaiter;
09    if (first != null)
10        doSignal(first);
11}

这个方法对条件队列中的第一个节点,执行doSignal操作,其实就是将节点从条件队列移到了AQS的锁等待队列中。

Java 代码
01/**
02 * Removes and transfers nodes until hit non-cancelled one or null.
03 * Split out from signal in part to encourage compilers to inline the case of no waiters.
04 */
05private void doSignal(Node first) {
06    do {
07        if ((firstWaiter = first.nextWaiter) == null)
08            lastWaiter = null;
09        first.nextWaiter = null;
10    }
11    while (!transferForSignal(first) && (first = firstWaiter) != null);
12}

doSignal()方法是一个while循环,直到transferForSignal成功为止, transferForSignal就是将条件队列的头结点转移到AQS的锁队列。

Java 代码
01/**
02 * Transfers a node from a condition queue onto sync queue. Returns true if successful.
03 */
04final boolean transferForSignal(Node node) {
05    /** If cannot change waitStatus, the node has been cancelled */
06    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
07        return false;
08 
09    /**
10     * Splice onto queue and try to set waitStatus of predecessor to indicate
11     * that thread is (probably) waiting.  If cancelled or attempt to set waitStatus fails,
12     * wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
13     */
14    Node p = enq(node);
15    int ws = p.waitStatus;
16    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
17        LockSupport.unpark(node.thread);
18    return true;
19}

将节点从条件队列转移到锁等待队列,transferForSignal做了如下操作

(1)将节点状态从Node.CONDITION更新为0,如果更新失败,说明节点被取消了,则直接返回false

(2)将节点加入到AQS的锁等待队列中

(3)如果节点在AQS锁队列中的前驱节点被取消了或者将前驱节点设置为SIGNAL失败,则直接唤醒当前节点的线程;否则当前节点还需在AQS锁队列中等待唤醒。

signalAll

这个不说了,和signal的区别是,signalAll把条件队列中的所有节点都转移到了AQS锁队列中去了。

总结一下,signal/signalAll方法主要就是将线程从条件队列加入到了锁等待队列

流程图

下面的图片显示了大概的流程

(1)线程竞争锁,竞争失败时,会被阻塞到AQS的锁等待队列中。

(2)线程获取到了锁,调用Condition.await方法时,会主动将自己阻塞到ConditionObject条件队列中。

(3)线程获取到了锁,调用Condition.signal方法时,会将条件队列中被阻塞的线程转移到AQS的锁等待队列中。

可以和前面减少synchronized, wait, notify的文章中的图片对比看,其实是相似的。不同点就是可以有多个条件队列。

总结

如此,关于AQS中ConditionObject的实现就分析完毕。简而言之,就是实现了更强大的wait和notify,尽管synchronized经过不断优化,性能上已经和ReentrantLock相差不大了,但是在不少场合,ReentrantLock和ConditionObject确实有更实用的场景。

作者|张程

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:hwclouds.bbs@huawei.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。