RocketMQ(九):延迟消息是如何实现的?

举报
菜菜的后端私房菜 发表于 2024/11/11 09:45:43 2024/11/11
【摘要】 图文并茂、深入浅出揭秘RocketMQ高吞吐量并发消费原理

RocketMQ(九):延迟消息是如何实现的?

上个阶段已经从再平衡机制、拉取消息、并发/顺序消费消息来完全描述过消费者相关的消费原理

其中在并发消费的文章中曾说过:并发消费失败时会采用重试机制,将重试的消息作为延迟消息投入延迟队列,经历延迟时间后再重新放回重试队列,等待后续被消费者拉取然后再进行重试消费

其中,延迟消息(Delayed Message)不仅仅只用于重试,还是一个非常实用的功能,它允许消息在指定的时间后才被消费,这对于定时任务、订单超时提醒、促销活动等场景尤为重要

当时并没有详细说明延时队列的原理,本篇文章通过图文并茂、通俗易懂的说明延迟消息是如何实现的

阅读本篇文章之前需要了解消息发送持久化相关的流程

本文导图如下:

导图

往期回顾:

RocketMQ(八):轻量级拉取消费原理

RocketMQ(七):消费者如何保证顺序消费?

RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)

RocketMQ(五):揭秘高吞吐量并发消费原理

RocketMQ(四):消费前如何拉取消息?(长轮询机制)

RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

使用延迟消息

使用延迟消息非常简单,只需要调用 setDelayTimeLevel 方法设置延迟的级别

Message message = new Message(topic, tag, body);
message.setDelayTimeLevel(delayLevel);

一共分为18个延迟级别可以设置:

投递等级(delay level) 延迟时间 投递等级(delay level) 延迟时间
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h

设置完延时级别后,其他使用方式与普通消息相同,延时消息的机制是在Broker自动实现的,等待对应的延时时间后,消息就会被重新进行消费

延迟消息原理

接下来让我们分析下,延时消息是如何实现的

消息投入延时队列

setDelayTimeLevel 方法会在消息的properties中对延时级别进行存储

public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

与普通消息发送消息流程相同,这里就不过多叙述,主要对延迟消息的处理在Broker对消息进行持久化时:

在持久化消息的流程中,需要对CommitLog进行追加消息数据 this.commitLog.asyncPutMessage(msg)

CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

在此方法中,CommitLog追加数据前会判断消息是否为延迟消息,如果是则会将消息原有的topic、队列信息替换为延迟相关的,并将原数据存储到properties

if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }

    //topic改为延迟topic
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    //队列改为延迟队列
    int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    //将原始topic、队列数据备份到properties
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    //更新为延迟相关的topic、队列
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

延迟的topic固定为SCHEDULE_TOPIC_XXXXScheduleMessageService.delayLevel2QueueId方法中则是对延迟级别进行减一,说明18个级别对应的队列ID为0-17

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

至此消息就会被持久化到对应的延迟队列中,后续延时队列中消息都要经过一定的延时时间才会被组件ScheduleMessageService重新取出投入到原数据的队列中(延时时间与级别对应)

ScheduleMessageService初始化

组件ScheduleMessageService用于处理延时消息,初始化的步骤:

  1. 加载数据:加载每个延时队列对应的偏移量、解析延时级别对应延时时间、矫正延时偏移量
  2. 初始化执行定时任务的线程池
  3. 遍历延时队列并创建传递延时消息的任务放入线程池执行(每个队列对应一个任务)
  4. 启动定时持久化的任务

public void start() {
    if (started.compareAndSet(false, true)) {
        //加载数据
        this.load();
        
        //初始化线程池
        this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
        if (this.enableAsyncDeliver) {
            this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
        }
        
        //遍历延时队列(延时级别以及对应的延时时间),并创建任务放入线程池进行执行
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            //延时级别
            Integer level = entry.getKey();
            //延时时间 ms
            Long timeDelay = entry.getValue();
            //延时偏移量
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            //创建任务放入线程池执行
            if (timeDelay != null) {
                //异步的情况
                if (this.enableAsyncDeliver) {
                    this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
                //同步
                this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
            }
        }

        //开启持久化定时任务
        this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}

在加载数据的方法中,主要是从文件中加载各个队列的偏移量、解析各个延时级别对应的延时时间以及矫正偏移量

(这里的偏移量其实就是延时队列的ConsumerQueue记录的偏移量,通过consumerQueue记录能够找到消息)

ScheduleMessageService.load

public boolean load() {
    //加载偏移量
    boolean result = super.load();
    //解析延时级别
    result = result && this.parseDelayLevel();
    //矫正偏移量
    result = result && this.correctDelayOffset();
    return result;
}

调用super.load()时,会从broker配置的存储目录下config/delayOffset.json文件,读取每个队列偏移量的JSON数据,最终将数据填充到offsetTable

(this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable()))

