Redis(三十四)-Redisson分布式锁看门狗
您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦。
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
❤️ 2. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 3. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 4. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门
😁 5. 社区逛一逛,周周有福利,周周有惊喜。码农飞哥社区,飞跃计划
全网同名【码农飞哥】欢迎关注,个人VX: wei158556
1. 简介
上一篇文章我们介绍了用如何用Redis做分布式锁Redis(三十二)-用Redis做分布式锁
在文章的末尾留下了个问题:
- 业务还没执行完,Redis分布式锁就过期了该怎么办?,由于我们给锁指定了过期时间,极有可能会出现业务还还没执行完,分布式锁就过期的情况。针对这种情况,我们该如何处理呢?
可能我们最先想到的方案就是:给分布式锁设置更长的有效时间,但是这只是治标不治本的一种方式。你无法保证业务流程在你设置的过期时间内就一定能执行完成。
针对这种情况,最好的方式就是在分布式锁快要过期,但是,自动延长分布式锁的过期时间。如果要我们来实现这个过程可能有点复杂。
还在已经有大佬帮我们实现了。我们只需要直接拿来用就好了。
现在就隆重请出 Redisson:它独有的看门狗(Watchdog)功能就可以帮我们轻易的实现。
2. Redisson怎么用
2.1. 引入依赖
<!--jedis客户端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.0</version>
</dependency>
<!--jedis客户端-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.0</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
2.2. 写个简单的demo测试
public class LockDemo {
private final RedissonClient redissonClient;
public LockDemo(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
LockDemo lockDemo = new LockDemo(Redisson.create(config));
lockDemo.reentrantLock();
new Thread(lockDemo::reentrantLock_expire, "线程一").start();
TimeUnit.SECONDS.sleep(30);
new Thread(lockDemo::reentrantLock_expire, "线程二").start();
TimeUnit.SECONDS.sleep(30);
}
/**
* 加锁,默认的时长是30秒,不指定超时时间
*/
public void reentrantLock() {
RLock lock = redissonClient.getLock("reentrant-lock-no-expire");
//没有获取到锁,返回
lock.lock();
long startTime = System.currentTimeMillis();
try {
// 模拟业务操作耗时
System.out.println("模拟业务开始");
TimeUnit.SECONDS.sleep(60);
System.out.println("模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
lock.unlock();
}
}
/**
* 加锁,指定锁的时长是25秒
*/
public void reentrantLock_expire() {
RLock lock = redissonClient.getLock("reentrant-lock-expire");
long startTime = System.currentTimeMillis();
lock.lock(25, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "获取到锁");
try {
// 模拟业务操作耗时
System.out.println(Thread.currentThread().getName() + "模拟业务开始");
TimeUnit.SECONDS.sleep(60);
System.out.println(Thread.currentThread().getName() + "模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
lock.unlock();
}
}
public void release() {
this.redissonClient.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
运行结果:
从上述运行结果可以看出,如下几个结论。
- Redisson的
lock()
方法不指定过期时间的话,默认的过期时间是30秒,当过期时间超过1/3时,看门狗会自动续期(比如过期时间是30秒,则在10s的时候,看门狗就会自动续期),续期后的锁的时长重新变成30s - Redisson的
lock(long leaseTime, TimeUnit unit)
方法指定过期时间时,当到达过期时间时锁会自动释放,也就是说在这种情况下,看门狗失效。 - Redisson的锁与线程相关,每个线程只能释放自己的锁,不能释放别的线程的锁。
3.源码分析
3.1. 获取锁
//不指定过期时间
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
//指定过期时间
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
这两个访问最终调用的都是lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法,唯一的区别是lock()
方法传入的是参数leaseTime值是-1,而lock(long leaseTime, TimeUnit unit)
传入的参数值是leaseTime。
让我们接着看看lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//获取锁,获取锁时带入线程ID
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired (获取到锁直接返回)
if (ttl == null) {
return;
}
//订阅锁,这样锁释放时会被通知到
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
//没获取到锁,死循环等待
.....
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
这个方法主要tryAcquire(-1, leaseTime, unit, threadId)
方法来获取锁,获取时传入了当前线程ID。而tryAcquire
方法内部主逻辑优势调用tryAcquireAsync方法。下面就直接查看tryAcquireAsync方法。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
//指定过期时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//不指定过期时间时,过期时间设为看门狗超时时间internalLockLeaseTime,然后由看门狗一直续期,直到锁释放
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired(获取到锁)
if (ttlRemaining == null) {
if (leaseTime != -1) {
//指定过期时间的话,则不续期
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 未指定过期时间,需要开启Watchdog自动续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
从这里面我们可以看出两个重要的信息,第一个信息是指定过期时间的话不续期,未指定过期时间的话,则会开启Watchdog自动续期。internalLockLeaseTime的默认时间是30秒。private long lockWatchdogTimeout = 30 * 1000;
首先看下尝试获取锁的实现,tryLockInnerAsync方法通过EVAL执行LUA脚本,代码如下:
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
它的主要逻辑是:
- 若锁不存在,则设置锁,并设置过期时间,然后返回nil。
- 若锁存在且由本线程持有,则锁计数加一,并重设过期时间,然后返回nil;
- 否则返回锁的过期时间;
然后,看下看门狗是如何给锁续期的呢?直接查看scheduleExpirationRenewal方法。
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
//重入加锁
oldEntry.addThreadId(threadId);
} else {
//第一次加锁,触发定时任务。
entry.addThreadId(threadId);
renewExpiration();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
接着看看renewExpiration这个方法,
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 借助Netty的Timeout实现自动续期
// 超时时间为1/3过期时间,确保在过期前能够重设过期时间
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
//过期时间超过1/3的话则会需求
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
续期的方法是renewExpirationAsync方法。这个方法也是一个LUA脚本,这个脚本的主要逻辑是,如果锁存在的话,则将过期时间重新设置为30s。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
参考
Redisson的“看门狗”机制,一个关于分布式锁的非比寻常的BUG
Redis学习之Redisson分布式锁看门狗
Redisson的看门狗watchDog机制是怎么实现的?
文章来源: feige.blog.csdn.net,作者:码农飞哥,版权归原作者所有,如需转载,请联系作者。
原文链接:feige.blog.csdn.net/article/details/125284423
- 点赞
- 收藏
- 关注作者
评论(0)