Java中的锁 读写锁-ReentrantReadWriteLock

举报
李子捌 发表于 2021/10/19 14:29:54 2021/10/19
【摘要】 为何要有读写锁ReentrantLock锁和其他锁基本上都是排他锁,排他锁指的是在同一时刻只允许一个线程持有锁,访问共享资源。虽然ReentrantLock支持同一个线程重入,但是允许重入的是同一个线程。因此ReentrantReadWriteLock是为了支持多个线程在同一个时刻对于锁的访问孕育而生的。读写锁—简单介绍读写锁ReentrantReadWriteLock允许多个线程在同一时刻...


为何要有读写锁

ReentrantLock锁和其他锁基本上都是排他锁,排他锁指的是在同一时刻只允许一个线程持有锁,访问共享资源。虽然ReentrantLock支持同一个线程重入,但是允许重入的是同一个线程。因此ReentrantReadWriteLock是为了支持多个线程在同一个时刻对于锁的访问孕育而生的。


读写锁—简单介绍

读写锁ReentrantReadWriteLock允许多个线程在同一时刻对于锁的访问。但是,写线程获取写锁时,所有的读线程和其他写线程均会被阻塞。读写锁是通过维护一对锁(一个读锁、一个写锁)来实现的,读写锁在很多读多于写得场景中有很大的性能提升。举个例子,当我们对数据库中的一条记录进行读取时,不应该阻塞其他读取这条数据的线程,而如果是有线程对该记录进行写操作,数据库就应该阻止其他线程对这条数据的读取和写入,这种场景就可以用类似读写锁的方式来处理。


特性


使用示例

通过ReentrantReadWriteLock来实现一个线程安全的简单内存缓存设计,通过给缓存获取get(String key)方法加上读锁,允许其他线程在同一个时刻进行数据读取;给put(String key,String value)方法加上写锁,禁止在对缓存写入的时候其他线程对于缓存的读取和写入操作。

package com.lizba.p6;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * <p>
 *      使用ReentrantReadWriteLock实现一个简单的线程安全基于map的缓存设计
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/22 22:11
 */
public class CacheDemo {

    /** 存储数据容器 */
    private static Map<String, Object> cache = new HashMap<>();
    /** 读写锁ReentrantReadWriteLock */
    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** 读锁 */
    static Lock readLock = lock.readLock();
    /** 写锁 */
    static Lock writeLock = lock.writeLock();