文件内容如下:

{
	"offsetTable":{1:1,2:2,3:12,4:12,5:14,6:15,7:149,8:1251,9:397,10:112,11:60,12:59,13:58,14:58,15:64,16:60,17:59,18:1129
	}
}

ScheduleMessageService.parseDelayLevel

解析延时级别主要就是将各个延时级别对应的时间解析为毫秒,并将结果存储到delayLevelTable

public boolean parseDelayLevel() {
    //时间单位转换
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    //默认写死的延时级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            //转换为ms后 写入 delayLevelTable
            this.delayLevelTable.put(level, delayTimeMillis);
            if (this.enableAsyncDeliver) {
                this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
            }
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

correctDelayOffset矫正偏移量则是判断偏移量是否超过逻辑队列ConsumerQueue最小/大值,超出则设置为最小/大值

后续较为重要的流程就是遍历所有延时队列,创建DeliverDelayedMessageTimerTask任务交给线程池处理(默认同步情况下)

定时转发延时消息

DeliverDelayedMessageTimerTask任务用于定时转发延时消息,会判断当前队列的消息是否过期,如果过期会将消息重投到原topic、队列中,否则延时重试

执行过程中,会调用executeOnTimeup,流程如下:

  1. 获取延时队列对应的消费队列ConsumerQueue(根据topic、队列id)
  2. 根据偏移量获取ConsumerQueue对应位置的映射缓冲(方便后续读consumer queue记录)
  3. 遍历解析consumer queue记录,判断消息是否超时,超时则找到CommitLog上的消息,并恢复消息原始topic、队列,最后进行转发到对应队列,期间失败或其他情况延时重试

(需要理解ConsumerQueue文件

public void executeOnTimeup() {
    //找到延时队列对应的(逻辑)消费队列 ConsumerQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    if (cq == null) {
        //没找到消费队列就延时处理
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }

    //根据偏移量获取对应的映射缓冲 方便后续读
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ == null) {
        //没获取到 可能是偏移量有问题 纠正 延时执行
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }

        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }

    //获取到映射缓存的情况下
    long nextOffset = this.offset;
    try {
        int i = 0;
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        //遍历consumer queue记录 
        for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            //读取数据 消息偏移量、大小、tag哈希值(延时消息该字段被利用延时后的时间)
            long offsetPy = bufferCQ.getByteBuffer().getLong();
            int sizePy = bufferCQ.getByteBuffer().getInt();
            long tagsCode = bufferCQ.getByteBuffer().getLong();

            //...
			
            long now = System.currentTimeMillis();
            //超过延迟到达时间就是延迟到达时间,否则就是now 会被延时处理
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
            //下个偏移量
            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
			
            //时间未到的情况下延时处理
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }

            //根据consumerqueue记录解析的偏移量和大小找到commit log上的消息
            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }

            //将延时消息延时相关属性清理,使用原来的topic、队列数据
            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }

            //转发消息 默认同步
            boolean deliverSuc;
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            } else {
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            }

            //失败延迟重试
            if (!deliverSuc) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }

        
        nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }

    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

由于每条延时队列上需要延时的时间相同,并且消息入队的顺序为FIFO,因此判断超时只需要依次取出判断即可

无论是同步转发还是异步转发,最终会调用deliverMessage重新持久化消息ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner)

至此延时消息的原理描述完毕,流程图如下:

总结

使用延时消息只需要通过API设置延时级别,不同的延时级别对应不同的延时时间

broker将消息追加到commit log前会判断消息是否有设置延时级别,如果设置则说明为延时消息,会将原始topic、队列ID等数据存储在properties,并将消息topic、队列ID改为延时队列相关属性,最终消息会被持久化到延时队列

每个延时队列都有定时任务隔100ms进行检测,如果消息超时则通过consumer queue记录找到commitlog中的消息,并将其原始topic、队列ID等信息恢复,再调用持久化消息的API,相当于将消息重投到最开始设置的队列中

由于每个延时队列延时的时间相同,消息入队后,检测超时可以顺序检测,离超时时间越近的消息越前,如果有大量消息同时定时相同时间,处理流程可能会导致堆积从而影响定时精度

最后(点赞、收藏、关注求求啦~)

我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。