你真的会用Redis做消息队列吗

举报
芥末拌个饭吧 发表于 2022/11/11 18:00:41 2022/11/11
1.2k+ 0 0
【摘要】 说到消息队列,大家首先想到的是rabitmq, rocketmq,kafka等。这些消息队列在生产中都很成熟,但它们相对来说比较重量级。 而Redis大家通常用来当缓存使用,不过在某些场景下,redis也可作用消息队列,但我们需要了解对消息队列的要求,同时也得清楚redis作为消息队列的优缺点。下面让我们来看看如何用redis做消息队列吧。

前言

说到消息队列,大家首先想到的是rabitmq, rocketmq,kafka等。这些消息队列在生产中都很成熟,但它们相对来说比较重量级。

而Redis大家通常用来当缓存使用,不过在某些场景下,redis也可作用消息队列,但我们需要了解对消息队列的要求,同时也得清楚redis作为消息队列的优缺点。下面让我们来看看如何用redis做消息队列吧。

我们对消息队列的要求

作为一个专业的消息队列,除了保证本身生产消费的速度,还应该做到以下2点:

  • 消息不丢失

  • 消息可堆积

一个消息队列,可拆分为3部分:生产者、队列中间件、消费者。

在消息是否丢失上,从图中我们很好分析,会出现消息丢失的地方有:

  1. 生产者是否会丢失;

  2. 队列是否会丢失;

  3. 消费者是否会丢失。

对于消息可堆积来说,就是因某些原因,队列中积压了大量消息,是否会影响消息队列的正常运行。

带着这些要求,我们来看看Redis能否满足吧。

redis有哪些实现队列的方式

这里总结一下,主要有3种实现方式:

  • List队列(最简单)

  • Pub/Sub模型(发布/订阅)

  • Stream(更成熟的方式)

List队列

list是我们用来实现队列最简单的方式,生产者用LPUSH发布消息,消费者通过RPOP拉取消息。

因为list底层是通过链表实现的,因此时间复杂度为O(1),在数据写入和读取速度上都能达到要求。

消费者在拉取消息时,需要不停的从队列拉取,伪代码如下:

while True:
    data = redis_cli.rpop('queue')
    if not data:
        continue
    # 消费数据
    consume_func(data)
    ....

这种方式有个最大的问题,当队列为空的时候,消费者也得不停的去获取消息,会造成cpu资源的浪费。当然我们可以在队列为空的时候休眠一点时间,这样能解决cpu的浪费问题。但是在休眠过程中,如果有新消息产生,消息的消费就会延迟。

redis有想到这个问题,所以有阻塞的拉取消息的方式:BRPOP / BLPOP。这种方式可阻塞拉取消息,使用时可传入【超时时间】,若设置为0,表示不设置超时,直到有新消息才返回。设置大于0时,会在超时时间到后返回null。

while True:
    # 没有消息就阻塞,0是表示不设置超时时间
    data = redis.brpop('queue', 0)
    if not data:
        continue
    # 有数据就消费
    consume_func(data)

这种方式可解决cpu空转的浪费问题,也能解决消息处理延迟的问题。但是也得看到它的缺点:

  • 不支持重复消费:数据pop之后就删除了,不能被其他消费者再次消费;

  • 消息丢失的问题:消费者拉取消息后,如果还没被处理消费者就宕机了,消息就会丢失无法找回。

Pub/Sub模型(发布/订阅)

这是Redis为发布/订阅模式设计出来的,它解决了list不能重复消费的问题。

生产者通过publish命令发布消息, 消费者通过subscribe订阅生产者的数据。并且支持多个消费者订阅同一个生产者,这就能满足重复消费的问题了。

我们来总结一下这种模式的优缺点,优点就是支持多组生产者、消费者处理消息。缺点就是容易丢失数据。主要体现以下几个情况:

  1. 消费者下线

  2. redis宕机

  3. 大量消息的堆积

Pub/Sub的实现不基于任何数据类型,没有做数据存储。它为生产者和消费者建立一个数据的通道,这个过程都是实时转发。如果消费者异常了,等它恢复后,只能接收新消息,异常期间的消息就丢失了。

再看看消息堆积的问题,当消费者处理消息的速度跟不上生产者时,就容易导致数据堆积的情况。Pub/Sub在消息堆积时,可能会直接导致消息丢失和消费失败。

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。当数据超过了缓冲区的上限,redis就会丢失数据。

Stream队列

Stream是通过XADD和XREAD完成生产和消费动作。发布消息(【*】表示自动生成唯一的消息ID):

127.0.0.1:6379> XADD queue * city shenzhen
"1663923811780-0"
127.0.0.1:6379> XADD queue * city shanghai
"1663923819051-0"

消费者拉取数据

# 从头拉取10条信息, 0-0意思是从头开始取
127.0.0.1:6379> XREAD COUNT 10 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1663924206724-0"
         2) 1) "city"
            2) "shenzhen"
      2) 1) "1663924209183-0"
         2) 1) "city"
            2) "shanghai"

如果想继续拉取消息,传入上一条消息的ID。没有消息就返回nil。

127.0.0.1:6379> XREAD COUNT 10 STREAMS queue 1663924209183-0
(nil)

Stream解决队列问题的方式:

  1. 阻塞式拉取消息

在拉取消息时,增加BLOCK参数

# BLOCK 0 表示阻塞直到有消息才返回。
127.0.0.1:6379> XREAD COUNT 10 BLOCK 0 STREAMS queue 1663924209183-0
  1. 发布/订阅模式

  • XGROUP:创建消费者组

  • XREADGROUP:在指定消费组下,开启消费者拉取消息

示例: 生产者发布消息

127.0.0.1:6379> XADD queue * city shenzhen
"1663924206724-0"
127.0.0.1:6379> XADD queue * city shanghai
"1663924209183-0"

假设有2组消费者要处理这批数据,创建2个消费者组:

127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

  1. 消息异常的情况,保证数据不丢失,支持重新消费。当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。如果故障就不会发出XACK命令,队列还是会保留信息,等重新上线后,消费者就会重新消费这些信息。

  2. stream会将数据写入到redis日志Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

  3. 消息堆积的处理在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

# MAXLEN设置队列的最大长度
127.0.0.1:6379> XADD queue MAXLEN 50000 * city shenzhen
"1663925399841-0"

当消息积压超过MAXLEN时,数据还是有可能丢失。

小结

redis有3种方式实现消息队列,list是最简单常用的方式,但不支持重复消费;Pub/Sub模式是发布/订阅模型,支持重复消费,但是容易丢数据;而Stream是一种新的队列实现方式,它支持消费订阅,也有数据持久化的做法,但是解决消息积压问题时,还是会丢数据。

我们在选择消息队列时,要根据自身的需要去选择。如果允许少部分的数据丢失,redis是符合选型要求的。但如果对消息的可靠性要求很高,建议使用专业的消息队列如kafka。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。