redis 极简分布式锁实现
写在前面
-
工作中遇到,整理 reids
做简单分布式锁的思考 -
博文适合刚接触 redis
的小伙伴 -
理解不足小伙伴帮忙指正
对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
假设现在有这样一个需求,需要做排队预约
住宿的功能,当前宿舍住满了,有新的同学需要来入住,可以进行排队预约,排队编号通过累加的方式生成
我们设计这样一张数据表
CREATE TABLE `ams_student_queue_check_in_sync` (
`queue_check_in_id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '学生队列ID',
`student_name` VARCHAR(50) NOT NULL COMMENT '学生姓名' COLLATE 'utf8mb4_general_ci',
`student_uid` VARCHAR(50) NULL DEFAULT NULL COMMENT '学生uid' COLLATE 'utf8mb4_general_ci',
`student_card` VARCHAR(30) NULL DEFAULT NULL COMMENT '学生身份证号' COLLATE 'utf8mb4_general_ci',
`student_contact_number` VARCHAR(20) NOT NULL COMMENT '学生联系电话' COLLATE 'utf8mb4_general_ci',
`student_email` VARCHAR(50) NULL DEFAULT NULL COMMENT '学生电子邮件地址' COLLATE 'utf8mb4_general_ci',
`student_gender` TINYINT(4) NOT NULL DEFAULT '0' COMMENT '学生性别',
`student_emergency_contact_name` VARCHAR(100) NULL DEFAULT NULL COMMENT '第二联系人姓名' COLLATE 'utf8mb4_general_ci',
`student_emergency_contact_number` VARCHAR(20) NULL DEFAULT NULL COMMENT '第二联系人电话' COLLATE 'utf8mb4_general_ci',
`student_status` TINYINT(4) NULL DEFAULT '1' COMMENT '学生排队状态(1.待入住,2.以入住 3.以取消)',
`arrival_dates` DATETIME NULL DEFAULT NULL COMMENT '预计入住时间',
`departure_dates` DATETIME NULL DEFAULT NULL COMMENT '预计离开日期',
`queue_position` INT(11) NULL DEFAULT NULL COMMENT '学生在排队中的位置',
`check_in_remark` TEXT NULL DEFAULT NULL COMMENT '备注' COLLATE 'utf8mb4_general_ci',
`extended1` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段1' COLLATE 'utf8mb4_general_ci',
`extended2` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段2' COLLATE 'utf8mb4_general_ci',
`extended3` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段3' COLLATE 'utf8mb4_general_ci',
`created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`queue_check_in_id`) USING BTREE,
INDEX `student_uid` (`student_uid`) USING BTREE
)
COMMENT='入住排队表'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=1363
;
queue_position
为每一位同学的排队编号,需要根据当前的学生编号最大来累加
下面为实现的基础代码
@ApiOperation("入住排队接口")
@PostMapping("/checkInQueue")
@Transactional
public AjaxResult checkInQueue( @RequestHeader("UID") String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn){
if (Objects.isNull(uid)){
return AjaxResult.error("Uid 为空");
}
if (Objects.isNull(amsStudentQueueCheckIn.getStudentEmergencyContactNumber())){
return AjaxResult.error("电话号为空");
}
StringBuilder stringBuilder = new StringBuilder();
String studentContactNumber = amsStudentQueueCheckIn.getStudentContactNumber();
List<AmsStudentQueueCheckIn> amsStudentQueueCheckIns1 = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInList(new AmsStudentQueueCheckIn().setStudentContactNumber(studentContactNumber));
Integer count = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListCount(amsStudentQueueCheckIn.getStudentGender());
if (Objects.nonNull(amsStudentQueueCheckIns1) && amsStudentQueueCheckIns1.size() !=0 ){
stringBuilder.append("已经排队预约啦,请耐心等待 ^_^")
.append(", 预约编号为 " ).append(amsStudentQueueCheckIns1.get(0).getQueuePosition())
.append(", 前面还有 ").append(count - 1).append( " 人");
return AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition",amsStudentQueueCheckIns1.get(0).getQueuePosition(),"beforePeopleBumber",count -1 ));
}
AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender());
Long queuePosition = 0L;
if (Objects.nonNull(amsStudentQueueCheckIns)){
queuePosition = amsStudentQueueCheckIns.getQueuePosition()
}
amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(queuePosition + 1L).setStudentUid(uid);
amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(amsStudentQueueCheckIns.getQueuePosition() + 1L).setStudentUid(uid);
int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn);
if (i != 1){
return AjaxResult.error("排队预约失败!");
}
stringBuilder.append("排队预约成功")
.append(", 预约编号为 " ).append(amsStudentQueueCheckIn.getQueuePosition())
.append(", 前面还有 ").append(count).append( " 人");
return AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition",amsStudentQueueCheckIn.getQueuePosition(),"beforePeopleBumber",count));
}
逻辑比较简单,拿到数据,获取编号最大值累加,数据落表,但是上面的代码存在一个问题,因为是 Springboot
项目,使用 tomcat
部署,Spring Boot 嵌入的 Tomcat 默认启用 Http11NioProtocol
,可以切换日志级别为 Debug 可看到
Http11NioProtocol
表示多线程非阻塞模式的HTTP协议的通信
(web 服务端网络IO处理模型包括:单(多)线程阻塞(非阻塞)IO模型)。
# 日志级别 Debug
# 日志配置
logging:
level:
root: debug
11:42:51.810 [restartedMain] INFO o.a.c.h.Http11NioProtocol - [log,173] - Initializing ProtocolHandler ["http-nio-8080"]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [Connector[HTTP/1.1-8080]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [STARTING_PREP]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [STARTING]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING_PREP]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTED]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [STARTING_PREP]
11:42:51.812 [restartedMain] INFO o.a.c.c.StandardService - [log,173] - Starting service [Tomcat]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [STARTING]
11:42:51.813 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardEngine[Tomcat]] to [STARTING_PREP]
11:42:51.813 [restartedMain] INFO o.a.c.c.StandardEngine - [log,173] - Starting Servlet engine: [Apache Tomcat/9.0.75]
可以看到 spring-boot-starter-web
嵌入的 9.0.75
版本的 tomcat
,Tomcat 从 8.5
版本开始移除了 BIO
,默认启用 NIO
下图为从套接字连接接收、处理请求、响应客户端的整个过程
《Tomcat内核设计剖析》所以当多个排队请求并发调用接口时,不同的线程会分别进入方法,这个时候有可能会从数据库获取相同的排队编号进行累加,同时生成相同新编号,所以这里需要考虑方法线程安全
,
最简单的方式是使用同步方法
,保证只有一个线程获取锁,但是这不是最优的方式,这里不做考虑
public synchronized AjaxResult checkInQueue( @RequestHeader("UID") String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn){
....................
使用同步方法
的方式解决了上面的问题,但是如果当前项目是在 k8s
集群上面部署,以分布式的方式,就需要考虑多个 Pod 的数据同步问题。
假设两个排队请求被负载到两个不同的 Pod,这个时候同时查询数据会获取相同的最大编号,生成相同的编号,考虑使用分布式锁
。下面为对方法的改进,这里如果使用分布式锁
的方式,那么上面的同步方法
即可以去掉了,因为获取锁的方法是原子操作。
分布式锁实现很简单,就是进来一个线程先占位,当别的线城进来操作时,发现已经有人占位了,就会放弃或者稍后再试。这里的占位状态是全局的,相对整个集群而言,代码如下
String token = UUID.randomUUID().toString();
// 添加分布式锁
if (redisCache.tryAcquireLock("checkInQueue", token, 2, 10)){
AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender());
Long queuePosition = 0L;
if (Objects.nonNull(amsStudentQueueCheckIns)){
queuePosition = amsStudentQueueCheckIns.getQueuePosition();
}
amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(queuePosition + 1L).setStudentUid(uid);
int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn);
// 释放分布式锁
redisCache.unlock("checkInQueue", token);
if (i != 1){
return AjaxResult.error("排队预约失败!请重新填写");
}
}else {
return AjaxResult.error("系统繁忙,请稍后提交!");
}
tryAcquireLock
和 tryLock
以及 unlock
的方法实现
public class RedisCache
{
private static final Logger log = LoggerFactory.getLogger(RedisCache.class);
private static final String REDIS_UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
@Autowired
public RedisTemplate redisTemplate;
/**
* 获取分布式锁
*
* @param key
* @param token
* @param expireInSeconds 锁超时时间
* @return
*/
public boolean tryLock(String key, String token, long expireInSeconds) {
Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, expireInSeconds, TimeUnit.SECONDS);
log.info("获取分布式锁:"+ key + ":" + token);
return Objects.equals(res, true);
}
/**
* 分布式锁 unlock,使用lua脚本保证事务
*
* @param key
* @param token lock时的token值,只有token一致才能解锁
* @return
*/
public void unlock(String key, String token) {
try {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REDIS_UNLOCK_SCRIPT, Long.class);
Long res = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), token);
log.info("释放分布式锁:"+ key + ":" + token);
if (!Objects.equals(res, 1L)) {
log.warn("redis unlock wrong:key=[{}],token=[{}],res=[{}]", key, token, res);
}
} catch (Exception e) {
log.error("redis unlock error:key=[{}],token=[{}]", key, token, e);
}
}
/**
* @param key
* @param token
* @param lockTimeout 锁的超时时间
* @param acquireTimeout 获取锁的截止时间
* @return
*/
public boolean tryAcquireLock(String key, String token, long lockTimeout, long acquireTimeout) {
try {
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, lockTimeout, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(res)) {
log.info("获取分布式锁:"+ key + ":" + token);
return true;
}
try {
Thread.sleep(100);
} catch (Exception e) {
log.error("thread sleep error", e);
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
log.error("try acquire lock error, ", e);
}
return false;
}
}
tryAcquireLock
和 tryLock
都用于获取分布式锁,unlock
用于释放分布式锁,逻辑简单,这里不做说明,关注以下几点:
-
tryAcquireLock
和tryLock
的区别在于,前者在没有获取到锁之后会在限定的时间进行重复尝试获取,后者只尝试获取一次。 -
防止业务代码在执行的时候抛出异常,每一个锁添加了一个 超时时间
,超时之后,锁会被自动释放,考虑获取锁和设置过期时间之间如果服务器突然挂掉了,这个时候锁被占用,无法及时得到释放,也会造成死锁所以,所以要保证这个操作是原子的
,所以使用 Redis 提供的原子操作setIfAbsent(检查指定的键是否存在,如果不存在则设置键值对)
-
如果当前线程执行业务较耗时,超时时间会自动释放锁,其他线程会获取锁,当前线程执行完释放锁或释放到其他线程的锁,会出现混乱,所以需要锁相对线程唯一,自己的锁只能自己释放,使用 key+token
的机制 -
使用 key+token
的机制,每次释放锁都要判断 value, 一致才释放,但是这样的话,要去查看锁的 value,比较 value 的值是否正确,释放锁, 多个操作不保证原子性,所以unlock
需要引入lua脚本
,Lua 脚本可以在 Redis 服务端原子的执行多个 Redis 命令
上面的实现是最简单的 redis
实现分布式锁,如果要进一步增强分布式锁的可靠性和性能,可以考虑使用更复杂的方案,如 RedLock 算法
(redis 集群)、基于 Redis 的 Pub/Sub
机制等。这些方案可以提供更强的分布式锁功能,并解决一些特殊情况下的竞态条件和故障恢复问题。
博文部分内容参考
© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,如果你认可它,不要吝啬星星哦 :)
https://liruilong.blog.csdn.net/article/details/107076223
http://www.gxitsky.com/2022/02/12/SpringBoot-60-tomcat-nio/
© 2018-2024 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)
- 点赞
- 收藏
- 关注作者
评论(0)