Java中的锁 读写锁-ReentrantReadWriteLock
为何要有读写锁
ReentrantLock锁和其他锁基本上都是排他锁,排他锁指的是在同一时刻只允许一个线程持有锁,访问共享资源。虽然ReentrantLock支持同一个线程重入,但是允许重入的是同一个线程。因此ReentrantReadWriteLock是为了支持多个线程在同一个时刻对于锁的访问孕育而生的。
读写锁—简单介绍
读写锁ReentrantReadWriteLock允许多个线程在同一时刻对于锁的访问。但是,写线程获取写锁时,所有的读线程和其他写线程均会被阻塞。读写锁是通过维护一对锁(一个读锁、一个写锁)来实现的,读写锁在很多读多于写得场景中有很大的性能提升。举个例子,当我们对数据库中的一条记录进行读取时,不应该阻塞其他读取这条数据的线程,而如果是有线程对该记录进行写操作,数据库就应该阻止其他线程对这条数据的读取和写入,这种场景就可以用类似读写锁的方式来处理。
特性
使用示例
通过ReentrantReadWriteLock来实现一个线程安全的简单内存缓存设计,通过给缓存获取get(String key)方法加上读锁,允许其他线程在同一个时刻进行数据读取;给put(String key,String value)方法加上写锁,禁止在对缓存写入的时候其他线程对于缓存的读取和写入操作。
package com.lizba.p6;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* <p>
* 使用ReentrantReadWriteLock实现一个简单的线程安全基于map的缓存设计
* </p>
*
* @Author: Liziba
* @Date: 2021/6/22 22:11
*/
public class CacheDemo {
/** 存储数据容器 */
private static Map<String, Object> cache = new HashMap<>();
/** 读写锁ReentrantReadWriteLock */
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** 读锁 */
static Lock readLock = lock.readLock();
/** 写锁 */
static Lock writeLock = lock.writeLock();
/**
* 获取数据,使用读锁加锁
* @param key
* @return
*/
public static Object get(String key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
/**
* 设置key&value,使用写锁,禁止其他线程的读取和写入
* @param key
* @param value
* @return
*/
public static void put(String key, Object value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
/**
* 清空缓存
*/
public static void flush() {
writeLock.lock();
try {
cache.clear();
} finally {
writeLock.unlock();
}
}
}
实现分析
ReentrantReadWriteLock是基于AQS来实现的,我们知道AQS是以单个int类型的原子变量来表示其状态的,那么一个状态如何即表示读锁又表示写锁呢?观察源码发现(后续会有源码解析)ReentrantReadWriteLock通过使用一个32位的整型变量的高16位表示读,低16位表示写来维护读线程和写线程对于锁的获取情况。此时部分读者可能又会产生一个疑问,写锁是互斥的,只支持有一个线程持有写锁,用低16位记录同一个线程对写锁的多次重入是没问题的;那么对于允许多个线程获取的读锁,高16位又是如何维护持有锁的线程数和单个线程对读锁的获取次数的呢?其实这里引入了ThreadLocal来记录单个线程自己对于读锁的获取次数,高16位存储的是获取读锁的线程的个数。这里只是简单说下疑问,后续源码会有详细分析。我们先来看看这个int类型的变量的划分方式。
如上个这个图写状态=1,表示当前线程获取了写锁,读状态=2,表示该线程同时获取了两次读锁。使用高低位来表示读写状态,那么状态的获取和设值是如何实现的呢?
写状态的get()
此时假设同步状态,也就是32的整型变量的值为S,写状态的获取方式为S&0x0000FFFF,这种办法会保留低16位的同时抹去高16位,也就能计算出写状态的获取情况。
写状态的set()
设值方式相对简单,直接+1即可,即S = S + 1
读状态的get()
读状态的获取需要读取高16位,此时采取的方法是S>>>16(无符号右移16位,高位补0),这样就能等到读状态。
读状态的set()
读状态增加1,计算方式为S+(1<<16),相当于S+0x00010000
源码分析
源码分析主要分为四块,分别是ReentrantReadWriteLock中一个int分为两半计算方式、写锁的获取与释放、读锁的获取与释放以及锁降级这几个方面来展开。
1、一个int分为两半计算方式
int拆分为高16位和低16位的计算是在ReentrantReadWriteLock的内部类在Sync中实现的,其主要核心如下
/**
* Sync是ReentrantReadWriteLock的内部类,它继承了AQS,其实现有FairSync和NonfairSync
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
/** 定义位移数变量16 */
static final int SHARED_SHIFT = 16;
/** 读锁是高16位,读锁加1需要加2^16,也就是 1 << 16 */
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** 读锁最大线程获取数和写锁最大可重入次数 65535*/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/** 写锁是低16位,(1 << 16) - 1为写锁的掩码,用于计算写锁的重入次数 */
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 计算读锁持有的线程数 c >>> 16 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 计算写锁持有的线程重入次数 c & 0x0000FFFF */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
2、写锁的获取与释放
WriteLock写锁是一个支持重入的排它锁,它的获取与释放通过tryAcquire(int arg)和tryRelease(int arg)来实现,获取到写锁的条件是,当前读锁未被获取或者当前线程是持有写锁的线程,否则获取失败进入等待状态。
写锁获取
/**
* tryAcquire方法实现在Sync中
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前状态值
int c = getState();
// 计算当前写锁的重入次数
int w = exclusiveCount(c);
// 如果状态不为0,表示锁已经被某个线程获取
if (c != 0) {
// 如果写锁未被获取(因为c!=0那就是存在读锁),或者持有锁的线程和当前线程不是同一个线程,那么获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 否则为重入
// 判断是否超过写锁重入的最大值,超过则直接抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 设置重入次数,递增,返回重入锁成功
setState(c + acquires);
return true;
}
// 如果状态为0
// writerShouldBlock()的实现由FairSync和NonfairSync实现
// FairSync中需判断当前节点是否有前驱节点是否需要阻塞
// NonfairSync中写锁则不需要阻塞,默认优先
// 如果不需要阻塞,则CAS更新状态的值
// 需要阻塞或者更新失败均返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 更新成功则设置当前独占锁的持有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
写锁释放
/**
* tryRelease方法实现在Sync中
*/
protected final boolean tryRelease(int releases) {
// 判断当前线程是否是独占锁的持有线程,不是则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算需要更新的状态值
int nextc = getState() - releases;
// 判断释放后写锁是否完全被释放也就是重入的每一次都已经退出了
boolean free = exclusiveCount(nextc) == 0;
// 如果写锁完全释放,则需要清空写锁的持有线程引用置为null
if (free)
setExclusiveOwnerThread(null);
// 更新状态值
setState(nextc);
return free;
}
3、读锁的获取与释放
读锁的获取与释放相比写锁的获取与释放相对来说要复杂一些,因为它是支持重入的共享锁,它能被多个线程同时获取,因此我们不仅需要记录获取读锁的每一个线程,同时需要记录每个线程对于读锁的重入次数,因此我们首先来看读锁的重入计数。
读锁的重入计数
读锁的重入计数,巧妙的使用每个线程自己来记录,通过存在在ThreadLocal中的HoldCounter中的变量count来增加和减少,其是线程隔离的因此也是线程安全的,具体实现在Sync中。
/**
* Sync是ReentrantReadWriteLock的内部类,它继承了AQS,其实现有FairSync和NonfairSync
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 用于每个线程读锁重入次数记录,线程安全
*/
static final class HoldCounter {
int count = 0;
// 持有线程ID
final long tid = getThreadId(Thread.currentThread());
}
/**
* 继承自ThreadLocal,重写initialValue(),重写为了更方便使用和初始化
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 构造函数中初始化
private transient ThreadLocalHoldCounter readHolds;
// 缓存最近获取的HoldCounter,也是为了性能开销
private transient HoldCounter cachedHoldCounter;
/**
* firstReader和firstReaderHoldCount是为了在无竞争读锁的情况下计数和获取更加便宜
*/
// 记录第一个读锁获取的线程
private transient Thread firstReader = null;
// 记录第一个读线程重入次数
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
// 这里也比较巧妙,利用volatile的内存语义,来保证ThreadLocalHoldCounter()实例化时的内存可见性
setState(getState());
}
}
读锁获取
/*
* tryAcquireShared方法实现在Sync中,unused如其定义未被使用
*/
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态值
int c = getState();
// exclusiveCount(c)获取写锁的值,如果不为0表示写锁已被持有
// getExclusiveOwnerThread(),如果是写锁则需要判断当前线程和持有写锁(互斥锁)的线程是否相等
// 因为写锁被允许获取读锁,如果不是同一个线程则直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁持有的线程数
int r = sharedCount(c);
// readerShouldBlock() 有两个实现分别是FairSync和NonfairSync,两种的获取锁策略实现不一样,用于判断是否获取读锁
// r < MAX_COUNT 判断读锁获取线程数是否小于最大值
// compareAndSetState(c, c + SHARED_UNIT) 尝试获取读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果获取所成功
// 当前线程为第一个获取锁的线程,并且是第一次
if (r == 0) {
// 设置当前线程为firstReader
firstReader = current;
// 设置当前线程重入读锁的次数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 如果不是第一次,但是是第一个获取读锁的线程
// 当前线程重入读锁的次数为累加1
firstReaderHoldCount++;
} else {
// 获取缓存cachedHoldCounter
HoldCounter rh = cachedHoldCounter;
// 如果缓存为空,或者缓存的线程ID与当前线程ID不一致
if (rh == null || rh.tid != getThreadId(current))
// 从ThreadLocalHoldCounter中读取当前线程的读锁重入次数,设置缓存
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 缓存存在,但是重入次数为空
// 设置当前线程readHolds中的HoldCounter
readHolds.set(rh);
// 重入次数+1
rh.count++;
}
return 1;
}
// 获取读锁失败,自旋重试
return fullTryAcquireShared(current);
}
读锁释放
/**
* tryReleaseShared方法实现在Sync中
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 当前线程为第一个持有读锁的线程,则清除firstReader或者计数自减1
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 缓存是否命中,没命中则从readHolds获取HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 获取当前线程的重入次数
int count = rh.count;
// <=1移除计数(读锁完全释放了),<= 0抛出异常
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 计数递减
--rh.count;
}
// 循环CAS更新state的状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
4、锁降级
在ReentrantReadWriteLock读写锁中,写锁可以降级为读锁这称之为锁降级,但是读锁是不能升级为写锁的。锁的降级不能在释放写锁后再去获取读锁,而应该在获取到写锁(拥有写锁)的时候去获取读锁,随后再释放写锁,最后在释放读锁,这才是正确的方式。
一个锁降级的伪代码:
volatile boolean flag = false;
// User对象模拟操作的共享资源
User data = null;
// 获取写锁
writeLock.lock();
try {
if(!flag) {
// 共享资源处理
data = new User();
flag = true;
}
readLock.lock(); // 获取读锁,此时写锁并未释放
} finally {
writeLock.unlock(); // 此时可以安全释放写锁
}
// 写锁释放之前获取了读锁,这个被视为锁降级
try {
// 共享资源操作
String name = user.getName();
} finally {
readLock.unlock(); // 读锁释放
}
- 点赞
- 收藏
- 关注作者
评论(0)