AQS源码探究_06 Conditon条件队列(await方法、线程入队与挂起、signal方法)

举报
兴趣使然的草帽路飞 发表于 2021/06/08 22:14:05 2021/06/08
【摘要】 AQS源码探究_06 Conditon条件队列(await方法、线程入队与挂起、signal方法) 提示:读源码的时候尽量自己点开源码跟着博客注释一起看,不然容易迷路~ 1、条件队列流程图 2、Condition接口 public interface Condition { // 线程等待,可抛出中断异常(可以响应中断) void await() t...

AQS源码探究_06 Conditon条件队列(await方法、线程入队与挂起、signal方法)

提示:读源码的时候尽量自己点开源码跟着博客注释一起看,不然容易迷路~

1、条件队列流程图

在这里插入图片描述


2、Condition接口

public interface Condition {
	// 线程等待,可抛出中断异常(可以响应中断)
	void await() throws InterruptedException; // 线程等待,但是不可响应中断
	void awaitUninterruptibly(); // 线程超时等待
	boolean await(long time, TimeUnit unit) throws InterruptedException; // 线程等待,直到收到被唤醒、或收到中断信号、或到了指定的截止时间日期自动唤醒
	boolean awaitUntil(Date deadline) throws InterruptedException; // 线程唤醒
	void signal(); // 唤醒所有线程
	void signalAll();
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3、AQS中实现Condition接口的内部类ConditionObject

// 位于AQS中的内部类:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 队列的第一个等待者节点:头等待者 private transient Node firstWaiter; // 队列的最后一个等待者节点:尾等待者 private transient Node lastWaiter; /** * 无参构造函数: * 在ReentrantLock中的newCondition()方法内,借助ReentrantLock的静态内部类Sync,去调用 * newCondition方法实例化ConditionObject */ public ConditionObject() { } ...
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

接下来分析一下ConditionObject的内部方法:

3.1 await线程等待方法

// 线程等待,可抛出中断异常(可以响应中断)
public final void await() throws InterruptedException { // 判断当前线程是否是中断状态: if (Thread.interrupted()) // 如果是中断状态,则直接抛出中断异常 throw new InterruptedException(); // 把当前线程包装成一个node,放入条件队列中,并返回封装当前线程的node Node node = addConditionWaiter(); // 完全释放当前线程对应的锁(将state置为0): // 为什么要释放锁呢? 因为加着锁挂起的时候,除了本线程能唤醒(本线程已挂起),其他线程都没办法唤醒啊~ int savedState = fullyRelease(node); // Condition队列中断状态: // 0: 在Condition队列挂起期间,未接收过中断信号~ // -1: 在Condition队列挂起期间,接收到中断信号~ // 1: 在Condition队列挂起期间,未接收到中断信号,但是迁移到阻塞队列之后,接收到过中断信号~ int interruptMode = 0; // while循环条件:判断当前node节点是否在阻塞队列中:  // isOnSyncQueue返回true:表示当前线程对应的node已经迁移到了阻塞队列中了 // isOnSyncQueue返回false:说明当前node仍还在条件队列中,需要park挂起~ while (!isOnSyncQueue(node)) { // 挂起当前线程,使其处于等待状态~ LockSupport.park(this); //什么时候会被唤醒?都有几种情况呢? // 1.常规路径:外部线程获取到lock之后,调用了 signal()方法 转移条件队列的头节点到 阻塞队列, 当这个节点获取到锁后,会唤醒。 // 2.转移至阻塞队列后,发现阻塞队列中的前驱节点状态 是 取消状态,此时会唤醒当前节点 // 3.当前节点挂起期间,被外部线程使用中断唤醒.. // checkInterruptWhileWaiting: 即使在Condition队列挂起期间,线程发生中断了,对应的node仍然会被迁移到阻塞队列中(等待去获取锁)~ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 执行到这里,说明当前node已经迁移到阻塞队列了 // acquireQueued:竞争队列的逻辑,线程节点竞争资源 // 条件一:返回true 表示在阻塞队列中 被外部线程中断唤醒过.. // 条件二:interruptMode != THROW_IE 成立,说明当前node在条件队列内 未发生过中断 // 设置interruptMode = REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 考虑下 node.nextWaiter != null 条件什么时候成立呢? // 其实是node在条件队列内时 如果被外部线程 中断唤醒时,会加入到阻塞队列,但是并未设置nextWaiter = null。 if (node.nextWaiter != null) // clean up if cancelled // 清理条件队列内取消状态的节点.. unlinkCancelledWaiters(); // 条件成立:说明挂起期间 发生过中断(1.条件队列内的挂起 2.条件队列之外的挂起) if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}

// 把当前线程包装成一个node,放入条件队列中,并返回封装当前线程的node
// 注意:调用await方法的线程都是持锁状态的线程,也就是说:在await方法中调用的addConditionWaiter方法不存在并发问题~
private Node addConditionWaiter() { // 获取当前条件队列的尾节点的引用,保存到局部变量t中 Node t = lastWaiter; // 条件1:t != null 成立:说明当前条件队列中,已经有node元素了 // 条件2:(node在队列中时,它的状态是 CONDITION(-2)) // t.waitStatus != Node.CONDITION 成立:说明当前node发生中断了... if (t != null && t.waitStatus != Node.CONDITION) { // 清理条件队列中,所有CONDITION(-2)取消状态的node节点 unlinkCancelledWaiters(); // 更新局部变量t 为最新的尾结点的引用~  // 因为unlinkCancelledWaiters方法可能会更改lastWaiter的引用 t = lastWaiter; } // 为当前线程创建node节点,设置状态为 CONDITION(-2) Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果t == null 条件成立,说明条件队列中没有任何元素,当前线程是进入队列的第一个元素 if (t == null) firstWaiter = node; else t.nextWaiter = node; // 更新队尾节点引用指向node lastWaiter = node; // 返回包装当前线程的node return node;
}

// 即使在Condition队列挂起期间,线程发生中断了,对应的node仍然会被迁移到阻塞队列中(等待去获取锁)~
private int checkInterruptWhileWaiting(Node node) { // Thread.interrupted() 返回当前线程中断标记位,并且重置当前标记位为false return Thread.interrupted() ? // transferAfterCancelledWait 这个方法只有在线程是被中断唤醒时才会调用! (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) { // 条件成立:说明当前node一定是在 条件队列内,因为signal 迁移节点到阻塞队列时,会将节点的状态修改为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 中断唤醒的node也会被加入到 阻塞队列中!! enq(node); // true:表示是在条件队列内被中断的. return true; } // 执行到这里有几种情况? // 1.当前node已经被外部线程调用 signal 方法将其迁移到 阻塞队列内了。 // 2.当前node正在被外部线程调用 signal 方法将其迁移至 阻塞队列中 进行中状态.. while (!isOnSyncQueue(node)) Thread.yield(); // false:表示当前节点被中断唤醒时 不在 条件队列了.. return false;
}

// 清理条件队列内取消状态的节点..(纯链表操作的方法)
private void unlinkCancelledWaiters() { // 表示循环当前节点,从链表的第一个节点开始 向后迭代处理. Node t = firstWaiter; // 当前链表上一个正常状态的node节点 Node trail = null; while (t != null) { // 当前节点的下一个节点. Node next = t.nextWaiter; // 条件成立:说明当前节点状态为 取消状态 if (t.waitStatus != Node.CONDITION) { // 更新nextWaiter为null t.nextWaiter = null; // 条件成立:说明遍历到的节点还未碰到过正常节点.. if (trail == null) // 更新firstWaiter指针为下个节点就可以 firstWaiter = next; else // 让上一个正常节点指向 取消节点的 下一个节点..中间有问题的节点 被跳过去了.. trail.nextWaiter = next; // 条件成立:当前节点为队尾节点了,更新lastWaiter 指向最后一个正常节点 就Ok了 if (next == null) lastWaiter = trail; } else// 条件不成立执行到else,说明当前节点是正常节点 trail = t; t = next; }
}

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 条件成立:说明在条件队列内发生过中断,此时await方法抛出中断异常 if (interruptMode == THROW_IE) throw new InterruptedException(); // 条件成立:说明在条件队列外发生的中断,此时设置当前线程的中断标记位 为true // 中断处理 交给 你的业务处理。 如果你不处理,那什么事 也不会发生了... else if (interruptMode == REINTERRUPT) selfInterrupt();
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158

3.2 fullyRelease完全释放当前线程锁的方法

// 位于AQS中:
// 完全释放当前线程对应的锁(将state置为0):
final int fullyRelease(Node node) { // 完全释放锁是否成功,当failed失败时,说明当前线程是未持有锁调用await方法的线程(错误写法...) // 假设失败,在finally代码块中,会将刚刚加入到条件队列的,当前线程对应的node节点状态修改为取消状态 // 后继线程就会将取消状态的节点给清理出去~  boolean failed = true; try { // 获取当前线程所持有的state值。 int savedState = getState(); // 绝大部分情况下:release这里会返回true if (release(savedState)) { // 失败标记设置为false failed = false; // 返回当前线程释放的state值 return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3.3 isOnSyncQueue判断当前node节点是否在阻塞队列中

// 位于AQS中:
// 判断当前node节点是否在阻塞队列中
final boolean isOnSyncQueue(Node node) { // 条件一:node.waitStatus == Node.CONDITION 条件成立:说明当前node一定是在条件队列中, // 因为signal方法迁移节点到阻塞队列前,会将node的状态设置为0 // 条件二:前置条件node.waitStatus != Node.CONDITION ===>然后再细分以下几种情况: // 1.node.waitStatus == 0 (表示当前节点已经被singal) // 2.node.waitStatus == 1 (表示当前线程未持有锁就调用了await方法,最终会将node的状态修改为取消状态...) // node.waitStatus == 0 为什么还要判断node.prev == null呢? // 因为:signal方法是先修改状态,再迁移 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 执行到这里,会是哪种情况? // node.waitStatus != CONDITION 且 node.prev != null  ===> 可以排除掉 node.waitStatus == 1 取消状态.. // 为什么可以排除取消状态? 因为signal方法是不会把 取消状态的node迁移走的 // 设置prev引用的逻辑 是 迁移 阻塞队列 逻辑的设置的(enq()) // 入队的逻辑:1.设置node.prev = tail;   2. cas当前node为 阻塞队列的 tail 尾节点 成功才算是真正进入到 阻塞队列! 3.pred.next = node; // 可以推算出,就算prev不是null,也不能说明当前node 已经成功入队到 阻塞队列了。 // 条件成立:说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它node了... if (node.next != null) return true; /** * 执行到这里,说明当前节点的状态为:node.prev != null 且 node.waitStatus == 0 * findNodeFromTail 从阻塞队列的尾巴开始向前遍历查找node,如果查找到 返回true,查找不到返回false * 当前node有可能正在signal过程中,正在迁移中...还未完成... */ return findNodeFromTail(node);
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

3.4 signal线程唤醒方法

// 位于AQS中:线程唤醒方法
public final void signal() { // 判断调用signal方法的线程是否是独占锁持有线程,如果不是,直接抛出异常.. if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列的一个node Node first = firstWaiter; // 第一个节点不为null,则将第一个节点 进行迁移到阻塞队列的逻辑.. if (first != null) doSignal(first);
}

// 位于AQS中:将第一个节点 进行迁移到阻塞队列 
private void doSignal(Node first) { do { // firstWaiter = first.nextWaiter因为当前first马上要出条件队列了, // 所以更新firstWaiter为 当前节点的下一个节点.. // 如果当前节点的下一个节点 是 null,说明条件队列只有当前一个节点了... // 当前出队后,整个队列就空了..所以需要更新lastWaiter = null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 当前first节点 出 条件队列。断开和下一个节点的关系 first.nextWaiter = null; // transferForSignal(first) 返回boolean类型 // 返回true 表示当前first节点迁移到阻塞队列成功  返回false 表示迁移失败... // while循环 :(first = firstWaiter) != null   // 当前first迁移失败,则将first更新为 first.next 继续尝试迁移.. // 直至迁移某个节点成功,或者 条件队列为null为止。 } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3.4 transferForSignal当前节点迁移到阻塞队列

// 位于AQS:当前节点迁移到阻塞队列:
// 返回true 表示当前first节点迁移到阻塞队列成功  返回false 表示迁移失败...
final boolean transferForSignal(Node node) { // cas修改当前节点的状态,修改为0,因为当前节点马上要迁移到 阻塞队列了 // 成功:当前节点在条件队列中状态正常。 // 失败: 1.取消状态 (线程await时 未持有锁,最终线程对应的node会设置为 取消状态) // 2.node对应的线程 挂起期间,被其它线程使用 中断信号 唤醒过... // (就会主队进入到 阻塞队列,这时也会修改状态为0) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq最终会将当前 node 入队到 阻塞队列,p 是当前节点在阻塞队列的 前驱节点. Node p = enq(node); // ws 前驱节点的状态.. int ws = p.waitStatus; // 条件一:ws > 0 成立:说明前驱节点的状态在阻塞队列中是 取消状态,唤醒当前节点。 // 条件二:前置条件(ws <= 0), // compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回true 表示设置前驱节点状态为 SIGNAl状态成功 // compareAndSetWaitStatus(p, ws, Node.SIGNAL) 返回false  ===> 什么时候会false? // 当前驱node对应的线程 是 lockInterrupt入队的node时,是会响应中断的,外部线程给前驱线程中断信号之后,前驱node会将 // 状态修改为 取消状态,并且执行 出队逻辑.. // 前驱节点状态 只要不是 0 或者 -1 那么,就唤醒当前线程。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒当前node对应的线程...回头再说。 LockSupport.unpark(node.thread); return true;
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

文章来源: csp1999.blog.csdn.net,作者:兴趣使然の草帽路飞,版权归原作者所有,如需转载,请联系作者。

原文链接:csp1999.blog.csdn.net/article/details/116399552

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。