    /**
     * 获取数据,使用读锁加锁
     * @param key
     * @return
     */
    public static Object get(String key) {
        readLock.lock();
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * 设置key&value,使用写锁,禁止其他线程的读取和写入
     * @param key
     * @param value
     * @return
     */
    public static void put(String key, Object value) {
        writeLock.lock();
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 清空缓存
     */
    public static void flush() {
        writeLock.lock();
        try {
            cache.clear();
        } finally {
            writeLock.unlock();
        }
    }

}


实现分析

ReentrantReadWriteLock是基于AQS来实现的,我们知道AQS是以单个int类型的原子变量来表示其状态的,那么一个状态如何即表示读锁又表示写锁呢?观察源码发现(后续会有源码解析)ReentrantReadWriteLock通过使用一个32位的整型变量的高16位表示读,低16位表示写来维护读线程和写线程对于锁的获取情况。此时部分读者可能又会产生一个疑问,写锁是互斥的,只支持有一个线程持有写锁,用低16位记录同一个线程对写锁的多次重入是没问题的;那么对于允许多个线程获取的读锁,高16位又是如何维护持有锁的线程数和单个线程对读锁的获取次数的呢?其实这里引入了ThreadLocal来记录单个线程自己对于读锁的获取次数,高16位存储的是获取读锁的线程的个数。这里只是简单说下疑问,后续源码会有详细分析。我们先来看看这个int类型的变量的划分方式。

如上个这个图写状态=1,表示当前线程获取了写锁,读状态=2,表示该线程同时获取了两次读锁。使用高低位来表示读写状态,那么状态的获取和设值是如何实现的呢?


写状态的get()

此时假设同步状态,也就是32的整型变量的值为S,写状态的获取方式为S&0x0000FFFF,这种办法会保留低16位的同时抹去高16位,也就能计算出写状态的获取情况。


写状态的set()

设值方式相对简单,直接+1即可,即S = S + 1


读状态的get()

读状态的获取需要读取高16位,此时采取的方法是S>>>16(无符号右移16位,高位补0),这样就能等到读状态。


读状态的set()

读状态增加1,计算方式为S+(1<<16),相当于S+0x00010000


源码分析

源码分析主要分为四块,分别是ReentrantReadWriteLock中一个int分为两半计算方式、写锁的获取与释放、读锁的获取与释放以及锁降级这几个方面来展开。


1、一个int分为两半计算方式

int拆分为高16位和低16位的计算是在ReentrantReadWriteLock的内部类在Sync中实现的,其主要核心如下

/**
 * Sync是ReentrantReadWriteLock的内部类,它继承了AQS,其实现有FairSync和NonfairSync
 */
abstract static class Sync extends AbstractQueuedSynchronizer {

    /** 定义位移数变量16 */
    static final int SHARED_SHIFT   = 16;
    /** 读锁是高16位,读锁加1需要加2^16,也就是 1 << 16 */
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    /** 读锁最大线程获取数和写锁最大可重入次数 65535*/
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    /** 写锁是低16位,(1 << 16) - 1为写锁的掩码,用于计算写锁的重入次数 */
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** 计算读锁持有的线程数 c >>> 16  */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 计算写锁持有的线程重入次数 c & 0x0000FFFF */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
2、写锁的获取与释放

WriteLock写锁是一个支持重入的排它锁,它的获取与释放通过tryAcquire(int arg)和tryRelease(int arg)来实现,获取到写锁的条件是,当前读锁未被获取或者当前线程是持有写锁的线程,否则获取失败进入等待状态。

写锁获取

/**
 *	tryAcquire方法实现在Sync中
 */
protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取当前状态值
    int c = getState();
    // 计算当前写锁的重入次数
    int w = exclusiveCount(c);
    // 如果状态不为0,表示锁已经被某个线程获取
    if (c != 0) {
        // 如果写锁未被获取(因为c!=0那就是存在读锁),或者持有锁的线程和当前线程不是同一个线程,那么获取锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 否则为重入
        // 判断是否超过写锁重入的最大值,超过则直接抛出异常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 设置重入次数,递增,返回重入锁成功
        setState(c + acquires);
        return true;
    }
    // 如果状态为0
    // writerShouldBlock()的实现由FairSync和NonfairSync实现
    // FairSync中需判断当前节点是否有前驱节点是否需要阻塞
    // NonfairSync中写锁则不需要阻塞,默认优先
    // 如果不需要阻塞,则CAS更新状态的值
    // 需要阻塞或者更新失败均返回false
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 更新成功则设置当前独占锁的持有线程为当前线程
    setExclusiveOwnerThread(current);
    return true;
}


写锁释放

/**
 *	tryRelease方法实现在Sync中
 */
protected final boolean tryRelease(int releases) {
    // 判断当前线程是否是独占锁的持有线程,不是则抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 计算需要更新的状态值
    int nextc = getState() - releases;
    // 判断释放后写锁是否完全被释放也就是重入的每一次都已经退出了
    boolean free = exclusiveCount(nextc) == 0;
    // 如果写锁完全释放,则需要清空写锁的持有线程引用置为null
    if (free)
        setExclusiveOwnerThread(null);
    // 更新状态值
    setState(nextc);
    return free;
}


3、读锁的获取与释放

读锁的获取与释放相比写锁的获取与释放相对来说要复杂一些,因为它是支持重入的共享锁,它能被多个线程同时获取,因此我们不仅需要记录获取读锁的每一个线程,同时需要记录每个线程对于读锁的重入次数,因此我们首先来看读锁的重入计数。


读锁的重入计数

读锁的重入计数,巧妙的使用每个线程自己来记录,通过存在在ThreadLocal中的HoldCounter中的变量count来增加和减少,其是线程隔离的因此也是线程安全的,具体实现在Sync中。

/**
 * Sync是ReentrantReadWriteLock的内部类,它继承了AQS,其实现有FairSync和NonfairSync
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    
    /**
     * 用于每个线程读锁重入次数记录,线程安全
     */
    static final class HoldCounter {
        int count = 0;
        // 持有线程ID
        final long tid = getThreadId(Thread.currentThread());
    }

	/**
     *  继承自ThreadLocal,重写initialValue(),重写为了更方便使用和初始化
     */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

	// 构造函数中初始化
    private transient ThreadLocalHoldCounter readHolds;
	//	缓存最近获取的HoldCounter,也是为了性能开销
    private transient HoldCounter cachedHoldCounter;
   
    /**
     *	firstReader和firstReaderHoldCount是为了在无竞争读锁的情况下计数和获取更加便宜
     */
    // 记录第一个读锁获取的线程
    private transient Thread firstReader = null;
    // 记录第一个读线程重入次数
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        // 这里也比较巧妙,利用volatile的内存语义,来保证ThreadLocalHoldCounter()实例化时的内存可见性
        setState(getState()); 
    }
}

