RocketMQ(四):消费前如何拉取消息?(长轮询机制)
RocketMQ(四):消费前如何拉取消息?(长轮询机制)
上篇文章从Broker接收消息开始,到消息持久化到各种文件结束,分析完消息在Broker持久化的流程与原理
消费者消费消息前需要先从Broker进行获取消息,然后再进行消费
为了流程的完整性,本篇文章就先来分析下消费者是如何获取消息的,文章内容导图如下:
获取消息的方式
消费者并不是每次要消费一条数据就向Broker获取一条数据的,这样RPC的开销太大了,因此先从Broker获取一批数据到内存中,再进行消费
消费端获取消息通常有三种方式:推送消息、拉取消息、长轮询(推拉结合)
推送消息:消息持久化到Broker后,Broker监听到有新消息,主动将消息推送到对应的消费者
Broker主动推送消息具有很好的实时性,但如果消费端没有流控,推送大量消息时会增加消费端压力,导致消息堆积、吞吐量、性能下降
拉取消息:消费端可以根据自身的能力主动向Broker拉取适量的消息,但不好预估拉取消息的频率,拉取太慢会导致实时性差,拉取太快可能导致压力大、消息堆积
长轮询:在拉取消息的基础上进行改进,如果在broker没拉取到消息,则会等待一段时间,直到消息到达或超时再触发拉取消息
长轮询相当于在拉取消息的同时,通过监听消息到达,增加推送的优点,将拉取、推送的优点结合,但长连接会更占资源,大量长连接会导致开销大
RocketMQ中常用的消费者DefaultMQPushConsumer
,虽然从名字看是“推送”的方式,但获取消息用的是长轮询的方式
这种特殊的拉取消息方式能到达实时推送的效果,并在消费者端做好流控(拉取消息达到阈值就延时拉取)以防压力过大
拉取消息原理
DefaultMQPushConsumer
的内部实现DefaultMQPushConsumerImpl
有一个MQ客户端实例MQClientInstance
它内部包含的PullMessageService
组件,就是用于长轮询拉取消息的
PullMessageService会使用DefaultMQPushConsumerImpl与Broker建立长连接并拉取消息,拉取的消息存放在本地内存队列(processQueue)中,方便后续给消费者消费
其中涉及一些组件,先简单介绍,方便后续描述:
-
ProcessQueue:从Broker拉取的消息存放在这个内存队列中
- 底层使用有序的TreeMap来进行存储的,其中Key为偏移量、Value为存储的消息
-
PullRequest:拉取请求,拉取消息(以队列为)基本单位
-
PullMessageService轮询时,每次取出PullRequest再进行后续流程
-
存储消费者组、对应的MessageQueue(broker上的队列)、ProcessQueue(消费者内存队列)、拉取的偏移量等信息
-
PullRequest(拉消息)、MessageQueue、ProcessQueue(存消息)一一对应
-
-
PullRequestQueue:PullRequest的队列
- 由于消费者可能同时消费多条队列,每次拉取的基本单位又是以同个队列进行拉取,因此PullMessageService需要轮询取出PullRequest进行后续拉取流程
- 拉取消息失败或下次拉取消息都会把PullRequest重新投入队列中,由后续PullMessageService轮询取出再进行拉取消息
简化的流程为:
- 从队列取出PullRequest,然后封装请求向Broker异步发送
- 响应后通过回调将查到的消息放入其内存队列中,方便后续消费
- 在此期间最终都会将PullRequest放回队列(失败可能延时放回),便于下次拉取该队列的消息
发送拉取消息请求
PullMessageService启动时也会使用线程进行轮询,会从pullRequestQueue取出PullRequest进行后续的拉取消息
public void run() {
//...
while (!this.isStopped()) {
//取出PullRequest 没有则阻塞
PullRequest pullRequest = this.pullRequestQueue.take();
//拉取消息
this.pullMessage(pullRequest);
}
}
pullMessage 拉取消息前准备参数
pullMessage
最终会调用DefaultMQPushConsumerImpl.pullMessage
,代码虽然很多,但主要流程为校验、获取参数、调用核心方法
- 进行参数、状态、流控的校验,如果失败会调用
executePullRequestLater
后续延时50ms将拉取请求重新放回队列中,也就是后续再进行该队列的消息拉取 - 如果是第一次执行,要获取消费进度的偏移量
computePullFromWhereWithException
,后续使用PullRequest上的nextOffset(集群模式向Broker获取) - 获取消费端相关信息(后续会封装成请求),创建回调,回调在RPC后调用
- 执行拉取消息的核心方法
pullKernelImpl
public void pullMessage(final PullRequest pullRequest) {
//获取内存队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//内存队列设置最新的拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
//参数、状态校验、流控..
//内存队列中的消息数量
long cachedMessageCount = processQueue.getMsgCount().get();
//内存队列中消息大小 MB
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//如果数量太多 默认1000 说明当前已经消息堆积 需要进行流控 后续定时将拉取请求再放入队列中 后续再来拉取消息
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
//消息太大 类似 同理 默认100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
//...
//如果是第一次要去获取拉取消息的偏移量
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
//获取当前Topic的订阅数据
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
final long beginTimestamp = System.currentTimeMillis();
//创建回调 这里的回调是从broker拉取消息后执行的回调 后面再分析,这里先省略代码
PullCallback pullCallback = new PullCallback();
//...
try {
//拉取消息核心实现
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
computePullFromWhereWithException 获取拉取消息的偏移量
computePullFromWhereWithException
方法由再平衡组件RebalancePushImpl
调用
(再平衡是消费者间重新分配队列的机制,增加/减少队列、消费者都会触发再平衡机制,平均分配给消费者队列,PullRequest也是它分配的,细节后文再说)
这里的拉取消息偏移量又可以叫上一次消费的偏移量,因为拉取消息从上次消费的偏移量开始拉取
当消费者首次拉取消息时,需要查询拉取偏移量(即上一次消费的偏移量),广播模式下这个偏移量在消费者端记录,就可以从内存中获取
而集群模式下,偏移量在broker记录,需要从broker获取,最终调用fetchConsumeOffsetFromBroker
获取
fetchConsumeOffsetFromBroker
也是先去获取Broker信息,本地没有就从NameServer获取
然后通过客户端API的queryConsumerOffset
发送获取消费偏移量的请求
pullKernelImpl 拉取消息核心
在拉取消息核心方法中会去获取Broker等信息、然后封装请求,再通过Netty调用
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//缓存中查broker信息
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
//没查到 则RPC查NameServer 并加入缓存
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
{
//检查是否为空或tag过滤 不是则抛异常
// check version
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
//最大拉取数量 默认 32
requestHeader.setMaxMsgNums(maxNums);
//系统标记
requestHeader.setSysFlag(sysFlagInner);
//已提交偏移量 用于消费进度
requestHeader.setCommitOffset(commitOffset);
//Broker最大支持时间 15S
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
//消息过滤表达式
requestHeader.setSubscription(subExpression);
//消息过滤版本 通常是当前时间ms
requestHeader.setSubVersion(subVersion);
//消息过滤类型 通常是TAG类型
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
//netty RPC 请求 Broker
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
//NameServer都没查到broker 抛出异常
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
至此,发送拉取消息的请求的流程已经结束,其实整体流程与生产者发送消息流程也类似,比如参数校验、获取broker信息、封装请求、RPC调用、处理结果等
回调处理结果
由于是异步拉取消息,收到结果后会执行回调,结果有四种情况:
- FOUND:找到消息
- NO_NEW_MSG:没有最新消息
- NO_MATCHED_MSG:消息不匹配
- OFFSET_ILLEGAL:非法偏移量,可能太大或太小
回调代码也比较多,这里就不进行展示,直接总结:
- 无论回调成功,还是失败,最终都会将该PullRequest放回队列中,方便后续继续拉取消息
- 成功的情况下通常会更新下次拉取消息的偏移量(PullRequest的nextOffset)、将消息放入内存队列(processQueue)、提交消费请求(异步消费)
至此消费端拉取消息的流程已经结束,需要注意几个关键流程节点:
-
PullMessageService轮询获取PullRequest进行拉取消息
-
拉取消息前需要收集各种消费端数据,如果是集群模式下,首次调用还需要向Broker获取拉取消息的偏移量
-
封装拉取消息请求、回调后向Broker拉取消息,成功后回调会将消息存入PullRequest对应的ProcessQueue,同时将PullRequest返回队列,还会提交消费请求后续进行异步消费
注意将消息存入内存队列ProcessQueue还会发送消费请求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
,用于后续异步消费消息,这里图中暂时未画出,后文再描述消费过程
Broker处理查询消费偏移量请求
接下来再来看看Broker是如何获取消息并放回的
上篇文章曾分析过:Broker服务端的Netty是如何接收请求的,最终会让各种各样的Processor进行请求的处理
Broker由ConsumerManageProcessor来进行读写消费偏移量(写偏移量的原理留在分析消费流程的文章中说明)
ConsumerManageProcessor会使用ConsumerOffsetManager的 queryOffset
获取消费偏移量
ConsumerOffsetManager中使用双层Map的offsetTable来存储消费偏移量
ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer /*queueId*/, Long/*offset*/>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
offsetTable的第一层Key为topic@group
,用topic与消费组确定,第二层Key为队列ID
通过Topic、GroupName、队列ID等信息可以快速获取消费偏移量,如果没记录消费偏移量则使用该队列上的最小偏移量(从头开始)
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
//构建响应
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
//解析请求
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
//通过消费组名称、topic、队列id获取消费偏移量
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
//没有消费偏移量记录,则用队列上最小的偏移量
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}
return response;
}
Broker加入查询消费偏移量后的流程如下:
Broker处理拉取消息请求
处理拉取消息请求的是PullMessageProcessor,它会调用processRequest
处理请求
processRequest
中的代码非常多,主要会封装响应、处理前的一些校验、根据请求信息查询消息,最后根据响应状态分别做处理
其实与写业务代码非常相同,这里它的核心方法是使用MessageStore进行查询消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
(上篇文章说过持久化时写数据用的是MessageStore,现在读数据当然也是使用MessageStore)
在这个方法中做校验的代码也很多,主要会使用ConsumerQueue记录过滤消息并快速找到CommitLog上的消息
简化流程为:
-
根据Topic、队列ID获取对应的ConsumerQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
(通过ConsumerQueue逻辑文件可以快速找到CommitLog上的消息)
-
根据拉取偏移量获取ConsumerQueue文件(缓冲区),方便后续读数据
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
(消费端的队列偏移量就是ConsumerQueue上的偏移量,每次拉取完会把下次拉取的偏移量返回进行更新)
-
计算本次读取的ConsumerQueue最大偏移量,然后开始循环读取ConsumerQueue记录并进行后续处理
(由每次最大拉取消息数量maxMsgNums决定,每条ConsumerQueue记录为20B)
int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
查询消息前,使用ConsumerQueue记录的tag哈希值进行消息过滤
messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)
-
根据ConsumerQueue记录上存在的偏移量和消息大小,找到CommitLog上的消息(缓冲区)
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
-
查询后二次消息过滤
messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)
-
最后把本次找到的消息加入结果,并更新偏移量,继续后续的循环
getResult.addMessage(selectResult);
最后将结果响应写回给消费者,后续消费者回调会将消息放入processqueue内存队列,等待后续消费者进行消费
至此有消息拉取的流程结束
Broker长轮询
上文中还说过如果只是通过消费端轮询拉取的方式,可能会导致实时性不好,拉取频率也会不好控制
为了优化这些缺陷,在没消息拉取的情况下会使用长轮询,每次等待5s再判断是否唤醒,如果超时或者监听到队列中有新的消息则会唤醒,并再次执行PullMessageProcessor拉取消息的流程,然后写回客户端
由于消费者客户端发送拉取消息的请求是异步的,因此在Broker上等待时并不会阻塞消费者拉取其他队列
消费者在发送拉取消息请求时,有两个与长轮询相关的参数:
BROKER_SUSPEND_MAX_TIME_MILLIS:Broker支持的最大超时时间,默认15000ms = 15S
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND:消费者网络请求最大超时时间,默认30000ms = 30s
Broker中负责长轮询的组件是 PullRequestHoldService
当PullMessageProcess未找到消息时,允许暂停的情况下会将参数封装成PullRequest
放入PullRequestHoldService暂停请求(因为此时没消息)
case ResponseCode.PULL_NOT_FOUND:
//允许
if (brokerAllowSuspend && hasSuspendFlag) {
//broker最大支持超时时间 默认15000ms
long pollingTimeMills = suspendTimeoutMillisLong;
//关闭长轮询的情况下 设置超时时间为 1000ms
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
//构建PullRequest
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//由PullRequestHoldService存储
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
//响应置空 暂时不会写回响应 因此达到长轮询的效果
response = null;
break;
}
PullRequestHoldService是维护长轮询的组件,当未找到消息暂停请求时,将请求进行存储
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
//key:topic@队列id
String key = this.buildKey(topic, queueId);
//获取队列上暂停拉取请求的列表
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
//没有就创建
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
//加入
mpr.addPullRequest(pullRequest);
}
思考:相同Topic下同一个队列一般只能由一个消费者进行消费,这里为什么要用列表存储?
(可能是因为消费者可能再平衡导致对消费的队列进行改变,或因为断线重连导致多次请求)
PullRequestHoldService会定时进行检测,如果长轮询就是5s,短轮询就是1秒
public void run() {
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
//长轮询 5s
this.waitForRunning(5 * 1000);
} else {
//1s
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
//检查暂停请求
this.checkHoldRequest();
} catch (Throwable e) {
}
}
}
检查暂停请求的流程就是遍历所有topic@队列下的拉取请求,判断是否有新消息到来,或者请求是否已超时
protected void checkHoldRequest() {
//遍历 key:topic@队列id
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
//根据topic、队列id获取队列当前最大偏移量 (通过比较暂停前记录的拉取偏移量可以知道是否有消息来了)
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
//检查 消息是否到达 或 请求是否超时
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
在通知消息到达的方法notifyMessageArriving
中,主要检查消息是否到达和超时:
- 比较偏移量判断消息是否到达,如果到达则判断是否满足消息过滤
- 同时也会检查请求是否超时(就是broker最大支持的超时时间 默认15s)
如果消息到达或请求超时都会进行唤醒并尝试拉取消息,否则会进行暂停
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
//根据key找到暂停拉取请求列表
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
//复制出来进行处理并清空mpr
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
//遍历队列上每个暂停的拉取请求
for (PullRequest request : requestList) {
//队列上最大偏移量
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
//如果队列上最大偏移量不大于拉取偏移量则更新偏移量 后续被重新加入暂停列表
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
//如果大于 说明消息到达
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
//消息到达并且满足过滤的情况下,唤醒并拉取消息
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
//超时也会唤醒并拉取消息
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
//加入重试列表 后续一起加入暂停拉取请求列表
replayList.add(request);
}
//其他情况重新加入暂停列表
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
executeRequestWhenWakeup会使用PullMessageProcessor尝试拉取消息,但这次拉取消息,如果没消息是不会允许被暂停的
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try {
//调用PullMessageProcessor 第三个参数为false,这次不允许暂停
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
//写回响应
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
//...
});
} catch (Throwable e) {
}
}
} catch (RemotingCommandException e1) {
}
}
};
//提交任务执行
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
疑问:PullRequestHoldService长轮询下5S才检查一次请求,如果期间消息到达,那岂不是延迟太高了?
为了解决这个实时性差的问题,引入消息到达监听器MessageArrivingListener
,当消息到达时能够处理一些事情,比如进行通知的NotifyMessageArrivingListener
它也是通过调用this.pullRequestHoldService.notifyMessageArriving
通知请求拉取消息
那么监听器触发的时机在哪呢?看过上篇文章的同学就会知道,在消息重投进行调度写ConsumerQueue、IndexFile等其他文件后,会触发这个监听器(上篇文章也贴过此处源码)
//成功进行调度写ConsumerQueue、IndexFile等其他文件
DefaultMessageStore.this.doDispatch(dispatchRequest);
//如果不止从节点且 开启长轮询 且 消息到达监听器不为空 会调用消息到达监听器 用于消费的长轮询
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
//调用消息到达监听器
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
}
至此Broker长轮询机制的流程已经结束,小结一下流程:
- 未在Broker拉取到消息时会暂停请求
- 由pullRequestHoldService定时检查或消息到达监听器进行处理
- 如果消息到达并且满足匹配(不满足过滤条件)、请求超时,都会再次尝试进行拉取消息(这次的消息不会暂停)
- 后续再写回响应
总结
消息中间件消费端获取消息的方式通常有推送、拉取、长轮询(推拉结合)三种
Broker主动推送消息有很好的实时性,但消费端未做流控可能会压力大,导致吞吐量、性能下降,消息积压
消费者主动拉取消息能根据自己的消费能力决定拉取数量,但无法预估拉取频率,太慢会导致实时性差
长轮询是特殊的拉取方式,在拉取的基础上,如果未拉取到消息会进行等待,超时或消息到达后再进行拉取,弥补拉取方式实时性差的缺点,但大量长连接一直等待资源开销大
PullMessageService组件用于消息拉取,每次拉取以队列为单位,会从队列轮询获取PullRequest进行消息拉取
发送拉取消息API前会收集消费端参数作为请求内容,如果是首次消费还要先向Broker获取消费偏移量,才知道后续要从哪里进行拉取
最后发送拉取消息请求,由于该请求是长连接,可能会一直阻塞不返回,为了不阻塞拉取其他队列消息,这里使用异步发送,通过回调处理响应
收到响应后会把本次PullRequest重新返回队列,如果拉取到消息,还要把消息放入PullRequest对应的ProcessorQueue内存队列中并提交消费请求,后续消费时通过该内存队列获取消息
Broker使用ConsumerManageProcessor处理查询/修改消费偏移量的请求,读写消费偏移量其实就是读写ConsumerOffsetManager组件维护的Map(根据topic、消费者组、队列id读写Map中的消费偏移量)
Broker使用PullMessageProcessor处理拉取消息的请求,会先通过topic、队列id获取ConsumerQueue,然后循环解析ConsumerQueue记录,通过记录进行消息过滤(比较tag哈希值),最后通过ConsumerQueue记录的偏移量和消息大小信息,查找CommitLog上的消息,加入结果集,最后写回响应
Broker处理长轮询的组件是PullRequestHoldService,当拉取消息请求找不到消息时,会暂停请求存在PullRequestHoldService中,等到PullRequestHoldService定时检测或消息到达监听器触发,去通知消息到达,如果消息到达并且匹配(不被消息过滤)或暂停请求超时都会触发拉取消息,但这次拉取消息不能再暂停请求,是否有响应都会写回
最后(点赞、收藏、关注求求啦~)
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
- 点赞
- 收藏
- 关注作者
评论(0)