RocketMQ(八):轻量级拉取消费原理
RocketMQ(八):轻量级拉取消费原理
前几篇文章,我们从DefaultMQPushConsumer从再平衡给消费者分配队列开始、再到消费者拉取消息、最后通过并发/顺序的方式消费消息,已经完全描述它的实现原理
虽然它取名为"Push",但内部实现获取消息依旧是使用拉取的方式,只是增加了长轮询机制
这样取名只是为了想表达它的消息会被“推送”到消息监听器上,而我们只需要实现自己的消息监听器来处理消息
这篇文章我们使用“逆推”的思维,来看看消费者的另一个实现DefaultLitePullConsumer是如何实现轻量级拉取消费的
本文思维导图如下:
往期回顾:
RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)
RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
使用DefaultLitePullConsumer
DefaultLitePullConsumer使用起来与DefaultMQPushConsumer略有不同
它不需要再配置消息监听器(因为需要手动去拉取),启动后通过调用它的poll
方法来手动拉取消息进行处理
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
//根据配置文件set...
consumer.setConsumerGroup(groupName);
consumer.subscribe(topic, tag);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
executor.execute(() -> {
while (true) {
//拉取消息
List<MessageExt> poll = consumer.poll();
log.info("{}拉取消息:{}", groupName, poll);
}
});
实现原理
熟悉过DefaultMQPushConsumer的我们肯定对消费的整体流程不会陌生,无非就是需要做到以下三点:
- 再分配机制如何给同组消费者负载均衡分配队列?
- 如何拉取消息?
- 如何消费消息?
授人以鱼不如授人以渔,这次我们直接以poll
为入口,“逆推”其实现的原理
(面对没有文档、没有自顶向下的架构、不熟悉的源码都可以使用这种方式进行“推理”,找一个熟悉的业务实现点,往前寻找)
poll 获取消息
poll无参方法默认会携带5S的超时时间来进行调用,因此我们可以猜测如果没有消息到达就是每5s拉取一次消息
每个方法依次查看会发现它会进行检查、自动提交、从内存中获取消息请求ConsumeRequest,最后再获取本次消费的消息以及维护数据(更新偏移量、重置topic)
public synchronized List<MessageExt> poll(long timeout) {
try {
//检查服务状态
checkServiceState();
//校验参数
if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
}
//自动提交
if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
}
long endTime = System.currentTimeMillis() + timeout;
//获取消费请求
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
//未超时
if (endTime - System.currentTimeMillis() > 0) {
//如果获取消费请求但processQueue已弃用要重新获取 直到超时
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0) {
break;
}
}
}
//获取到消费请求并且队列对应的processQueue未弃用 获取消息并维护其他状态
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
//获取拉取到的消息
List<MessageExt> messages = consumeRequest.getMessageExts();
//内存队列中删除这批消息
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
//更新偏移量
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
//If namespace not null , reset Topic without namespace.
//消息重置topic 可能之前是从其他特殊队列出来的?
this.resetTopic(messages);
return messages;
}
} catch (InterruptedException ignore) {
}
return Collections.emptyList();
}
DefaultLitePullConsumerImpl.commitAll
查看自动提交的方法可以发现:**在自动提交中,如果当前时间超过下次自动提交的时间(默认每隔5S)就会调用 commitAll
**(从之前看过的源码,可以猜测这个Commit会去更新偏移量或持久化相关的操作)
public synchronized void commitAll() {
try {
//遍历队列
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
//获取队列的消费偏移量
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue != null && !processQueue.isDropped()) {
//更新消费偏移量
updateConsumeOffset(messageQueue, consumerOffset);
}
}
}
//如果是广播模式则全部持久化
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
}
} catch (Exception e) {
log.error("An error occurred when update consume offset Automatically.");
}
}
虽然目前不太理解assignedMessageQueue队列是干嘛的,但从名字可以看出可能是再平衡给当前消费者负载均衡分配的队列
这个方法看上去像兜底的更新消费偏移量(广播下持久化),某些情况下会遗漏,则每次获取消息时检查超时5S就进行兜底更新
而consumeRequestCache.poll
从名称上看就像是专门存储consumeRequestCache的缓存
之前的文章也说过ConsumeRequest封装的消费请求,其中包含本次消费的消息列表以及对应的队列MessageQueue和ProcessQueue
BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>()
而存储ConsumeRequest的ConsumeRequestCache是阻塞队列,那就是明显的生产者、消费者模式
接下来只需要看看什么场景下会将ConsumeRequest放入阻塞队列,即可“逆推”出拉取消息的流程
pull 拉取消息
ConsumeRequestCache.put的方法只有一处就是提交消费请求
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
} catch (InterruptedException e) {
log.error("Submit consumeRequest error", e);
}
}
而该方法在什么时机下会被调用,相信看过前几篇文章的同学们都很熟悉(即拉取到消息后),这个方法是在执行PullTaskImpl任务中找到消息后被调用的
PullTaskImpl
通过名字也可以猜测PullTaskImpl任务就是拉取消息的任务
通过方法可以看出:期间也会去做检查、流控,然后获取队列拉取偏移量进行拉取消息,拉到消息后将消息放入processQueue并封装消费请求进行提交,通知后续消息消费流程
public void run() {
if (!this.isCancelled()) {
this.currentThread = Thread.currentThread();
//队列暂停 延时1S重试
if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
return;
}
//流控检查 不满足延时重试
//...
long offset = 0L;
try {
//获取队列下次拉取的偏移量
offset = nextPullOffset(messageQueue);
} catch (Exception e) {
//失败延时重试
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
}
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
String topic = this.messageQueue.getTopic();
//订阅数据
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else {
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}
//拉取消息
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) {
case FOUND:
//找到消息的情况 队列加锁
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
//拉取到的消息放入processQueue
processQueue.putMessage(pullResult.getMsgFoundList());
//封装消费请求并提交
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
//更新拉取偏移量
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
} catch (InterruptedException interruptedException) {
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
if (!this.isCancelled()) {
//未取消继续延时拉取
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
}
}
}
在通过队列获取偏移量时,优先使用seek手动设置的偏移量作为消费偏移量,然后再考虑拉取的偏移量,如果内存中拉取偏移量未设置要向broker获取
private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1;
//优先使用手动设置的偏移量
long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) {
offset = seekOffset;
//手动设置则将其更新为消费偏移量
assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
assignedMessageQueue.setSeekOffset(messageQueue, -1);
} else {
//然后考虑拉取偏移量
offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
//拉取偏移量未设置则向broker获取
offset = fetchConsumeOffset(messageQueue);
}
}
return offset;
}
pull拉取消息的方法中,最终会采用同步的方式向Broker拉取数据(默认10S超时)pullMessageSync
(DefaultMQPushConsumer拉取消息采用的是异步)
(如果Broker没有消息的话也是长轮询机制的流程,有消息到达会拉取完再返回,长轮询机制在拉取消息的文章中也说过这里就不过多叙述)
最终更新完偏移量,只要任务未被取消则会继续执行该任务 scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS)
再平衡触发拉取消息任务
拉取流程也是类似的,只是有的细节实现不同,那么再来看看何时会触发PullTaskImpl任务
PullTaskImpl任务被构造的方法有两处:
- seek手动更改偏移量时,构造PullTaskImpl任务后异步执行拉取消息
- 再平衡机制触发
集群模式下会根据Topic进行再平衡,如果更新processQueue,队列需要更改时会调用messageQueueChanged
(之前再平衡的文章分析过该流程,只是推和拉的具体实现不同,这里简单回顾下)
//根据分配的队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
//更改过后还需要更新队列 维护拉取任务
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
最终调用updatePullTask
更新拉取任务,将不需要负责的队列任务取消,新增需要负责的队列任务启动
private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!mqNewSet.contains(next.getKey())) {
//取消不再负责队列的拉取任务
next.getValue().setCancelled(true);
it.remove();
}
}
}
//新增并启动需要负责队列的拉取任务
startPullTask(mqNewSet);
}
至此拉取消息的定时任务就会被再平衡机制给启动
虽然拉取消息的任务是同步拉取,但是是放在线程池中执行的,并不会阻塞其他队列的拉取
向Broker更新消费偏移量也是相同的,MQClientInstance启动时开启默认5S的定时任务进行同步消费偏移量MQClientInstance.this.persistAllConsumerOffset()
总结
DefaultLitePullConsumer运行流程与推送的消费者类似,只是部分方法内部实现不同
再平衡机制会将队列负载均衡到消费者,同时更新队列对应的拉取任务
拉取任务使用线程池执行,拉取前会检查状态以及流控失败就延迟重试,然后获取下次拉取消息的偏移量,接着同步向broker进行拉取消息
如果拉取到消息,会将消息存储在队列对应的processQueue,并封装消费请求提交到ConsumerQueueCache中
拉取与推送的一大区别是,拉取获取消息的逻辑需要自己来实现,更加自由易扩展,poll获取消息则是从ConsumerQueueCache中获取消费请求并拿到消息再进行处理
最后(点赞、收藏、关注求求啦~)
我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
- 点赞
- 收藏
- 关注作者
评论(0)