读锁获取

 /*
  * tryAcquireShared方法实现在Sync中,unused如其定义未被使用
  */
protected final int tryAcquireShared(int unused) {
	// 获取当前线程
    Thread current = Thread.currentThread();
    // 获取同步状态值
    int c = getState();
    // exclusiveCount(c)获取写锁的值,如果不为0表示写锁已被持有
    // getExclusiveOwnerThread(),如果是写锁则需要判断当前线程和持有写锁(互斥锁)的线程是否相等
    // 因为写锁被允许获取读锁,如果不是同一个线程则直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 获取读锁持有的线程数
    int r = sharedCount(c);
    // readerShouldBlock() 有两个实现分别是FairSync和NonfairSync,两种的获取锁策略实现不一样,用于判断是否获取读锁
    // r < MAX_COUNT 判断读锁获取线程数是否小于最大值
    // compareAndSetState(c, c + SHARED_UNIT) 尝试获取读锁
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果获取所成功
        // 当前线程为第一个获取锁的线程,并且是第一次
        if (r == 0) {
            // 设置当前线程为firstReader
            firstReader = current;
            // 设置当前线程重入读锁的次数为1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {  // 如果不是第一次,但是是第一个获取读锁的线程
            // 当前线程重入读锁的次数为累加1
            firstReaderHoldCount++;
        } else {
            // 获取缓存cachedHoldCounter
            HoldCounter rh = cachedHoldCounter;
            // 如果缓存为空,或者缓存的线程ID与当前线程ID不一致
            if (rh == null || rh.tid != getThreadId(current))
                // 从ThreadLocalHoldCounter中读取当前线程的读锁重入次数,设置缓存
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0) // 缓存存在,但是重入次数为空
                // 设置当前线程readHolds中的HoldCounter
                readHolds.set(rh);
            // 重入次数+1
            rh.count++;
        }
        return 1;
    }
    // 获取读锁失败,自旋重试
    return fullTryAcquireShared(current);
}

读锁释放

/**
 * tryReleaseShared方法实现在Sync中
 */
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 当前线程为第一个持有读锁的线程,则清除firstReader或者计数自减1
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 缓存是否命中,没命中则从readHolds获取HoldCounter
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        // 获取当前线程的重入次数
        int count = rh.count;
        // <=1移除计数(读锁完全释放了),<= 0抛出异常
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 计数递减
        --rh.count;
    }
    // 循环CAS更新state的状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
4、锁降级

在ReentrantReadWriteLock读写锁中,写锁可以降级为读锁这称之为锁降级,但是读锁是不能升级为写锁的。锁的降级不能在释放写锁后再去获取读锁,而应该在获取到写锁(拥有写锁)的时候去获取读锁,随后再释放写锁,最后在释放读锁,这才是正确的方式。


一个锁降级的伪代码:

volatile boolean flag = false;
// User对象模拟操作的共享资源
User data = null;
// 获取写锁
writeLock.lock();
try {
	if(!flag) {
    	// 共享资源处理
        data = new User();
        flag = true;
    }
    readLock.lock(); // 获取读锁,此时写锁并未释放
} finally {
	writeLock.unlock(); // 此时可以安全释放写锁
}

// 写锁释放之前获取了读锁,这个被视为锁降级
try {
    // 共享资源操作
	String name = user.getName();
} finally {
	readLock.unlock(); // 读锁释放
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。