19RocketMQ顺序消息之消息消费

举报
周杰伦本人 发表于 2022/05/18 14:27:21 2022/05/18
【摘要】 19RocketMQ顺序消息之消息消费 消费工作线程ConsumeRequest 总结 19RocketMQ顺序消息之消息消费RocketMQ按照消费模式来说可以分为集群消费和广播消费,广播消费指的是将每一条消息会被ConsumerGroup的每个Consumer消费,类似于计算机中的广播地址,广播给这个网段中的计算机,而集群消费是这一条消息只会被ConsumerGroup中的一个消费者进...

19RocketMQ顺序消息之消息消费

RocketMQ按照消费模式来说可以分为集群消费和广播消费,广播消费指的是将每一条消息会被ConsumerGroup的每个Consumer消费,类似于计算机中的广播地址,广播给这个网段中的计算机,而集群消费是这一条消息只会被ConsumerGroup中的一个消费者进行消费

顺序消费的原理是同一消息队列只允许Consumer中的一个消费线程拉取消费。Consumer中有个消费线程池,多个线程同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁请求允许消费

ConsumeMessageOrderlyService就是对应的顺序消息的处理类,它使用定时线程池来消费消息,线程池中真正工作的线程是ConsumeRequest:

消费工作线程ConsumeRequest

class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }

                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext
                                .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }

                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }

                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }

                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext
                                .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }

                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }

}

我们看一下它的run()方法做了什么:

  1. 判断消费模式是广播模式或者加锁了并且锁没有过期
  2. 这时候判断处理的队列的状态是否为dropped,如果是就打印消息队列不能被消费日志,直接return
  3. 当消息模式是集群模式时再次判断是否加锁,并且锁是否过期
  4. 如果可以消费了就讲消息放入线程池中
  5. 获取批量消息,遍历,判断是否有钩子函数,如果有执行钩子函数
  6. 然后加锁,调用messageListener.consumeMessage()方法消费消息,最后解锁
  7. 根据返回的状态设置返回类型,执行处理结果processConsumeResult()的方法

总结

这就是RocketMQ顺序消息之消息消费的内容,主要讲了消息队列中的工作线程ConsumeRequest的run()的分析,大体逻辑就是进行批量处理消息,有钩子函数先执行钩子函数,然后消费前进行加锁,接着消费消息返回消息处理状态,最后解锁,根据返回的消息状态返回消费结果信息,顺序消息如果抛出异常的话会导致消息的阻塞,而相对应的并发消费由多个消费线程从消息队列中进行消费消息,消费速度比顺序消息要快,当前消费线程产生阻塞也不影响其他消费线程的消费。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。