RocketMQ(九):延迟消息是如何实现的?
RocketMQ(九):延迟消息是如何实现的?
上个阶段已经从再平衡机制、拉取消息、并发/顺序消费消息来完全描述过消费者相关的消费原理
其中在并发消费的文章中曾说过:并发消费失败时会采用重试机制,将重试的消息作为延迟消息投入延迟队列,经历延迟时间后再重新放回重试队列,等待后续被消费者拉取然后再进行重试消费
其中,延迟消息(Delayed Message)不仅仅只用于重试,还是一个非常实用的功能,它允许消息在指定的时间后才被消费,这对于定时任务、订单超时提醒、促销活动等场景尤为重要
当时并没有详细说明延时队列的原理,本篇文章通过图文并茂、通俗易懂的说明延迟消息是如何实现的
本文导图如下:
往期回顾:
RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)
RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
使用延迟消息
使用延迟消息非常简单,只需要调用 setDelayTimeLevel
方法设置延迟的级别
Message message = new Message(topic, tag, body);
message.setDelayTimeLevel(delayLevel);
一共分为18个延迟级别可以设置:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
设置完延时级别后,其他使用方式与普通消息相同,延时消息的机制是在Broker自动实现的,等待对应的延时时间后,消息就会被重新进行消费
延迟消息原理
接下来让我们分析下,延时消息是如何实现的
消息投入延时队列
setDelayTimeLevel
方法会在消息的properties中对延时级别进行存储
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
与普通消息发送消息流程相同,这里就不过多叙述,主要对延迟消息的处理在Broker对消息进行持久化时:
在持久化消息的流程中,需要对CommitLog进行追加消息数据 this.commitLog.asyncPutMessage(msg)
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
在此方法中,CommitLog追加数据前会判断消息是否为延迟消息,如果是则会将消息原有的topic、队列信息替换为延迟相关的,并将原数据存储到properties
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//topic改为延迟topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//队列改为延迟队列
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//将原始topic、队列数据备份到properties
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//更新为延迟相关的topic、队列
msg.setTopic(topic);
msg.setQueueId(queueId);
}
延迟的topic固定为SCHEDULE_TOPIC_XXXX
,ScheduleMessageService.delayLevel2QueueId
方法中则是对延迟级别进行减一,说明18个级别对应的队列ID为0-17
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
至此消息就会被持久化到对应的延迟队列中,后续延时队列中消息都要经过一定的延时时间才会被组件ScheduleMessageService重新取出投入到原数据的队列中(延时时间与级别对应)
ScheduleMessageService初始化
组件ScheduleMessageService用于处理延时消息,初始化的步骤:
- 加载数据:加载每个延时队列对应的偏移量、解析延时级别对应延时时间、矫正延时偏移量
- 初始化执行定时任务的线程池
- 遍历延时队列并创建传递延时消息的任务放入线程池执行(每个队列对应一个任务)
- 启动定时持久化的任务
public void start() {
if (started.compareAndSet(false, true)) {
//加载数据
this.load();
//初始化线程池
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
//遍历延时队列(延时级别以及对应的延时时间),并创建任务放入线程池进行执行
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
//延时级别
Integer level = entry.getKey();
//延时时间 ms
Long timeDelay = entry.getValue();
//延时偏移量
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
//创建任务放入线程池执行
if (timeDelay != null) {
//异步的情况
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
//同步
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//开启持久化定时任务
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
在加载数据的方法中,主要是从文件中加载各个队列的偏移量、解析各个延时级别对应的延时时间以及矫正偏移量
(这里的偏移量其实就是延时队列的ConsumerQueue记录的偏移量,通过consumerQueue记录能够找到消息)
ScheduleMessageService.load
public boolean load() {
//加载偏移量
boolean result = super.load();
//解析延时级别
result = result && this.parseDelayLevel();
//矫正偏移量
result = result && this.correctDelayOffset();
return result;
}
调用super.load()
时,会从broker配置的存储目录下config/delayOffset.json
文件,读取每个队列偏移量的JSON数据,最终将数据填充到offsetTable
(this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable())
)
文件内容如下:
{
"offsetTable":{1:1,2:2,3:12,4:12,5:14,6:15,7:149,8:1251,9:397,10:112,11:60,12:59,13:58,14:58,15:64,16:60,17:59,18:1129
}
}
ScheduleMessageService.parseDelayLevel
解析延时级别主要就是将各个延时级别对应的时间解析为毫秒,并将结果存储到delayLevelTable
public boolean parseDelayLevel() {
//时间单位转换
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
//默认写死的延时级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
//转换为ms后 写入 delayLevelTable
this.delayLevelTable.put(level, delayTimeMillis);
if (this.enableAsyncDeliver) {
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
}
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
correctDelayOffset
矫正偏移量则是判断偏移量是否超过逻辑队列ConsumerQueue最小/大值,超出则设置为最小/大值
后续较为重要的流程就是遍历所有延时队列,创建DeliverDelayedMessageTimerTask任务交给线程池处理(默认同步情况下)
定时转发延时消息
DeliverDelayedMessageTimerTask任务用于定时转发延时消息,会判断当前队列的消息是否过期,如果过期会将消息重投到原topic、队列中,否则延时重试
执行过程中,会调用executeOnTimeup
,流程如下:
- 获取延时队列对应的消费队列ConsumerQueue(根据topic、队列id)
- 根据偏移量获取ConsumerQueue对应位置的映射缓冲(方便后续读consumer queue记录)
- 遍历解析consumer queue记录,判断消息是否超时,超时则找到CommitLog上的消息,并恢复消息原始topic、队列,最后进行转发到对应队列,期间失败或其他情况延时重试
(需要理解ConsumerQueue文件)
public void executeOnTimeup() {
//找到延时队列对应的(逻辑)消费队列 ConsumerQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
if (cq == null) {
//没找到消费队列就延时处理
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
//根据偏移量获取对应的映射缓冲 方便后续读
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
//没获取到 可能是偏移量有问题 纠正 延时执行
long resetOffset;
if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else {
resetOffset = this.offset;
}
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
//获取到映射缓存的情况下
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//遍历consumer queue记录
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//读取数据 消息偏移量、大小、tag哈希值(延时消息该字段被利用延时后的时间)
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//...
long now = System.currentTimeMillis();
//超过延迟到达时间就是延迟到达时间,否则就是now 会被延时处理
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
//下个偏移量
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//时间未到的情况下延时处理
long countdown = deliverTimestamp - now;
if (countdown > 0) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
//根据consumerqueue记录解析的偏移量和大小找到commit log上的消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
//将延时消息延时相关属性清理,使用原来的topic、队列数据
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
//转发消息 默认同步
boolean deliverSuc;
if (ScheduleMessageService.this.enableAsyncDeliver) {
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
} else {
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
//失败延迟重试
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
由于每条延时队列上需要延时的时间相同,并且消息入队的顺序为FIFO,因此判断超时只需要依次取出判断即可
无论是同步转发还是异步转发,最终会调用deliverMessage
重新持久化消息ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner)
至此延时消息的原理描述完毕,流程图如下:
总结
使用延时消息只需要通过API设置延时级别,不同的延时级别对应不同的延时时间
broker将消息追加到commit log前会判断消息是否有设置延时级别,如果设置则说明为延时消息,会将原始topic、队列ID等数据存储在properties,并将消息topic、队列ID改为延时队列相关属性,最终消息会被持久化到延时队列
每个延时队列都有定时任务隔100ms进行检测,如果消息超时则通过consumer queue记录找到commitlog中的消息,并将其原始topic、队列ID等信息恢复,再调用持久化消息的API,相当于将消息重投到最开始设置的队列中
由于每个延时队列延时的时间相同,消息入队后,检测超时可以顺序检测,离超时时间越近的消息越前,如果有大量消息同时定时相同时间,处理流程可能会导致堆积从而影响定时精度
最后(点赞、收藏、关注求求啦~)
我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
- 点赞
- 收藏
- 关注作者
评论(0)