RocketMQ(八):轻量级拉取消费原理

举报
菜菜的后端私房菜 发表于 2024/11/14 10:38:18 2024/11/14
【摘要】 图文并茂、通俗易懂描述rocketmq轻量级拉取消费者原理

RocketMQ(八):轻量级拉取消费原理

前几篇文章,我们从DefaultMQPushConsumer从再平衡给消费者分配队列开始、再到消费者拉取消息、最后通过并发/顺序的方式消费消息,已经完全描述它的实现原理

虽然它取名为"Push",但内部实现获取消息依旧是使用拉取的方式,只是增加了长轮询机制

这样取名只是为了想表达它的消息会被“推送”到消息监听器上,而我们只需要实现自己的消息监听器来处理消息

这篇文章我们使用“逆推”的思维,来看看消费者的另一个实现DefaultLitePullConsumer是如何实现轻量级拉取消费的

本文思维导图如下:

思维导图

往期回顾:

RocketMQ(七):消费者如何保证顺序消费?

RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)

RocketMQ(五):揭秘高吞吐量并发消费原理

RocketMQ(四):消费前如何拉取消息?(长轮询机制)

RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

使用DefaultLitePullConsumer

DefaultLitePullConsumer使用起来与DefaultMQPushConsumer略有不同

它不需要再配置消息监听器(因为需要手动去拉取),启动后通过调用它的poll方法来手动拉取消息进行处理

			DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();

            //根据配置文件set...
			consumer.setConsumerGroup(groupName);
            consumer.subscribe(topic, tag);
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.start();

            executor.execute(() -> {
                while (true) {
                    //拉取消息
                    List<MessageExt> poll = consumer.poll();
                    log.info("{}拉取消息:{}", groupName, poll);
                }
            });

实现原理

熟悉过DefaultMQPushConsumer的我们肯定对消费的整体流程不会陌生,无非就是需要做到以下三点:

  1. 再分配机制如何给同组消费者负载均衡分配队列?
  2. 如何拉取消息?
  3. 如何消费消息?

授人以鱼不如授人以渔,这次我们直接以poll为入口,“逆推”其实现的原理

(面对没有文档、没有自顶向下的架构、不熟悉的源码都可以使用这种方式进行“推理”,找一个熟悉的业务实现点,往前寻找)

poll 获取消息

poll无参方法默认会携带5S的超时时间来进行调用,因此我们可以猜测如果没有消息到达就是每5s拉取一次消息

每个方法依次查看会发现它会进行检查、自动提交、从内存中获取消息请求ConsumeRequest,最后再获取本次消费的消息以及维护数据(更新偏移量、重置topic)

poll获取消息

public synchronized List<MessageExt> poll(long timeout) {
    try {
        //检查服务状态
        checkServiceState();
        //校验参数
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }

        //自动提交 
        if (defaultLitePullConsumer.isAutoCommit()) {
            maybeAutoCommit();
        }
        long endTime = System.currentTimeMillis() + timeout;

        //获取消费请求
        ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

        //未超时
        if (endTime - System.currentTimeMillis() > 0) {
            //如果获取消费请求但processQueue已弃用要重新获取 直到超时
            while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                if (endTime - System.currentTimeMillis() <= 0) {
                    break;
                }
            }
        }

        //获取到消费请求并且队列对应的processQueue未弃用 获取消息并维护其他状态
        if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
			//获取拉取到的消息
            List<MessageExt> messages = consumeRequest.getMessageExts();
            //内存队列中删除这批消息
            long offset = consumeRequest.getProcessQueue().removeMessage(messages);
            //更新偏移量
            assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
            //If namespace not null , reset Topic without namespace.
            //消息重置topic 可能之前是从其他特殊队列出来的?
            this.resetTopic(messages);
            return messages;
        }
    } catch (InterruptedException ignore) {

    }

    return Collections.emptyList();
}

DefaultLitePullConsumerImpl.commitAll

查看自动提交的方法可以发现:**在自动提交中,如果当前时间超过下次自动提交的时间(默认每隔5S)就会调用 commitAll **(从之前看过的源码,可以猜测这个Commit会去更新偏移量或持久化相关的操作)

public synchronized void commitAll() {
    try {
        //遍历队列
        for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
            //获取队列的消费偏移量
            long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
            if (consumerOffset != -1) {
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                if (processQueue != null && !processQueue.isDropped()) {
                    //更新消费偏移量
                    updateConsumeOffset(messageQueue, consumerOffset);
                }
            }
        }
        
        //如果是广播模式则全部持久化
        if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
            offsetStore.persistAll(assignedMessageQueue.messageQueues());
        }
    } catch (Exception e) {
        log.error("An error occurred when update consume offset Automatically.");
    }
}

虽然目前不太理解assignedMessageQueue队列是干嘛的,但从名字可以看出可能是再平衡给当前消费者负载均衡分配的队列

这个方法看上去像兜底的更新消费偏移量(广播下持久化),某些情况下会遗漏,则每次获取消息时检查超时5S就进行兜底更新

consumeRequestCache.poll 从名称上看就像是专门存储consumeRequestCache的缓存

之前的文章也说过ConsumeRequest封装的消费请求,其中包含本次消费的消息列表以及对应的队列MessageQueue和ProcessQueue

BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>()

而存储ConsumeRequest的ConsumeRequestCache是阻塞队列,那就是明显的生产者、消费者模式

接下来只需要看看什么场景下会将ConsumeRequest放入阻塞队列,即可“逆推”出拉取消息的流程

pull 拉取消息

ConsumeRequestCache.put的方法只有一处就是提交消费请求

