ConditionObject
ConditionObject在并发编程中是非常有用的,它实现了Condition接口,往往和Lock结合起来用。
Condition接口源码如下
Java 代码01 | public interface Condition { |
02 | /** Causes the current thread to wait until it is signalled or interrupted */ |
03 | void await() throws InterruptedException; |
04 | void awaitUninterruptibly(); |
05 | long awaitNanos( long nanosTimeout) throws InterruptedException; |
06 | boolean await( long time, TimeUnit unit) throws InterruptedException; |
07 | boolean awaitUntil(Date deadline) throws InterruptedException; |
09 | /** Wakes up one waiting thread */ |
12 | /** Wakes up all waiting threads */ |
主要就是await和signal方法
看到这里,就似曾相识了。
- synchronized和wait, notify, notifyAll
- ReentrantLock和Condition的await, signal, signalAll
他们几乎实现了相同的功能。
在之前的文章中,介绍过synchronized, wait, notify, notifyAll,详细见:
http://3ms.huawei.com/hi/blog/714989_2131651.html?h=h
在那篇文章中,最后给出了一个用synchronized, wait, notifyAll实现阻塞队列的demo,然而JDK源码中的ArrayBlockingQueue是用ReentrantLock结合Condition来实现的。
我们结合ArrayBlockingQueue的源码分析一下,使用ReentrantLock和Condition实现的优势。
Java 代码01 | public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { |
03 | final ReentrantLock lock; |
04 | private final Condition notEmpty; |
05 | private final Condition notFull; |
10 | public ArrayBlockingQueue( int capacity) { |
11 | this (capacity, false ); |
13 | public ArrayBlockingQueue( int capacity, boolean fair) { |
14 | this .items = new Object[capacity]; |
15 | lock = new ReentrantLock(fair); |
16 | notEmpty = lock.newCondition(); |
17 | notFull = lock.newCondition(); |
19 | public void put(E e) throws InterruptedException { |
21 | final ReentrantLock lock = this .lock; |
22 | lock.lockInterruptibly(); |
24 | while (count == items.length) |
31 | public E take() throws InterruptedException { |
32 | final ReentrantLock lock = this .lock; |
33 | lock.lockInterruptibly(); |
42 | private void enqueue(E x) { |
43 | final Object[] items = this .items; |
45 | if (++putIndex == items.length) |
50 | private void dequeue() { |
51 | final Object[] items = this .items; |
52 | E x = items[takeIndex]; |
53 | items[takeIndex] = null ; |
54 | if (++takeIndex == items.length) |
58 | itrs.elementDequeued(); |
以上是ArrayBlockingQueue的部分源码
1)put方法在队列满的时候,会阻塞调用线程,等待notFull条件;未满的时候,会唤醒等待notEmpty条件的线程
2)take方法在队列为空时,会阻塞调用线程,等待notEmpty条件;不为空时,会唤醒等待notFull条件的线程
形象点解释
1)生产者线程每次生产后(put方法),只唤醒被阻塞的消费者线程
2)消费者线程每次消费后(take方法),只唤醒被阻塞的生产者线程
很明显,这种唤醒方式效率更加高效。
看一下lock.newCondition()方法
Java 代码01 | public class ReentrantLock implements Lock { |
02 | public Condition newCondition() { |
03 | return sync.newCondition(); |
06 | abstract static class Sync extends AbstractQueuedSynchronizer { |
07 | final ConditionObject newCondition() { |
08 | return new ConditionObject(); |
13 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer { |
14 | public class ConditionObject implements Condition { |
15 | /** First node of condition queue. */ |
16 | private transient Node firstWaiter; |
18 | /** Last node of condition queue. */ |
19 | private transient Node lastWaiter; |
21 | /** Mode meaning to reinterrupt on exit from wait */ |
22 | private static final int REINTERRUPT = 1 ; |
23 | /** Mode meaning to throw InterruptedException on exit from wait */ |
24 | private static final int THROW_IE = - 1 ; |
26 | public ConditionObject() { } |
看到最终ConditionObject是在AQS中定义的。
在分析await和signal方法之前,我们需要明确一下,在AQS中的锁等待队列和ConditionObject中的队列是不一样的,这是两个队列,不能混为一谈。
接下来重点分析ConditionObject中的await和signal方法
await
需要先理解await方法究竟干了什么
首先有个前提,能够调用await方法的线程已经获得了锁(AQS acquire成功了),在此基础上,调用await方法主要有以下功能
(1)将当前线程加入到等待队列中,这个队列不是AQS的队列,而是指定ConditionObject的队列(条件队列)
(2)释放当前线程占有的锁,让出许可
(3)如果当前线程不在AQS的等待队列中,则阻塞当前线程
看下await方法的源码
Java 代码01 | public final void await() throws InterruptedException { |
02 | if (Thread.interrupted()) |
03 | throw new InterruptedException(); |
04 | Node node = addConditionWaiter(); |
05 | int savedState = fullyRelease(node); |
06 | int interruptMode = 0 ; |
07 | while (!isOnSyncQueue(node)) { |
08 | LockSupport.park( this ); |
09 | if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) |
12 | if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
13 | interruptMode = REINTERRUPT; |
14 | if (node.nextWaiter != null ) |
15 | unlinkCancelledWaiters(); |
16 | if (interruptMode != 0 ) |
17 | reportInterruptAfterWait(interruptMode); |
其中addConditionWaiter就是将当前线程加入到条件队列
Java 代码02 | * Adds a new waiter to wait queue. |
03 | * @return its new wait node |
05 | private Node addConditionWaiter() { |
08 | if (t != null && t.waitStatus != Node.CONDITION) { |
09 | unlinkCancelledWaiters(); |
12 | Node node = new Node(Thread.currentThread(), Node.CONDITION); |
将节点加入到条件队列的尾部,其中多了个判断和unlinkCancelledWaiters,这个后面再介绍。
fullyRelease就是释放当前线程占有的锁,让出许可
Java 代码02 | * Invokes release with current state value; returns saved state. |
03 | * Cancels node and throws exception on failure. |
04 | * @param node the condition node for this wait |
05 | * @return previous sync state |
07 | final int fullyRelease(Node node) { |
08 | boolean failed = true ; |
10 | int savedState = getState(); |
11 | if (release(savedState)) { |
15 | throw new IllegalMonitorStateException(); |
19 | node.waitStatus = Node.CANCELLED; |
fullyRelease没太多可说的,就是调用release方法释放了许可。
isOnSyncQueue判断当前线程是否在AQS的等待队列中
Java 代码02 | * Returns true if a node, always one that was initially placed on |
03 | * a condition queue, is now waiting to reacquire on sync queue. |
04 | * @param node the node |
05 | * @return true if is reacquiring |
07 | final boolean isOnSyncQueue(Node node) { |
08 | if (node.waitStatus == Node.CONDITION || node.prev == null ) |
10 | if (node.next != null ) |
28 | private boolean findNodeFromTail(Node node) { |
没太多可说的,注意waitStatus为CONDITION这种结点,只属于条件队列,不在AQS锁等待队列中。
可以看到,await源码中除了实现了上述的3个功能外,还有额外的操作。而这些操作,就是线程被唤醒后需要执行的操作。主要3个操作:acquireQueued, unlinkCancelledWaiters, reportInterruptAfterWait
acquireQueued
被唤醒后,线程需要重新竞争锁,如果竞争不到还是会将自己阻塞,等待被唤醒重新竞争。这个方法具体实现在之前的文章中分析过。
unlinkCancelledWaiters
从条件队列中取消不是在CONDITION的线程节点
Java 代码01 | private void unlinkCancelledWaiters() { |
05 | Node next = t.nextWaiter; |
06 | if (t.waitStatus != Node.CONDITION) { |
11 | trail.nextWaiter = next; |
从条件队列中删除那些取消的节点
reportInterruptAfterWait
根据中断模式的不同,抛出InterruptedException或者重新中断当前线程。这个不详述。
很明显,线程被唤醒后开始还在await方法中,而得以继续执行下去的条件是:要么被中断了,要么isOnSyncQueue返回了true
由此可以想到,当线程被唤醒后,被加入到了AQS的等待队列中,因此signal操作中必然会有这一步。
signal
Java 代码02 | * Moves the longest-waiting thread, if one exists, |
03 | * from the wait queue for this condition to the wait queue for the owning lock. |
05 | public final void signal() { |
06 | if (!isHeldExclusively()) |
07 | throw new IllegalMonitorStateException(); |
08 | Node first = firstWaiter; |
这个方法对条件队列中的第一个节点,执行doSignal操作,其实就是将节点从条件队列移到了AQS的锁等待队列中。
Java 代码02 | * Removes and transfers nodes until hit non-cancelled one or null. |
03 | * Split out from signal in part to encourage compilers to inline the case of no waiters. |
05 | private void doSignal(Node first) { |
07 | if ((firstWaiter = first.nextWaiter) == null ) |
09 | first.nextWaiter = null ; |
11 | while (!transferForSignal(first) && (first = firstWaiter) != null ); |
doSignal()方法是一个while循环,直到transferForSignal成功为止, transferForSignal就是将条件队列的头结点转移到AQS的锁队列。
Java 代码02 | * Transfers a node from a condition queue onto sync queue. Returns true if successful. |
04 | final boolean transferForSignal(Node node) { |
05 | /** If cannot change waitStatus, the node has been cancelled */ |
06 | if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) |
10 | * Splice onto queue and try to set waitStatus of predecessor to indicate |
11 | * that thread is (probably) waiting. If cancelled or attempt to set waitStatus fails, |
12 | * wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong). |
15 | int ws = p.waitStatus; |
16 | if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) |
17 | LockSupport.unpark(node.thread); |
将节点从条件队列转移到锁等待队列,transferForSignal做了如下操作
(1)将节点状态从Node.CONDITION更新为0,如果更新失败,说明节点被取消了,则直接返回false
(2)将节点加入到AQS的锁等待队列中
(3)如果节点在AQS锁队列中的前驱节点被取消了或者将前驱节点设置为SIGNAL失败,则直接唤醒当前节点的线程;否则当前节点还需在AQS锁队列中等待唤醒。
signalAll
这个不说了,和signal的区别是,signalAll把条件队列中的所有节点都转移到了AQS锁队列中去了。
总结一下,signal/signalAll方法主要就是将线程从条件队列加入到了锁等待队列
流程图
下面的图片显示了大概的流程
(1)线程竞争锁,竞争失败时,会被阻塞到AQS的锁等待队列中。
(2)线程获取到了锁,调用Condition.await方法时,会主动将自己阻塞到ConditionObject条件队列中。
(3)线程获取到了锁,调用Condition.signal方法时,会将条件队列中被阻塞的线程转移到AQS的锁等待队列中。
可以和前面减少synchronized, wait, notify的文章中的图片对比看,其实是相似的。不同点就是可以有多个条件队列。
总结
如此,关于AQS中ConditionObject的实现就分析完毕。简而言之,就是实现了更强大的wait和notify,尽管synchronized经过不断优化,性能上已经和ReentrantLock相差不大了,但是在不少场合,ReentrantLock和ConditionObject确实有更实用的场景。
作者|张程
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:hwclouds.bbs@huawei.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
评论(0)