Redis系列之延时队列简介

举报
yd_273762914 发表于 2020/12/02 22:22:48 2020/12/02
【摘要】 文章目录 一、业务场景1.1 实践场景1.2 实现方式 二、Redis延时队列2.1 Redis列表实现2.2 Redis集合实现 一、业务场景 所谓延时队列就是延时的消息队列,下面说一下一些业务场景比较好理解 1.1 实践场景 订单支付失败,每隔一段时间提醒用户用户并发量的情况,可以延时2分钟给用户发短信… 1.2 实现方式 这些情...

一、业务场景

所谓延时队列就是延时的消息队列,下面说一下一些业务场景比较好理解

1.1 实践场景

  • 订单支付失败,每隔一段时间提醒用户
  • 用户并发量的情况,可以延时2分钟给用户发短信

1.2 实现方式

这些情况都可以使用延时队列来做,实现延时队列比较场景的有使用消息队列MQ来实现,比如RocketMQ等等,也可以使用Redis来实现,本博客主要介绍一下Redis实现延时队列

二、Redis延时队列

2.1 Redis列表实现

Redis实现延时队列可以通过其数据结构列表(list)来实现,顺便复习一下Redis的列表,实现列表,Redis可以通过队列和栈来实现:

/* 队列:First in first out */

//加两个value
>rpush keynames key1 key2
2

//计算
>llen keynames
2

>lpop keynames
key1

>lpop keynames
key2

//rpush会自动过期的
>rpop keynames
NULL

/* 栈:First in last out */

//同样,加两个元素
>rpush keynames key1 key2
2

>rpop keynames
key2

>rpop keynames
key1


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

对于Redis的基本数据结构,可以参考我之前的博客:https://blog.csdn.net/u014427391/article/details/82860694

然后怎么实现延时?Thread睡眠或者线程join?这种方法是可以实现,不过假如用户一多?10个请求就要延时10N了,这种情况系统性能不好的话就会出现线程阻塞了的情况。

队列空了的情况?就会出现pop 的死循环,这种情况很可怕,很吃系统CPU,虽然可以通过线程睡眠方法来缓解,但不是最好的方法

这时候就要介绍一下Redis的blpop/brpop来替换lpop/rpop,blpop/brpop阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零

2.2 Redis集合实现

Redis的有序集合(zset)也可以用于实现延时队列,消息作为value,时间作为score,这里顺便复习一下Redis的有序集合

//9.0是score也就是权重
>zadd keyname 9.0 math
1

>zadd keyname 9.2 history
1

//顺序
>zrange keyname 0 -1
1) history
2) math

//逆序
>zrevrange keyname 0 -1
1) math
2) history

//相当于count()
>zcard keyname
2

获取指定key的score
>zscore keyname math
9



  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

然后多个线程的环境怎么保证任务不被多个线程抢了?这里可以使用Redis的zrem命令来实现

Redis Zrem 命令用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

当 key 存在但不是有序集类型时,返回一个错误。

注意: 在 Redis 2.4 版本以前, ZREM 每次只能删除一个元素。

下面给出来自《Redis 深度历险:核心原理与应用实践》小册的例子:例子就是用有序集合和zrem来实现的

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg;
  } // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType(); private Jedis jedis;
  private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey;
  } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
  } public void loop() { while (!Thread.interrupted()) { // 只取一条 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇会继续 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 抢到了 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } }
  } public void handleMsg(T msg) { System.out.println(msg);
  } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { }
  }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

不过在多线程环境,是很难做控制的,上面例子也有缺陷,下面引用小册的说法:

上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

文章来源: smilenicky.blog.csdn.net,作者:smileNicky,版权归原作者所有,如需转载,请联系作者。

原文链接:smilenicky.blog.csdn.net/article/details/87905450

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。