private void submitConsumeRequest(ConsumeRequest consumeRequest) {
    try {
        consumeRequestCache.put(consumeRequest);
    } catch (InterruptedException e) {
        log.error("Submit consumeRequest error", e);
    }
}

而该方法在什么时机下会被调用,相信看过前几篇文章的同学们都很熟悉(即拉取到消息后),这个方法是在执行PullTaskImpl任务中找到消息后被调用的

PullTaskImpl

通过名字也可以猜测PullTaskImpl任务就是拉取消息的任务

通过方法可以看出:期间也会去做检查、流控,然后获取队列拉取偏移量进行拉取消息,拉到消息后将消息放入processQueue并封装消费请求进行提交,通知后续消息消费流程

拉取消息任务

public void run() {
    if (!this.isCancelled()) {
        this.currentThread = Thread.currentThread();

        //队列暂停 延时1S重试
        if (assignedMessageQueue.isPaused(messageQueue)) {
            scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
            return;
        }

        //流控检查 不满足延时重试
        //...
        
        long offset = 0L;
        try {
            //获取队列下次拉取的偏移量
            offset = nextPullOffset(messageQueue);
        } catch (Exception e) {
            //失败延时重试
            scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
            return;
        }
        
        long pullDelayTimeMills = 0;
        try {
            SubscriptionData subscriptionData;
            String topic = this.messageQueue.getTopic();
            //订阅数据
            if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
            } else {
                subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
            }

            //拉取消息
            PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
            if (this.isCancelled() || processQueue.isDropped()) {
                return;
            }
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //找到消息的情况 队列加锁
                    final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                    synchronized (objLock) {
                        if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                            //拉取到的消息放入processQueue
                            processQueue.putMessage(pullResult.getMsgFoundList());
                            //封装消费请求并提交
                            submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                        }
                    }
                    break;
                case OFFSET_ILLEGAL:
                    break;
                default:
                    break;
            }
            //更新拉取偏移量
            updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
        } catch (InterruptedException interruptedException) {
        } catch (Throwable e) {
            pullDelayTimeMills = pullTimeDelayMillsWhenException;
        }
	
        
		if (!this.isCancelled()) {
            //未取消继续延时拉取
            scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
        } else {
        }
    }
}

在通过队列获取偏移量时,优先使用seek手动设置的偏移量作为消费偏移量,然后再考虑拉取的偏移量,如果内存中拉取偏移量未设置要向broker获取

private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
    long offset = -1;
    //优先使用手动设置的偏移量
    long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
    if (seekOffset != -1) {
        offset = seekOffset;
        //手动设置则将其更新为消费偏移量
        assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
        assignedMessageQueue.setSeekOffset(messageQueue, -1);
    } else {
        //然后考虑拉取偏移量
        offset = assignedMessageQueue.getPullOffset(messageQueue);
        if (offset == -1) {
            //拉取偏移量未设置则向broker获取
            offset = fetchConsumeOffset(messageQueue);
        }
    }
    return offset;
}

pull拉取消息的方法中,最终会采用同步的方式向Broker拉取数据(默认10S超时)pullMessageSync (DefaultMQPushConsumer拉取消息采用的是异步)

(如果Broker没有消息的话也是长轮询机制的流程,有消息到达会拉取完再返回,长轮询机制在拉取消息的文章中也说过这里就不过多叙述)

最终更新完偏移量,只要任务未被取消则会继续执行该任务 scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS)

再平衡触发拉取消息任务

拉取流程也是类似的,只是有的细节实现不同,那么再来看看何时会触发PullTaskImpl任务

PullTaskImpl任务被构造的方法有两处:

  1. seek手动更改偏移量时,构造PullTaskImpl任务后异步执行拉取消息
  2. 再平衡机制触发

集群模式下会根据Topic进行再平衡,如果更新processQueue,队列需要更改时会调用messageQueueChanged

(之前再平衡的文章分析过该流程,只是推和拉的具体实现不同,这里简单回顾下)

//根据分配的队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
    //更改过后还需要更新队列 维护拉取任务
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
}

最终调用updatePullTask更新拉取任务,将不需要负责的队列任务取消,新增需要负责的队列任务启动

private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
    Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
        if (next.getKey().getTopic().equals(topic)) {
            if (!mqNewSet.contains(next.getKey())) {
                //取消不再负责队列的拉取任务
                next.getValue().setCancelled(true);
                it.remove();
            }
        }
    }
    //新增并启动需要负责队列的拉取任务
    startPullTask(mqNewSet);
}

至此拉取消息的定时任务就会被再平衡机制给启动

再平衡更新拉取任务

虽然拉取消息的任务是同步拉取,但是是放在线程池中执行的,并不会阻塞其他队列的拉取

向Broker更新消费偏移量也是相同的,MQClientInstance启动时开启默认5S的定时任务进行同步消费偏移量MQClientInstance.this.persistAllConsumerOffset()

总结

DefaultLitePullConsumer运行流程与推送的消费者类似,只是部分方法内部实现不同

再平衡机制会将队列负载均衡到消费者,同时更新队列对应的拉取任务

拉取任务使用线程池执行,拉取前会检查状态以及流控失败就延迟重试,然后获取下次拉取消息的偏移量,接着同步向broker进行拉取消息

如果拉取到消息,会将消息存储在队列对应的processQueue,并封装消费请求提交到ConsumerQueueCache中

拉取与推送的一大区别是,拉取获取消息的逻辑需要自己来实现,更加自由易扩展,poll获取消息则是从ConsumerQueueCache中获取消费请求并拿到消息再进行处理

最后(点赞、收藏、关注求求啦~)

我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。