Redis进阶-细说分布式锁
Pre
Redis Version : 5.0.3
Redis进阶-核心数据结构进阶实战 中我们讲 strings 数据结构的时候,举了一个例子
事实上,要实现一把相对完善的分布式锁,需要注意的细节还是蛮多的,这里我们好好的梳理一把。
引
我们先来看段代码
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
}
- 1
- 2
- 3
- 4
- 5
redis中提前存储了一个key stock , value为 100
上述代码有问题吗?
是不是我们熟悉的超卖问题?
为啥会超卖? 假设同时有两个线程都执行到了 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
, 比如都取到了stock为 100 , 然后继续执行后面的业务逻辑,到最后将扣减后的值set到redis中,应该剩98吧, 事实上呢? 你库存里的值是 99个… 卖到最后,是不是卖多了? 。。。。
那怎么办呢? 没有分布式经验的童鞋,可能会说 加把锁啊 云云
加锁后 变成了啥呢?
synchronized(this){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
那 这样的代码还有问题吗?
- 性能问题
- 更为重要的是,如果你的应用是集群模式,好比 你有N个tomcat, 用户通过NG地址访问,你想想你的这个JVM级别的锁 ,还有啥用,一样会超卖…
这个时候你需要一把分布式锁,这里我们讨论的是如何使用redis实现分布式锁
分布式锁演进 V1
来, 上代码
String key = "STOCK_LOCK";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key,"ARTISAN_LOCK");
if (!result){ // 如果未获取到锁,直接返回
return "1001";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock + "");
}
stringRedisTemplate.delete(key);
return "扣减成功";
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
我们来分析下, stringRedisTemplate.opsForValue().setIfAbsent(key,"ARTISAN_LOCK");
这行代码就保证了只有一个线程能set成功 (redis 的工作线程是单线程的嘛 ), setIfAbsent 不存在才设置,如果有一个线程设置成功了,在这个线程未释放之前,其他线程是无法set成功的,所以其他线程返回false,直接return了。
分布式锁演进 V2
那这个代码严谨吗? ---------> 有的同学说,你这个中间要是出异常了,没有执行 stringRedisTemplate.delete(key);
,那岂不是这把锁释放不了了,死锁了呀? 要不try catch finally ?
那代码变成如下
那,这样就完美了吗? 抛出异常的场景我们是处理了,在finally里释放。
分布式锁演进 V3
那假设在运行的过程中,还没有执行到finally , 这个时候tomcat挂了,但是锁已经set到redis里了 咋办? --------》 有的同学说, 简单啊 加个超时时间呗。
那还有问题吗? ----------》如果宕机时间发生在
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key,"ARTISAN_LOCK");
stringRedisTemplate.expire(key,5000,TimeUnit.SECONDS);
- 1
- 2
- 3
这两行代码之间,有怎么办? … 不会这么巧吧 …但理论上是存在的
继续聊
分布式锁演进 V4
本质上: 要把set key和 设置过期时间 搞成一个原子命令 .
低版本的Redis,你可能需要lua脚本,但是现在Redis提供了setnx 命令, spring也帮我们封装好了
最关键的一行代码
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key,"ARTISAN_LOCK",10,TimeUnit.SECONDS);
- 1
- 2
- 3
代码就变成了
对于一般的应用,并发不是很高,这个也足够用了,因为简单啊
但是如果在高并发下,那还有问题吗? 这样就满足所有场景了吗 ?
我们在设置key的时候,给key设置的过期时间是 10秒 ,也就说 10秒后,这个key会被redis给删除掉, 假设你的这个业务执行了15秒才执行完。当前业务还未执行结束,第二个线程的请求已经过来了,它也能加锁成功。 第二个线程继续执行,执行了5秒,你的第一个线程也执行完了,最后一步 删除key , 那第一个线程就把第二个线程加的锁给删掉了啊。。。。。
删了别的线程加的锁,并发一高,你这个锁就没啥用了哇。。。所以 还有另外一个原则: 加锁和解锁必须是同一个线程 .
分布式锁演进 V5
加锁和解锁必须是同一个线程 . 实现的话也简单,value 不写死,写成一个线程ID或者随机数等等 都行,删除key的时候,比较下,相等的话才删除
根据V4存在的问题,我们来看下代码
那有的童鞋会问,如果 在finally 中 执行到if 挂了。。。并没有执行delete咋办? 理论上是有可能发生的, 其实也不要紧,我们set key的时候,设置了一个超时时间, 那最多锁10秒嘛 ,不会死锁。 也能接受。
如果你非得要想改这个地方,把查询和delete弄成一个原子命令,lua脚本就排上用场了。
这里我们不展开了。
到这里,一把相对完善的锁,就OK了。
关于到底设置多长的过期时间合适, 这个不好讲了, 1秒中是长是短 ,1分钟呢? 要权衡一下。 那有没有更好的办法呢?
终极版-分布式锁演进(Redisson ) V6
针对v5中存在的问题, 虽然解决了 加锁和解锁都是同一个线程, 但是还是有点小bug , 比如 你给key设置了过期时间为10秒, 但你的方法执行了15秒,方法还没执行完,锁已经被redis干掉了。。。另外一个线程就可以拿到锁,继续干活了。 多个线程同时执行,还是有潜在的bug出现。
超时的问题,你设置多长时间都不合适…
真的要彻底解决,咋弄呢? -------》 可不可以给锁续命? 没执行完就给锁延期呗。 说起来简单,实现起来有点复杂了。。。
简单来说,后台弄个定时任务,检测这个锁是否存在,存在的话延长时间,不存在的话就是被删掉了,不考虑即可。
好在Redisson提供了这个牛逼的功能。
Code
@Bean
public Redisson redisson() {
// 此为单机模式
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.18.130:6379").
setConnectionMinimumIdleSize(10).setDatabase(0);
/*config.useClusterServers()
.addNodeAddress("redis://192.168.0.61:8001")
.addNodeAddress("redis://192.168.0.62:8002")
.addNodeAddress("redis://192.168.0.63:8003")
.addNodeAddress("redis://192.168.0.61:8004")
.addNodeAddress("redis://192.168.0.62:8005")
.addNodeAddress("redis://192.168.0.63:8006");*/
return (Redisson) Redisson.create(config);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
@RequestMapping("/deduct_stock")
public String deductStock() throws InterruptedException {
String lockKey = "STOCK_LOCK";
// 获取锁
RLock redissonLock = redisson.getLock(lockKey);
try {
// 加锁,实现锁续命功能
redissonLock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock + "");
}
}finally {
// 释放锁
redissonLock.unlock();
}
return "扣减成功";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
总结一下 三部曲
- 第一步:获取锁 RLock redissonLock = redisson.getLock(lockKey);
- 第二步: 加锁,实现锁续命功能 redissonLock.lock();
- 第三步:释放锁 redissonLock.unlock();
Redisson分布式锁实现原理
源码分析
Redis进阶- Redisson分布式锁实现原理及源码解析
文章来源: artisan.blog.csdn.net,作者:小小工匠,版权归原作者所有,如需转载,请联系作者。
原文链接:artisan.blog.csdn.net/article/details/105353875
- 点赞
- 收藏
- 关注作者
评论(0)