AQS源码探究_09 Semaphore源码分析
【摘要】 文章参考:小刘老师讲源码
1、简介
Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。Semaphore信号量,获取通行证流程图:
2、入门案例
案例1:Pool.jav...
- 文章参考:小刘老师讲源码
1、简介
- Semaphore,信号量,它保存了一系列的许可(permits),每次调用
acquire()
都将消耗一个许可,每次调用release()
都将归还一个许可。 - Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。
- Semaphore信号量,获取通行证流程图:
2、入门案例
案例1:Pool.java
/**
* date: 2021/5/10
* @author csp
*/
public class Pool { /** * 可同时访问资源的最大线程数 */ private static final int MAX_AVAILABLE = 100; /** * 信号量 表示:可获取的对象通行证 */ private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); /** * 共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是连接池 */ protected Object[] items = new Object[MAX_AVAILABLE]; /** * 共享资源占用情况,与items数组一一对应,比如: * items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false */ protected boolean[] used = new boolean[MAX_AVAILABLE]; /** * 获取一个空闲对象 * 如果当前池中无空闲对象,则等待..直到有空闲对象为止 */ public Object getItem() throws InterruptedException { // 每次调用acquire()都将消耗一个许可(permits) available.acquire(); return getNextAvailableItem(); } /** * 归还对象到池中 */ public void putItem(Object x) { if (markAsUnused(x)) available.release(); } /** * 获取池内一个空闲对象,获取成功则返回Object,失败返回Null * 成功后将对应的 used[i] = true */ private synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } /** * 归还对象到池中,归还成功返回true * 归还失败: * 1.池中不存在该对象引用,返回false * 2.池中存在该对象引用,但该对象目前状态为空闲状态,也返回false */ private synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; }
}
案例2:SemaphoreTest02.java
/**
* date: 2020/5/10
* @author csp
*/
public class SemaphoreTest02 { public static void main(String[] args) throws InterruptedException { // 声明信号量,初始的许可(permits)为2 // 公平模式:fair为true final Semaphore semaphore = new Semaphore(2, true); Thread tA = new Thread(() ->{ try { // 每次调用acquire()都将消耗一个许可(permits) semaphore.acquire(); System.out.println("线程A获取通行证成功"); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { }finally { // 每次调用release()都将归还一个许可(permits) semaphore.release(); } }); tA.start(); // 确保线程A已经执行 TimeUnit.MILLISECONDS.sleep(200); Thread tB = new Thread(() ->{ try { // 调用acquire(2)都将消耗2个许可(permits) semaphore.acquire(2); System.out.println("线程B获取通行证成功"); } catch (InterruptedException e) { }finally { // 调用release(2)都将归还2个许可(permits) semaphore.release(2); } }); tB.start(); // 确保线程B已经执行 TimeUnit.MILLISECONDS.sleep(200); Thread tC = new Thread(() ->{ try { // 每次调用acquire()都将消耗一个许可(permits) semaphore.acquire(); System.out.println("线程C获取通行证成功"); } catch (InterruptedException e) { }finally { // 每次调用release()都将归还一个许可(permits) semaphore.release(); } }); tC.start(); }
}
执行结果:
线程A获取通行证成功
线程B获取通行证成功
线程C获取通行证成功
3、源码分析
内部类Sync
- 通过Sync的几个实现方法,我们获取到以下几点信息:
- 许可是在构造方法时传入的;
- 许可存放在状态变量state中;
- 尝试获取一个许可的时候,则state的值减1;
- 当state的值为0的时候,则无法再获取许可;
- 释放一个许可的时候,则state的值加1;
- 许可的个数可以动态改变;
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 构造方法,传入许可次数,放入state中 Sync(int permits) { setState(permits); } // 获取许可次数 final int getPermits() { return getState(); }
// 非公平模式尝试获取许可 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 先看看还有几个许可 int available = getState(); // 减去这次需要获取的许可还剩下几个许可 int remaining = available - acquires; // 如果剩余许可小于0了则直接返回 // 如果剩余许可不小于0,则尝试原子更新state的值,成功了返回剩余许可 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 释放许可 protected final boolean tryReleaseShared(int releases) { for (;;) { // 先看看还有几个许可 int current = getState(); // 加上这次释放的许可 int next = current + releases; // 检测溢出 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 如果原子更新state的值成功,就说明释放许可成功,则返回true if (compareAndSetState(current, next)) return true; } }
// 减少许可 final void reducePermits(int reductions) { for (;;) { // 先看看还有几个许可 int current = getState(); // 减去将要减少的许可 int next = current - reductions; // 检测溢出 if (next > current) // underflow throw new Error("Permit count underflow"); // 原子更新state的值,成功了返回true if (compareAndSetState(current, next)) return; } } // 销毁许可 final int drainPermits() { for (;;) { // 先看看还有几个许可 int current = getState(); // 如果为0,直接返回 // 如果不为0,把state原子更新为0 if (current == 0 || compareAndSetState(current, 0)) return current; } }
}
内部类NonfairSync
非公平模式下,直接调用父类的nonfairTryAcquireShared()
尝试获取许可。
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // 构造方法,调用父类的构造方法 NonfairSync(int permits) { super(permits); }
// 尝试获取许可,调用父类的nonfairTryAcquireShared()方法 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
}
内部类FairSync
公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。
**注意:**为了阅读方便,该内部类中将一些AQS中的方法粘贴过来了,在方法头注释加有标注!
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } /** * 该方法位于AQS中: * 尝试获取通行证,获取成功返回 >= 0的值; * 获取失败 返回 < 0 值 */ protected int tryAcquireShared(int acquires) { for (;;) { // 判断当前 AQS 阻塞队列内 是否有等待者线程,如果有直接返回-1,表示当前aquire操作的线程需要进入到队列等待.. if (hasQueuedPredecessors()) return -1; // 执行到这里,有哪几种情况? // 1.调用aquire时 AQS阻塞队列内没有其它等待者 // 2.当前节点 在阻塞队列内是headNext节点 // 获取state ,state这里表示 通行证 int available = getState(); // remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量 int remaining = available - acquires; // 条件一:remaining < 0 成立,说明线程获取通行证失败.. // 条件二:前置条件,remaning >= 0, CAS更新state 成功,说明线程获取通行证成功,CAS失败,则自旋。 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * 该方法位于AQS中: */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 条件成立:说明当前调用acquire方法的线程 已经是 中断状态了,直接抛出异常.. if (Thread.interrupted()) throw new InterruptedException(); // 对应业务层面 执行任务的线程已经将latch打破了。然后其他再调用latch.await的线程,就不会在这里阻塞了 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * 该方法位于AQS中: */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 将调用Semaphore.aquire方法的线程 包装成node加入到 AQS的阻塞队列当中。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取当前线程节点的前驱节点 final Node p = node.predecessor(); // 条件成立,说明当前线程对应的节点 为 head.next节点 if (p == head) { // head.next节点就有权利获取 共享锁了.. int r = tryAcquireShared(arg); // 站在Semaphore角度:r 表示还剩余的通行证数量 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // shouldParkAfterFailedAcquire 会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 signal(-1),返回true // parkAndCheckInterrupt 挂起当前节点对应的线程... if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 该方法位于AQS中: * 设置当前节点为 head节点,并且向后传播!(依次唤醒!) */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 将当前节点设置为 新的 head节点。 setHead(node); // 调用setHeadAndPropagete 时 propagate == 1 一定成立 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取当前节点的后继节点.. Node s = node.next; // 条件一:s == null 什么时候成立呢? 当前node节点已经是 tail了,条件一会成立。 doReleaseShared() 里面会处理这种情况.. // 条件二:前置条件,s != null , 要求s节点的模式必须是 共享模式。 latch.await() -> addWaiter(Node.SHARED) if (s == null || s.isShared()) // 基本上所有情况都会执行到 doReleasseShared() 方法。 doReleaseShared(); } } //AQS.releaseShared 该方法位于AQS中: public final boolean releaseShared(int arg) { // 条件成立:表示当前线程释放资源成功,释放资源成功后,去唤醒获取资源失败的线程.. if (tryReleaseShared(arg)) { // 唤醒获取资源失败的线程... doReleaseShared(); return true; } return false; } /** * 唤醒获取资源失败的线程 * * CountDownLatch版本 * 都有哪几种路径会调用到doReleaseShared方法呢? * 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 唤醒当前阻塞队列内的 head.next 对应的线程。 * 2.被唤醒的线程 -> doAcquireSharedInterruptibly parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared() * * Semaphore版本 * 都有哪几种路径会调用到doReleaseShared方法呢? * */ //AQS.doReleaseShared 该方法位于AQS中: private void doReleaseShared() { for (;;) { // 获取当前AQS 内的 头结点 Node h = head; // 条件一:h != null 成立,说明阻塞队列不为空.. // 不成立:h == null 什么时候会是这样呢? // latch创建出来后,没有任何线程调用过 await() 方法之前,有线程调用latch.countDown()操作 且触发了 唤醒阻塞节点的逻辑.. // 条件二:h != tail 成立,说明当前阻塞队列内,除了head节点以外 还有其他节点。 // h == tail -> head 和 tail 指向的是同一个node对象。 什么时候会有这种情况呢? // 1. 正常唤醒情况下,依次获取到 共享锁,当前线程执行到这里时 (这个线程就是 tail 节点。) // 2. 第一个调用await()方法的线程 与 调用countDown()且触发唤醒阻塞节点的线程 出现并发了.. // 因为await()线程是第一个调用 latch.await()的线程,此时队列内什么也没有,它需要补充创建一个Head节点,然后再次自旋时入队 // 在await()线程入队完成之前,假设当前队列内 只有 刚刚补充创建的空元素 head 。 // 同期,外部有一个调用countDown()的线程,将state 值从1,修改为0了,那么这个线程需要做 唤醒 阻塞队列内元素的逻辑.. // 注意:调用await()的线程 因为完全入队完成之后,再次回到上层方法 doAcquireSharedInterruptibly 会进入到自旋中, // 获取当前元素的前驱,判断自己是head.next, 所以接下来该线程又会将自己设置为 head,然后该线程就从await()方法返回了... if (h != null && h != tail) { // 执行到if里面,说明当前head 一定有 后继节点! int ws = h.waitStatus; // 当前head状态 为 signal 说明 后继节点并没有被唤醒过呢... if (ws == Node.SIGNAL) { // 唤醒后继节点前 将head节点的状态改为 0 // 这里为什么,使用CAS呢? 回头说... // 当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时, // CAS 可能会失败... // 案例: // t3 线程在if(h == head) 返回false时,t3 会继续自旋. 参与到 唤醒下一个head.next的逻辑.. // t3 此时执行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,也进入到 if (ws == Node.SIGNAL) 里面了, // 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 会失败,因为 t3 改过了... if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 条件成立: // 1.说明刚刚唤醒的 后继节点,还没执行到 setHeadAndPropagate方法里面的 设置当前唤醒节点为head的逻辑。 // 这个时候,当前线程 直接跳出去...结束了.. // 此时用不用担心,唤醒逻辑 在这里断掉呢?、 // 不需要担心,因为被唤醒的线程 早晚会执行到doReleaseShared方法。 // 2.h == null latch创建出来后,没有任何线程调用过 await() 方法之前, // 有线程调用latch.countDown()操作 且触发了 唤醒阻塞节点的逻辑.. // 3.h == tail -> head 和 tail 指向的是同一个node对象 // 条件不成立: // 被唤醒的节点 非常积极,直接将自己设置为了新的head,此时 唤醒它的节点(前驱),执行h == head 条件会不成立.. // 此时 head节点的前驱,不会跳出 doReleaseShared 方法,会继续唤醒 新head 节点的后继... if (h == head) // loop if head changed break; } }
}
构造方法
创建Semaphore时需要传入许可次数。Semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。
// 构造方法,创建时要传入许可次数,默认使用非公平模式
public Semaphore(int permits) { sync = new NonfairSync(permits);
}
// 构造方法,需要传入许可次数,及是否公平模式
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire()方法
获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);
}
// 获取一个许可,非中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。
public void acquireUninterruptibly() { sync.acquireShared(1);
}
acquire(int permits)方法
一次获取多个许可,可中断方式。
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits);
}
// 一次获取多个许可,非中断方式。
public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits);
}
tryAcquire()方法
尝试获取一个许可,使用Sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true;
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
release()方法
释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。
public void release() { sync.releaseShared(1);
}
release(int permits)方法
一次释放多个许可,state的值会相应增加permits的数量。
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits);
}
4、小结
- Semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景;
- Semaphore的内部实现是基于AQS的共享锁来实现的;
- Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中;
- 获取一个许可时,则state值减1;
- 释放一个许可时,则state值加1;
文章来源: csp1999.blog.csdn.net,作者:兴趣使然の草帽路飞,版权归原作者所有,如需转载,请联系作者。
原文链接:csp1999.blog.csdn.net/article/details/116604531
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)