Redisson分布式锁实现原理解读
Redisson源码解读
加锁机制、可重入加锁
加锁其实通过一段lua脚本实现的,如下:
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
这里的keys[1]代表的是加锁的key,比如我们传入一个“mylock”
argv[1] 代表的是锁key的默认生存时间,默认为30s
argv[2] 代表的是加锁的客户端的id 285475da-9152-4c83-822a-67ee2f116a79:52
上面这段lua脚本的作用是:
-
通过
exist myLock
判断当前加锁的锁key是否存在,不存在就进行加锁 -
加锁:通过
hincrby
命令设置一个hash结构
类似于下面这种结构hincrby mylock 285475da-9152-4c83-822a-67ee2f116a79:52 1
-
设置锁的生存时间:
pexpire mylock 30000
-
自此加锁完成。
可重入锁的实现原理
加锁 和 可重入加锁都是通过 lua脚本实现,比如,客户端执行上面第一段脚本时,判断 key 是否存在,由于第一次已经加锁过了,
所以第一段就不走了,走第二段,判断当前 锁的hash结构里是不是 存在 一个 当前线程id,可以得出是存在的。
则会通过 hincrby mylock 线程id 1 对已经存在的值 加1
则这个redis 存储 的hash 线程对应的值 就是 锁的可重入次数。
hash key=锁名称 field 客户端id value 客户端加锁的次数
锁互斥机制
如果这时候另一个客户端来尝试加锁,首先第一个if判断 是否存在这个mylock key,紧接着第二个if判断,mylock锁key的
hash数据结构中,是否包含客户端2的ID,这里明显不是,所以客户端2执行
return redis.call('pttl', KEYS[1]);",
通过这个获取mylock这个锁的剩余时间。
我们可以看一下 Redisson tryLock的主流程:
尝试使用定义的leaseTime获取锁。 如有必要,最多等待定义的waitTime ,直到锁定可用。 锁定将在定义的leaseTime时间间隔后自动释放
true如果锁成功获取,否则false如果锁已设定。
/**
* waitTime: 获取锁的最大时间
* leaseTime: 加锁时间
* unit: 租期时间单位
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1. 尝试获取锁,根据当前线程id和约定的时间去获取 锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 剩余等待时间
time -= System.currentTimeMillis() - current;
// 如果剩余等待时间小于 0,则说明申请锁失败。
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅锁释放事件,通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 当await 返回 false 说明等待时间已经超出获取锁最大时间,取消订阅,返回失败
// await这块代码 没有找到合适的源码,看不太懂,是怎么阻塞的
// await的方法作用是 等待一段时间,获取操作结果,这里等待时间就是 剩余等待时间。
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
// 订阅成功后,开始通过while循环尝试尝试获取锁
try {
// 重新计算剩余等待时间
time -= System.currentTimeMillis() - current;
// 小于0 说明则说明超出,获取锁失败,直接返回false
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
// 收到锁释放的信号,在最大等待时间不断循环,尝试获取锁
// 一旦获取锁成功,马上返回true。
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
// 小于0 说明则说明超出最大等待时间,获取锁失败,直接返回false
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 如果锁的生存时间 大于0 小于 最新的最大等待时间
// 在允许的时间内尝试重复获取锁
if (ttl >= 0 && ttl < time) {
// 在ttl时间段内尝试获取锁
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 如果锁的存活时间大于 最大等待时间,则在最大等待时间获取锁,
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间,如果小于0 则说明获取锁失败,否则继续循环
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 无论如何最后都要取消订阅消息。
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
流程说明:
- 尝试获取锁,返回null,则说明获取锁成功。
- 如果获取失败,则通过订阅机制,订阅锁释放的事件。在最大等待时间内,只要订阅成功。就进入一个不断重试获取锁的循环,如果订阅失败,则返回一个false,获取锁失败。
- 循环中 先尝试获取锁,如果获取到,则直接返回,否则在最大可以操作的时间范文(tll 或者 最新的waitTIme)通过Semphore来阻塞线程,当锁释放后,信号量的release()方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
当锁正在被占用时,等待获取锁的进程并不是通过一个
while(true)
死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题。
锁续期机制
客户端1的加锁的锁key 默认生存时间为 30s,如果时间到了,客户端还想要持有这一把锁,怎么处理?锁的续期
redisson 通过 tryAcquire
方法尝试获取锁,tryAcquire
调用tryAcquireAsync
方法获取锁。
看门狗机制必须使用默认的加锁时间为30S
// 看门狗的配置
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
// leaseTime == -1 时表示开启看门狗机制
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 如果 EXPIRATION_RENEWAL_MAP 中不含有当前线程的id,将新建的entry放入map, 返回null。(首次获取锁)
// 如果已经有了,则取出这个key对应的数据。 返回 值(看门狗续时)
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 二次获取就不需要再开定时任务了
oldEntry.addThreadId(threadId);
} else {
// 首次获取锁,开启一个定时任务,进行续命
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 定时任务进行续命
// 每 internalLockReleaseTime / 3 执行一次, 也就是 30 / 3 = 10s
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 调用lua脚本进行 续命操作
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
// 这里通过lua脚本又重新将这个mylock 执行力 pexire internalLockLeaseTime 秒
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
WatchDog机制说明:
watchdog其实就是在你获取到redisson锁之后,在后台开启了一个定时任务,会将获取到锁的线程的id相关数据放入到
EXPIRATION_RENEWAL_MAP,然后定时任务每隔 10s 去查一下,看看当前这个map中是否还有 对应的 value值数据。
【lua脚本实现】
如果有,则取出第一个 threadId,根据这个threadId去找对应锁的名称,(getLockName)。如果锁存在就延长锁的时间。
服务器如果宕机了,则watch dog 机制就没有了。
释放锁
redisson释放锁使用了 lock.unlock
方法
// 异步释放锁的代码
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 释放锁的核心代码
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 取消watch dog机制
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断锁 key 是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除
// 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
// getName、getChannelName、解锁消息、
}
释放锁流程分析:
- 判断key是否存在
- 将当前客户端对应的 锁的hash结构的 value 递减为0,
- 先删除锁,发送解锁消息,向通道 redisson_lock_channel 广播形式发送。
- 取消watchdog机制, 删除
EXPIRATION_RENEWAL_MAP
的线程id【cancelExpirationRenewal】,并且取消定时任务线程
- 点赞
- 收藏
- 关注作者
评论(0)