redis 极简分布式锁实现

举报
山河已无恙 发表于 2024/02/04 23:20:35 2024/02/04
【摘要】 写在前面工作中遇到,整理 reids 做简单分布式锁的思考博文适合刚接触 redis 的小伙伴理解不足小伙伴帮忙指正 对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》假设现在有这样一个需求,需要做排队预约住宿的功能,当前宿舍住满了,有新...

写在前面


  • 工作中遇到,整理 reids 做简单分布式锁的思考
  • 博文适合刚接触 redis 的小伙伴
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


假设现在有这样一个需求,需要做排队预约住宿的功能,当前宿舍住满了,有新的同学需要来入住,可以进行排队预约,排队编号通过累加的方式生成

我们设计这样一张数据表

CREATE TABLE `ams_student_queue_check_in_sync` (
 `queue_check_in_id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '学生队列ID',
 `student_name` VARCHAR(50) NOT NULL COMMENT '学生姓名' COLLATE 'utf8mb4_general_ci',
 `student_uid` VARCHAR(50) NULL DEFAULT NULL COMMENT '学生uid' COLLATE 'utf8mb4_general_ci',
 `student_card` VARCHAR(30) NULL DEFAULT NULL COMMENT '学生身份证号' COLLATE 'utf8mb4_general_ci',
 `student_contact_number` VARCHAR(20) NOT NULL COMMENT '学生联系电话' COLLATE 'utf8mb4_general_ci',
 `student_email` VARCHAR(50) NULL DEFAULT NULL COMMENT '学生电子邮件地址' COLLATE 'utf8mb4_general_ci',
 `student_gender` TINYINT(4) NOT NULL DEFAULT '0' COMMENT '学生性别',
 `student_emergency_contact_name` VARCHAR(100) NULL DEFAULT NULL COMMENT '第二联系人姓名' COLLATE 'utf8mb4_general_ci',
 `student_emergency_contact_number` VARCHAR(20) NULL DEFAULT NULL COMMENT '第二联系人电话' COLLATE 'utf8mb4_general_ci',
 `student_status` TINYINT(4) NULL DEFAULT '1' COMMENT '学生排队状态(1.待入住,2.以入住 3.以取消)',
 `arrival_dates` DATETIME NULL DEFAULT NULL COMMENT '预计入住时间',
 `departure_dates` DATETIME NULL DEFAULT NULL COMMENT '预计离开日期',
 `queue_position` INT(11) NULL DEFAULT NULL COMMENT '学生在排队中的位置',
 `check_in_remark` TEXT NULL DEFAULT NULL COMMENT '备注' COLLATE 'utf8mb4_general_ci',
 `extended1` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段1' COLLATE 'utf8mb4_general_ci',
 `extended2` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段2' COLLATE 'utf8mb4_general_ci',
 `extended3` VARCHAR(50) NULL DEFAULT NULL COMMENT '扩展字段3' COLLATE 'utf8mb4_general_ci',
 `created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 `updated_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
 PRIMARY KEY (`queue_check_in_id`) USING BTREE,
 INDEX `student_uid` (`student_uid`) USING BTREE
)
COMMENT='入住排队表'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=1363
;

queue_position 为每一位同学的排队编号,需要根据当前的学生编号最大来累加

下面为实现的基础代码

    @ApiOperation("入住排队接口")
    @PostMapping("/checkInQueue")
    @Transactional
    public   AjaxResult checkInQueue( @RequestHeader("UID") String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn){

        if (Objects.isNull(uid)){
            return AjaxResult.error("Uid 为空");
        }
        if (Objects.isNull(amsStudentQueueCheckIn.getStudentEmergencyContactNumber())){
            return AjaxResult.error("电话号为空");
        }
        StringBuilder stringBuilder = new StringBuilder();
        String studentContactNumber = amsStudentQueueCheckIn.getStudentContactNumber();
        List<AmsStudentQueueCheckIn> amsStudentQueueCheckIns1 = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInList(new AmsStudentQueueCheckIn().setStudentContactNumber(studentContactNumber));

        Integer count = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListCount(amsStudentQueueCheckIn.getStudentGender());

        if (Objects.nonNull(amsStudentQueueCheckIns1) && amsStudentQueueCheckIns1.size() !=0 ){
            stringBuilder.append("已经排队预约啦,请耐心等待 ^_^")
                    .append(", 预约编号为 " ).append(amsStudentQueueCheckIns1.get(0).getQueuePosition())
                    .append(", 前面还有 ").append(count - 1).append( " 人");
            return  AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition",amsStudentQueueCheckIns1.get(0).getQueuePosition(),"beforePeopleBumber",count -1 ));
        }

        AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender());
        Long queuePosition = 0L;
        if (Objects.nonNull(amsStudentQueueCheckIns)){
            queuePosition = amsStudentQueueCheckIns.getQueuePosition()
        }
        amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(queuePosition + 1L).setStudentUid(uid);
        amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(amsStudentQueueCheckIns.getQueuePosition() + 1L).setStudentUid(uid);
        int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn);
        if (i != 1){
            return AjaxResult.error("排队预约失败!");
        }
        stringBuilder.append("排队预约成功")
                .append(", 预约编号为 " ).append(amsStudentQueueCheckIn.getQueuePosition())
                .append(", 前面还有 ").append(count).append( " 人");
        return  AjaxResult.success(stringBuilder.toString(),ImmutableMap.of("queuePosition",amsStudentQueueCheckIn.getQueuePosition(),"beforePeopleBumber",count));
    }

逻辑比较简单,拿到数据,获取编号最大值累加,数据落表,但是上面的代码存在一个问题,因为是 Springboot 项目,使用 tomcat 部署,Spring Boot 嵌入的 Tomcat 默认启用 Http11NioProtocol,可以切换日志级别为 Debug 可看到

Http11NioProtocol 表示多线程非阻塞模式的HTTP协议的通信(web 服务端网络IO处理模型包括:单(多)线程阻塞(非阻塞)IO模型)。

# 日志级别 Debug
# 日志配置
logging:
  level:
    root: debug
11:42:51.810 [restartedMain] INFO  o.a.c.h.Http11NioProtocol - [log,173] - Initializing ProtocolHandler ["http-nio-8080"]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [Connector[HTTP/1.1-8080]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [INITIALIZED]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [STARTING_PREP]
11:42:51.811 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardServer[-1]] to [STARTING]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING_PREP]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTING]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [org.apache.catalina.deploy.NamingResourcesImpl@1dc49001] to [STARTED]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [STARTING_PREP]
11:42:51.812 [restartedMain] INFO  o.a.c.c.StandardService - [log,173] - Starting service [Tomcat]
11:42:51.812 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardService[Tomcat]] to [STARTING]
11:42:51.813 [restartedMain] DEBUG o.a.c.u.LifecycleBase - [log,173] - Setting state for [StandardEngine[Tomcat]] to [STARTING_PREP]
11:42:51.813 [restartedMain] INFO  o.a.c.c.StandardEngine - [log,173] - Starting Servlet engine: [Apache Tomcat/9.0.75]

可以看到 spring-boot-starter-web 嵌入的 9.0.75 版本的 tomcat ,Tomcat 从 8.5 版本开始移除了 BIO,默认启用 NIO

下图为从套接字连接接收、处理请求、响应客户端的整个过程

《Tomcat内核设计剖析》《Tomcat内核设计剖析》

所以当多个排队请求并发调用接口时,不同的线程会分别进入方法,这个时候有可能会从数据库获取相同的排队编号进行累加,同时生成相同新编号,所以这里需要考虑方法线程安全,

最简单的方式是使用同步方法,保证只有一个线程获取锁,但是这不是最优的方式,这里不做考虑

 public  synchronized  AjaxResult checkInQueue( @RequestHeader("UID") String uid, @RequestBody AmsStudentQueueCheckIn amsStudentQueueCheckIn){
 ....................

使用同步方法的方式解决了上面的问题,但是如果当前项目是在 k8s 集群上面部署,以分布式的方式,就需要考虑多个 Pod 的数据同步问题。

假设两个排队请求被负载到两个不同的 Pod,这个时候同时查询数据会获取相同的最大编号,生成相同的编号,考虑使用分布式锁。下面为对方法的改进,这里如果使用分布式锁的方式,那么上面的同步方法即可以去掉了,因为获取锁的方法是原子操作。

分布式锁实现很简单,就是进来一个线程先占位,当别的线城进来操作时,发现已经有人占位了,就会放弃或者稍后再试。这里的占位状态是全局的,相对整个集群而言,代码如下

        String token = UUID.randomUUID().toString();
        // 添加分布式锁
        if (redisCache.tryAcquireLock("checkInQueue", token, 2, 10)){
            AmsStudentQueueCheckIn amsStudentQueueCheckIns = amsStudentQueueCheckInService.selectAmsStudentQueueCheckInListMax(amsStudentQueueCheckIn.getStudentGender());
            Long queuePosition = 0L;
            if (Objects.nonNull(amsStudentQueueCheckIns)){
                queuePosition = amsStudentQueueCheckIns.getQueuePosition();

            }
            amsStudentQueueCheckIn.setStudentStatus(1).setQueuePosition(queuePosition + 1L).setStudentUid(uid);
            int i = amsStudentQueueCheckInService.insertAmsStudentQueueCheckIn(amsStudentQueueCheckIn);
            // 释放分布式锁
            redisCache.unlock("checkInQueue", token);
            if (i != 1){
                return AjaxResult.error("排队预约失败!请重新填写");
            }

        }else {
            return AjaxResult.error("系统繁忙,请稍后提交!");
        }

tryAcquireLocktryLock 以及 unlock 的方法实现

public class RedisCache
{
    private static final Logger log = LoggerFactory.getLogger(RedisCache.class);
    private static final String REDIS_UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    @Autowired
    public RedisTemplate redisTemplate;
    /**
     * 获取分布式锁
     *
     * @param key
     * @param token
     * @param expireInSeconds 锁超时时间
     * @return
     */
    public boolean tryLock(String key, String token, long expireInSeconds) {
        Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, expireInSeconds, TimeUnit.SECONDS);
        log.info("获取分布式锁:"+ key + ":" + token);
        return Objects.equals(res, true);
    }
    /**
     * 分布式锁 unlock,使用lua脚本保证事务
     *
     * @param key
     * @param token lock时的token值,只有token一致才能解锁
     * @return
     */
    public void unlock(String key, String token) {
        try {
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REDIS_UNLOCK_SCRIPT, Long.class);
            Long res = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), token);
            log.info("释放分布式锁:"+ key + ":" + token);
            if (!Objects.equals(res, 1L)) {
                log.warn("redis unlock wrong:key=[{}],token=[{}],res=[{}]", key, token, res);
            }
        } catch (Exception e) {
            log.error("redis unlock error:key=[{}],token=[{}]", key, token, e);
        }
    }
    /**
     * @param key
     * @param token
     * @param lockTimeout  锁的超时时间
     * @param acquireTimeout  获取锁的截止时间
     * @return
     */
    public boolean tryAcquireLock(String key, String token, long lockTimeout, long acquireTimeout) {
        try {
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                Boolean res = redisTemplate.opsForValue().setIfAbsent(key, token, lockTimeout, TimeUnit.MILLISECONDS);

                if (Boolean.TRUE.equals(res)) {
                    log.info("获取分布式锁:"+ key + ":" + token);
                    return true;
                }
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    log.error("thread sleep error", e);
                    Thread.currentThread().interrupt();
                }
            }
        } catch (Exception e) {
            log.error("try acquire lock error, ", e);
        }
        return false;
    }
}    

tryAcquireLocktryLock 都用于获取分布式锁,unlock 用于释放分布式锁,逻辑简单,这里不做说明,关注以下几点:

  • tryAcquireLocktryLock 的区别在于,前者在没有获取到锁之后会在限定的时间进行重复尝试获取,后者只尝试获取一次。
  • 防止业务代码在执行的时候抛出异常,每一个锁添加了一个超时时间,超时之后,锁会被自动释放,考虑获取锁和设置过期时间之间如果服务器突然挂掉了,这个时候锁被占用,无法及时得到释放,也会造成死锁所以,所以要保证这个操作是原子的,所以使用 Redis 提供的原子操作 setIfAbsent(检查指定的键是否存在,如果不存在则设置键值对)
  • 如果当前线程执行业务较耗时,超时时间会自动释放锁,其他线程会获取锁,当前线程执行完释放锁或释放到其他线程的锁,会出现混乱,所以需要锁相对线程唯一,自己的锁只能自己释放,使用 key+token 的机制
  • 使用 key+token 的机制,每次释放锁都要判断 value, 一致才释放,但是这样的话,要去查看锁的 value,比较 value 的值是否正确,释放锁, 多个操作不保证原子性,所以unlock 需要引入 lua脚本,Lua 脚本可以在 Redis 服务端原子的执行多个 Redis 命令

上面的实现是最简单的 redis 实现分布式锁,如果要进一步增强分布式锁的可靠性和性能,可以考虑使用更复杂的方案,如 RedLock 算法(redis 集群)、基于 Redis 的 Pub/Sub 机制等。这些方案可以提供更强的分布式锁功能,并解决一些特殊情况下的竞态条件和故障恢复问题。

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,如果你认可它,不要吝啬星星哦 :)


https://liruilong.blog.csdn.net/article/details/107076223

http://www.gxitsky.com/2022/02/12/SpringBoot-60-tomcat-nio/


© 2018-2024 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。