浅谈AQS
说到Java的并发编程包,就一定少不了一个东西,它就是AQS,可能有些同学是第一次遇到这个名词,没关系,并发包里的ReentrantLock你总用过吧?那么你有没有想过,为什么简简单单地调用lock()、unlock()方法就能够解决线程的安全问题呢?
CAS
我们都知道,Java还有一种线程同步的方式,synchronized关键字,使用它能够解决线程的安全问题,然而,由于synchronized底层是通过操作系统Mutex Lock来实现的,导致synchronized的效率比较低,被大家称为重量级锁。好在JDK1.6,官方对synchronized进行了较为深入的改动,引入了偏向锁、轻量级锁、锁消除、锁粗化等等机制,大大提升了synchronized的性能。
而在JDK1.6之前,为了解决synchronized性能低下的问题, Doug Lea一举开发出了Java并发包中的众多组件,为Java的发展做出了巨大的贡献,它通过各种巧妙的机制,实现了在不加锁的前提下保障线程安全,比如:
public class LockDemo {
private AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
// 获取当前线程对象
Thread thread = Thread.currentThread();
// 自旋等待
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void unlock() {
// 获取当前线程对象
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
}
static int count = 0;
public static void main(String[] args) throws InterruptedException {
LockDemo lockDemo = new LockDemo();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
Thread thread = new Thread(() -> {
lockDemo.lock();
for (int j = 0; j < 1000; j++) {
count++;
}
lockDemo.unlock();
});
thread.start();
threadList.add(thread);
}
// 等待线程执行完毕
for (Thread thread : threadList) {
thread.join();
}
System.out.println(count);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
该程序使用CAS机制实现了一个自旋锁,保证了线程安全,Java并发包里大量地使用到了CAS。
AQS
下面进入本篇文章的主题,AQS,我们以一个ReentrantLock的程序为例:
public class LockDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
List<Thread> threadList = new ArrayList<>();
Lock lock = new ReentrantLock();
for (int i = 0; i < 50; ++i) {
Thread thread = new Thread(() -> {
lock.lock();
try {
for (int j = 0; j < 1000; j++) {
count++;
}
} finally {
lock.unlock();
}
});
thread.start();
threadList.add(thread);
}
for (Thread thread : threadList) {
thread.join();
}
System.out.println(count);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
当我们创建一个ReentrantLock对象时:
public ReentrantLock() {
sync = new NonfairSync();
}
- 1
- 2
- 3
会创建NonfairSync对象并将其赋值给sync,那这个sync是什么呢?
private final Sync sync;
- 1
它是一个Sync类型的变量,而Sync是ReentrantLock的一个内部类:
abstract static class Sync extends AbstractQueuedSynchronizer {
......
}
- 1
- 2
- 3
Sync继承自AbstractQueuedSynchronizer,它就是我们重点要介绍的AQS,意为抽象队列同步器。所以说,实际上我们创建的是一个抽象队列同步器。
此时某个线程会执行lock()方法,来看看lock()方法的源码:
public void lock() {
sync.lock();
}
- 1
- 2
- 3
调用的是sync的lock()方法:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
这是一个ReentrantLock的内部类,继承自Sync,所以执行它的lock方法,在该方法中,使用到了CAS,首先执行compareAndSetState()方法,因为NonfairSync和Sync类都没有重写该方法,所以它执行的是AbstractQueuedSynchronizer类的:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
- 1
- 2
- 3
- 4
该方法的含义是预测AbstractQueuedSynchronizer类中的属性state值为0,若确实为0,则将其更新为1,对于第一个线程来说,这肯定是成立的,所以修改成功,返回true值,并继续执行if语句块中的方法:
setExclusiveOwnerThread(Thread.currentThread());
- 1
它仍然执行的是AbstractQueuedSynchronizer中的方法:
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
- 1
- 2
- 3
它设置的是独占模式同步的当前拥有者,即:哪个线程将state置为了1,说明该线程占有了它,就将该线程设置为资源的拥有者,到这里lock()方法就结束了。
此时如果有第二个线程想要来抢占资源,它也来执行lock()方法,同样地走到这个方法:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 1
- 2
- 3
- 4
- 5
- 6
此时该线程期望state的值为0,但state已经被第一个线程修改为1了,第二个线程的更新操作肯定是失败的,并返回false,所以执行acquire()方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 1
- 2
- 3
- 4
- 5
这里就涉及到AQS的核心内容了,因为当前state的值为1,所以当前线程认为资源被其它线程独占了,此时该线程就需要等待,在AQS中维护了一个队列,它是用双向链表实现的,当某个线程需要等待资源时,就将其作为一个节点存入队列。
此时第一个线程执行完毕,调用unlock()方法准备释放锁:
public void unlock() {
sync.release(1);
}
- 1
- 2
- 3
它调用的是AQS的release()方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
AQS判断队列中是否有节点,若有则从队列中得到一个节点并唤醒它。
以上便是ReentrantLock加锁解锁的整个流程,由源代码不难发现,ReentrantLock的底层全是由AQS实现。
最后以一张图作为总结:
文章来源: blizzawang.blog.csdn.net,作者:·wangweijun,版权归原作者所有,如需转载,请联系作者。
原文链接:blizzawang.blog.csdn.net/article/details/122100016
- 点赞
- 收藏
- 关注作者
评论(0)