RocketMQ(三):面对高并发请求,如何高效持久化消息?
RocketMQ(三):面对高并发请求,如何高效持久化消息?
上篇文章我们分析完RocketMQ发送消息的原理,得到结果客户端会通过RPC组件向Broker进行通信
Broker收到请求后需要将消息进行持久化,一旦涉及到持久化,服务器的性能会急速降低,并且消费者进行消费时还需要读取消息,从磁盘的读取也是会大大降低性能的
本篇文章就要来分析,RocketMQ的Broker在这种频繁读写的场景,是如何进行高效读写的
存储文件
Broker这种频繁读写的场景中,提供三种文件满足持久化消息时的高效读写,分别为CommitLog、ConsumerQueue、IndexFile
CommitLog
为了避免消息持久化时的写入性能开销过大,Broker采用顺序写入,无论消息属于哪个Topic又或者是哪个队列,都顺序写入CommitLog文件
CommitLog文件目录下以起始偏移量进行命名,每个文件固定为1G,从00000000000000000000
文件开始,接着是00000000001073741824
,然后是00000000002147483648
文件,后续以此类推
这些以偏移量命名的文件在源码中被定义为MappedFile
,从名字可以看出它使用内存映射mmap,在频繁读写、大文件的场景,使用mmap避免数据拷贝的性能开销
由于消息大小不一,持久化到MappedFile
时每条消息偏移量也不是固定的
MappedFile
的创建是需要扩容时才创建的,一连串的MappedFile
组成CommitLog文件
ConsumerQueue
虽然顺序写能够大大节省写入的开销,但是并不方便查询,因为消息被写顺序写入CommitLog时,并没有区分是哪个Topic、队列的
这样在进行消费时要读取消息,根据topic、队列、队列上的偏移量来获取消息时,遍历查找是不现实的,因此还要生成逻辑文件,便于进行查找
ConsumerQueue(消费队列),以Topic进行一级分类,然后再以Topic下的队列ID进行二级分类,队列下的每个文件(固定大小6,000,000 B)可以存储30W条ConsumerQueue记录(20 B)
并且命名也是以起始偏移量进行命名,比如00000000000000000000
,00000000000006000000
,00000000000012000000
ConsumerQueue记录固定大小为20B,其中8B记录对应消息在CommitLog上的偏移量、4B记录消息长度、8B记录tag哈希,其中依靠前两个字段可以快速找到CommitLog中的消息
ComsumerQueue与CommitLog文件的关系,类比MySQL数据库中的二级索引与聚簇索引
通过二级索引找到满足条件的记录后,回表快速定位到聚簇索引上的记录(如果不理解MySQL索引的同学可以查看MySQL进阶专栏)
最后一个字段tag哈希用于消息过滤,消费者拉取指定tag消息时,如果哈希不满足就不会进行拉取
在图中ConsumerQueue文件以Topic进行分类,分为Topic A、B两个文件,其中TopicA下根据队列ID存在0、1、2三个文件
文件中存储的记录由commitlog offset消息在commitlog的偏移量、size消息大小和tag哈希值组成
消费者组中的消费者A向队列0、1拉取消息,消费者B向队列2拉取消息
拉取消息时Broker根据ConsumerQueue记录的commitlog offset可以在CommitLog文件中找到需要的消息
需要注意的是,CommitLog与ConsumerQueue文件虽然占用空间不同,但底层都是使用MappedFile的,一个MappedFile相当于一个文件
IndexFile
IndexFile文件为哈希索引,文件分为三个部分:文件头、哈希槽、索引项
文件头用于存储通用数据并固定为40B
public void load() {
//最早消息存储到commitlog时间 8B 用来计算时间差
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
//最晚时间 8B
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
//消息存储到CommitLog最小偏移量 8B
this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
//消息存储到CommitLog最大偏移量 8B
this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
//最大可以存储的哈希槽数量
this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
//以使用的索引数量
this.indexCount.set(byteBuffer.getInt(indexCountIndex));
//从1开始
if (this.indexCount.get() <= 0) {
this.indexCount.set(1);
}
}
哈希槽每个占用4B,用来寻址,能够找到索引项
索引项每个占用20B,4B存储哈希值、8B存储消息在CommitLog中的偏移量、4B存储与前一个消息持久化的时间差(单位秒,用来时间范围查询)、4B存储下个索引项的偏移量,用于寻址(哈希冲突时链地址法)
文件大小 = 文件头 40B + 哈希槽数量 * 哈希槽固定大小 4B + 索引数量 * 索引固定大小 4B
int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
使用流程与哈希表类似:
- 通过
topic + # + key
的形式来计算哈希值,哈希值模上哈希槽的数量就找到对应的哈希槽 - 通过哈希槽找到对应的索引项,对比哈希值
- 哈希值相同则获取偏移量,再去CommitLog寻找
- 哈希值不相同则根据联表向后查找下一个索引项
Broker存储消息的流程中除了CommitLog、ConsumerQueue、IndexFile文件外,还会存储其他文件,后文用到了再说,最终Broker存储的架构图如下:
数据刷盘与同步
在分析原理前,再来说下数据刷盘和同步的几种方式:
数据刷盘:将操作系统内核缓冲区中的数据刷入磁盘
刷盘可以分为同步、异步两种方式,默认异步:
- 同步:向内核缓冲区写完数据后开始等待,直到对应的脏页刷入磁盘再返回;性能差,但可靠性好
- 异步:向内核缓冲区写完数据后立即返回,根据频率、页数等配置将脏页数据刷入磁盘;性能好,但可靠性差
数据同步:主Broker持久化数据后,再将数据同步给从Broker,也叫主从复制
主从复制也分为同步、异步两种方式,默认异步:
- 同步:向内核缓冲区写完数据后开始等待,直到从broker也持久化完数据;性能差,但可靠性好
- 异步:向内核缓冲区写完数据后直接返回,异步线程对从broker进行数据同步
后文将会从源码的角度进行分析
存储原理
因为上篇分析过生产者发送消息流程的原理,介绍完存储文件后,我们从Broker接收消息的源码开始分析,正好接上原理分析的流程
(Broker的源码去GitHub拉下来查看,如果要跑起来,还要配置Broker的一些参数)
Netty服务器接收请求
上篇文章发送消息RPC时说过RocketMQ网络通信使用的Netty框架,那么Broker作为服务端一定会有Netty的服务器
在这个Netty服务器中pipeline可以配置一些处理器用于处理请求、响应
NettyRemotingServer.start
在启动的流程中,会将NettyServerHandler加入pipeline处理请求
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
//NettyServerHandler 加入 pipeline
serverHandler
);
}
});
不认识netty的同学也不要慌,就当它是个网络通信的组件
NettyServerHandler处理请求时会调用processMessageReceived
方法
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
processMessageReceived
方法会根据类型分请求/响应两种情况进行处理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
调用processRequestCommand
方法,会先通过请求的类型获取Pair
Pair中存在NettyRequestProcessor(处理请求)和ExecutorService(对应的线程池)
然后将任务提交到线程池,任务代码我先省略,待会查看,如果不支持请求类型会直接响应
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//通过请求code获取Pair,NettyRequestProcessor为处理请求,ExecutorService为对应的线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
//任务..
try {
//将任务提交到线程池
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
//...
}
} else {
//响应 请求类型不支持处理
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
任务中主要做几件事:
- 解析请求地址
- 执行处理前的钩子
- 封装异步回调,包括:执行处理后的钩子和写回响应
- 调用NettyRequestProcessor处理器的处理方法,如果是异步就携带回调,如果不是则执行完再调用下回调(流程类似)
Runnable run = new Runnable() {
@Override
public void run() {
try {
//解析请求地址
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
//rpc处理前的钩子
doBeforeRpcHooks(remoteAddr, cmd);
//异步回调 处理后的钩子和写响应
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
//处理后的钩子
doAfterRpcHooks(remoteAddr, cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
try {
//写响应
ctx.writeAndFlush(response);
} catch (Throwable e) {
//...
}
}
}
}
};
//调用处理器的方法
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
//...
}
}
};
SendMessageProcessor 处理请求
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
}
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
}
处理请求的处理器SendMessageProcessor是异步的,因此会调用asyncProcessRequest
方法
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).
thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
}
asyncProcessRequest
处理请求分成两种情况:
- 消费者发送的,调用
asyncConsumerSendMsgBack
- 剩下的情况就是生产者发送的消息,需要解析请求头、执行处理发送消息前的钩子、查看是否批量消息(分情况处理)
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
//解析请求头
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
//执行处理发送消息前的钩子
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
//是否批量消息 分情况处理
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
查看普通消息的情况,调用 asyncSendMessage
,才开始真正处理发送消息的请求:
- 获取消息体、队列ID、Topic等信息,将数据注入MessageExtBrokerInner
- 是否为事务消息,使用MessageExtBrokerInner后续执行
private CompletableFuture<RemotingCommand> asyncSendMessage(
ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {
//生成通用响应
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
//获取消息体、队列ID、Topic等信息
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
//将信息封装到MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
//set...
CompletableFuture<PutMessageResult> putMessageResult = null;
//分情况处理,一个是事务消息,另一个是普通消息
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
//写响应
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
继续查看普通消息的后续流程,this.brokerController.getMessageStore().asyncPutMessage(msgInner)
getMessageStore
会获取消息存储的组件MessageStore进行后续的存储流程
MessageStore 存储消息
MessageStore负责消息的存储,除了写CommitLog文件外,还要去写ConsumerQueue、IndexFile等文件
采用DefaultMessageStore处理消息:
(我们先把写CommitLog的主流程继续走完)
- 先检查存储、消息、本地MQ是否正常,这里的本地MQ并不是指RocketMQ,而是本地内存MQ
- 调用CommitLog处理消息(历尽千辛万苦终于看到熟悉的CommitLog了~)
- 处理完统计耗时、失败次数等
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
//检查存储、消息、本地MQ
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
//...
long beginTime = this.getSystemClock().now();
//调用CommitLog处理消息
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
//处理后要统计下耗时,在哪个时间段,累计失败次数等
putResultFuture.thenAccept(result -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
return putResultFuture;
}
写CommitLog
CommitLog追加消息
CommitLog.asyncPutMessage
终于到达CommitLog存储消息的流程,这里的流程还是比较重要的:
- 写前加锁 (由于会顺序写同一个文件,那一定会出现并发写数据的情况,因此要加锁)
- 获取最后一个MappedFile,没有就创建 (MappedFile就是对应以偏移量命名的文件,第一个文件不一定为
00000000000000000000
,因为消费完会删除,第一个文件为偏移量最小的) - 使用MappedFile追加消息
- 处理结果,写后释放锁
- 提交刷盘请求 (mmap只是将数据写到page cache,还需要根据同步/异步刷盘策略再进行刷盘)
- 提交主从复制的请求 (也是同步/异步的策略进行主从复制)
- 刷盘、主从复制请求完成后才返回
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
//处理事务消息 省略...
//...
//加锁 自旋锁或可重入锁
putMessageLock.lock();
try {
//获取mappedFileQueue队列中最后一个MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//mappedFile为空要创建
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
//mappedFile追加消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
//处理结果
switch (result.getStatus()) {
//...
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} finally {
beginTimeInLock = 0;
//释放锁
putMessageLock.unlock();
}
//...
//提交刷盘请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交传输从节点数据请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
//都提交后返回
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
mappedFile追加消息就是计算偏移量,再将数据写入缓冲区
创建MappedFile
CommitLog追加消息的流程中,如果MappedFile不存在,则会创建,最终调用doCreateMappedFile
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
//采用allocateMappedFileService创建
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
//否则自己创建
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
return mappedFile;
}
AllocateMappedFileService
是负责创建MappedFile的组件,会将请求放入内存队列this.requestQueue.offer
后续由它的线程取出请求进行创建
刷盘
追加完消息后会提交两个请求:
submitFlushRequest
提交刷盘请求,主要分两种情况:同步、异步
submitReplicaRequest
提交主从复制请求,也是分为同步情况与刷盘类似,只是增加HA的组件
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush 同步
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
//GroupCommitService添加请求
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
//异步唤醒
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
如果是同步,则会使用GroupCommitService
进行同步刷盘,将请求放入它的队列
如果是异步,则唤醒FlushRealTimeService
进行刷盘
默认情况下,刷盘策略为异步
返回后,由外层的 DefaultMessageStore
的 waitForPutResult
进行阻塞等待结果
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
return waitForPutResult(asyncPutMessage(msg));
}
同步刷盘
CommitLog
在构造时根据字段 flushDiskType
判断刷盘策略是同步还是异步(默认异步)
同步使用GroupCommitService,异步使用FlushRealTimeService
public CommitLog(final DefaultMessageStore defaultMessageStore) {
//...
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
}
GroupCommitService负责同步刷盘,提供两个队列
requestsWrite负责写,提交刷盘任务时会被放入队列中
requestsRead负责读,每次刷盘时取出请求进行刷盘
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
GroupCommitService 会轮询取出请求进行刷盘
public void run() {
while (!this.isStopped()) {
try {
//等待 其中会交换队列
this.waitForRunning(10);
//取出请求 刷盘
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
}
//结束前再刷盘兜底
synchronized (this) {
this.swapRequests();
}
this.doCommit();
}
waitForRunning
等待期间会调用交互请求队列,将读写队列互换
private void swapRequests() {
lock.lock();
try {
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}
doCommit
会遍历读队列进行刷盘,通过比较偏移量判断是否刷盘成功,刷盘可能涉及两个mappedFile,因此可能循环两次
private void doCommit() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
//通过比较偏移量判断是否刷盘成功
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
//刷盘可能涉及两个mappedFile因此可能要循环两次
for (int i = 0; i < 2 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
//填充结果:成功 或 刷盘超时
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead = new LinkedList<>();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
// 读队列没数据的情况下直接刷
CommitLog.this.mappedFileQueue.flush(0);
}
}
其中 flush(0)
表示刷盘没有最低页数的要求,会尽力刷盘
最终调用 force
进行刷盘,fileChannel
是文件的channel,mappedByteBuffer
是它的直接内存缓冲区
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
值得思考的是读写队列都采用线程不安全的LinkedList
,在并发提交刷盘任务的情况下需要加锁同步,那为啥不使用JUC下的队列呢?
也许是因为刷盘读队列消费时为单线程并不需要使用同步手段,最终才选择LinkedList
异步刷盘
FlushRealTimeService 负责异步刷盘
当使用异步刷盘时,也是通过轮询刷盘,可以通过配置参数调整刷盘频率、每次刷盘最少的页数等
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
//是否使用Thread.sleep 默认true
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//刷盘频率 默认500ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//每次最少刷屏页数 默认4页
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
//...
try {
//等待 刷盘频率的时间
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
//刷盘
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
//...
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
//结束前 兜底刷盘
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
与同步刷盘不同:刷盘频率很少(等待时间久)、每次刷盘并不是尽力刷而是根据最少页数进行刷盘
小结
至此CommitLog文件被刷入磁盘,主从复制的“支线”后续文章再来分析
由于消息写入CommitLog流程漫长、复杂,我们总结核心流程,简化流程:
- Netty服务器接收请求
- SendMessageProcessor将请求转化为MessageExtBrokerInner (Broker内部处理的消息)
- MessageStore 检查消息、存储状态,调用CommitLog持久化
- CommitLog 写前加锁防并发写
- CommitLog 没有MappedFile或者已满,将请求放入内存队列,通过AllocateMappedFileService异步创建
- CommitLog 使用最后一个MappedFile,追加数据,最终通过fileChannel或它的缓冲区mappedByteBuffer进行force刷盘
- CommitLog 写完释放锁
- CommitLog 提交刷盘请求:默认异步刷盘根据配置的频率和每次刷盘页数进行刷盘,同步刷盘会将请求提交到写队列,循环消费,消费前置换读写队列,取出请求尽量刷盘
- CommitLog 提交主从复制请求
- MessageStore 刷盘、主从复制请求完成后返回
再精简下流程:Netty -> SendMessageProcessor -> MessageStore -> CommitLog -> MappedFile -> fileChannel/mappedByteBuffer force
当消费者获取消息后,CommitLog中存储的消息就不需要保存了,因此会有清理线程来进行定时清理
实现定时清理的组件是MessageStore下的CleanCommitLogService,实际上就是操作MappedFile,销毁关闭资源,这里就不过多叙述
写ConsumerQueue
整理完“主线”,消息持久化并没有结束,还有部分的“支线”要挖掘
(就像黑神话悟空一样,要把图舔干净才有美妙的探索感~)
与CommitLog文件对标的ConsumerQueue、IndexFile文件似乎没有出现主流程的源码中,它们是啥时候生成的?接下来让我们继续分析:
ReputMessageService 重投消息
DefaultMessageStore下的**ReputMessageService
用于重投消息,它会根据CommitLog上的偏移量封装成请求,重投给其他CommitLogDispatcher
进行后续处理**
常见的CommitLogDispatcher
有写ConsumerQueue的CommitLogDispatcherBuildConsumeQueue
、写IndexFile的CommitLogDispatcherBuildIndex
线程循环执行,每次循环等待1ms,然后调用doReput
,其中reputFromOffset为它记录的偏移量
- 根据重投偏移量获取CommitLog(封装为SelectMappedBufferResult)
- 然后每次循环检查、解析每条消息,封装为DispatchRequest
- 如果成功就使用对应处理器进行调用 doDispatch
private void doReput() {
//如果reputFromOffset偏移量比commitLog最小偏移量还小,就从最小偏移量开始重投
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
//偏移量没超过commitLog最大就一直循环重投
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
//根据偏移量获取MappedFile以及缓冲池
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
//每次获取一条消息进行调用
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//检查并解析每条消息
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//成功进行调用
DefaultMessageStore.this.doDispatch(dispatchRequest);
//如果不止从节点且 开启长轮询 且 消息到达监听器不为空 会调用消息到达监听器 用于消费的长轮询(下篇文章再说消息消费)
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
//调用消息到达监听器
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
}
//...
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
doDispatch
方法会遍历 CommitLogDispatcher
进行处理
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
CommitLogDispatcherBuildConsumeQueue 追加数据
CommitLogDispatcherBuildConsumeQueue 会调用putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}
先通过topic、队列id获取对应的ConsumerQueue,然后调用putMessagePositionInfo
先在ConsumerQueue的缓冲区上写ConsumerQueue记录(消息偏移量、消息大小、tag哈希),然后获取映射文件MappedFile,最后再往文件里追加数据
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
//先在自己的缓冲区上写数据
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
//8B 消息在CommitLog偏移量
this.byteBufferIndex.putLong(offset);
//4B 消息大小
this.byteBufferIndex.putInt(size);
//8B tag哈希
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//获取MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
//...
//往文件里追加消息
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
注意MappedFile是映射文件,虽然ConsumerQueue与CommitLog都是往MappedFile追加数据,但它们的映射的文件不同
其实流程是类似的,这里只追加了数据,剩下的刷盘还要其他组件异步去做
FlushConsumeQueueService 异步刷盘
FlushConsumeQueueService 负责ConsumerQueue文件的异步刷盘
根据配置flushIntervalConsumeQueue
刷盘频率默认1000ms,也就是循环中的等待时间,等待后调用doFlush
进行刷盘
流程与异步刷盘类型,也是读取刷盘页数、频率、间隔等参数进行刷盘
不同的是ConsumerQueue文件要从Topic、队列ID目录一层一层遍历刷盘
private void doFlush(int retryTimes) {
//刷盘页数 默认2页
int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
if (retryTimes == RETRY_TIMES_OVER) {
flushConsumeQueueLeastPages = 0;
}
long logicsMsgTimestamp = 0;
//两次刷盘间隔 默认60s
int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
//如果超过间隔时间 则尽量刷盘
this.lastFlushTimestamp = currentTimeMillis;
flushConsumeQueueLeastPages = 0;
logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
}
//Key为Topic Value中的Key则是Topic下的队列ID,ConsumerQueue则是队列ID目录下的ConsumerQueue文件
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
//遍历Topic
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
//遍历队列
for (ConsumeQueue cq : maps.values()) {
//遍历队列下的ConsumerQueue文件
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
//ConsumerQueue文件刷盘
result = cq.flush(flushConsumeQueueLeastPages);
}
}
}
//...
}
ConsumerQueue刷盘优先对自己进行刷盘,如果开启扩展文件(consumerqueue_ext)也会对其进行刷盘
public boolean flush(final int flushLeastPages) {
boolean result = this.mappedFileQueue.flush(flushLeastPages);
if (isExtReadEnable()) {
result = result & this.consumeQueueExt.flush(flushLeastPages);
}
return result;
}
this.mappedFileQueue.flush
刷盘上文以及说过,就不再分析了
ConsumerQueue记录对应的消息被消费完就不需要进行存储,因此也会有清理文件的组件
清理文件的组件是MessageStore下的CleanConsumeQueueService
,同时它也会去清理InfexFile文件(流程类似,这里就不多说了)
至此ConsumerQueue文件也进行了持久化,IndexFile文件追加数据的原理类似
写IndexFile
写IndexFile也是由ReputMessageService
重投的,由CommitLogDispatcherBuildIndex
进行处理
会先获取IndexFile,最终调用getAndCreateLastIndexFile
,主要流程为:
- 先从列表中获取最后一个IndexFile文件 (期间加读锁,因为列表是线程不安全的)
- 如果没获取到或者已满,则创建IndexFile后,加写锁放入列表再解锁
- 如果创建IndexFile要启动刷盘的线程持久化上一个IndexFile
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;
{
//如果列表中有IndexFile则取最后一个IndexFile 期间加读锁
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
if (!tmp.isWriteFull()) {
indexFile = tmp;
} else {
//已满也要创建
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
prevIndexFile = tmp;
}
}
this.readWriteLock.readLock().unlock();
}
if (indexFile == null) {
try {
//如果没获取到则创建IndexFile
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
//并把它放入列表中 期间加写锁
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock();
}
if (indexFile != null) {
//开启负责刷盘的线程 将上一个文件进行刷盘
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
flushThread.setDaemon(true);
flushThread.start();
}
}
return indexFile;
}
获取IndexFile后,就会调用putKey构建哈希索引
上篇文章说过,在默认消息发送的实现中,如果消息不是批量消息则会设置唯一标识
这里的Key是通过topic、#、唯一标识拼接而成的
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key);
//哈希槽的位置
int slotPos = keyHash % this.hashSlotNum;
//哈希槽偏移量
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
//哈希槽的值 可以找到
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//计算时间差 单位秒
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//计算要写入索引项的偏移量位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//写入索引项记录
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//覆盖哈希槽的值 下次再通过哈希槽找到的是这个新的索引项 头插法
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//...
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
}
//...
}
小结
至此已经描述完从接收消息到消息持久化CommitLog的主流程,以及异步持久化ConsumerQueue与IndexFile文件的流程
为了达到高性能,在这个持久化的过程中并不是同步的,也不是原子操作,这种持久化设计采用的是数据最终一致性,一旦节点宕机,文件漏写就会导致数据不一致
为了解决这个问题,需要在broker启动时判断上次是否为异常宕机,持久化文件是否写齐,如果不齐需要有对应的数据恢复过程(本文就不分析这部分的源码了,后续文章再说)
因此还会涉及到消息的重投,从而引起重复消费,这也是RocketMQ在保证消息不丢的同时,不能保证消息不重复
最终将相关操作绘制成一张流程图,如下:
总结
Broker为了持久化消息会写很多文件,其中主要为CommitLog、ConsumerQueue、IndexFile文件
为了实现高性能的写入,写入文件通常都是使用mmap(内存映射)对应源码中的MappedFile
CommitLog为消息顺序写入的文件,通过顺序写的方式提高写入性能,文件名以起始偏移量命名,固定1G,消息被消费后会删除
ConsumerQueue文件为消息的逻辑文件,会以Topic、队列ID、偏移量进行多级目录分类,每个也是以起始偏移量命名,固定6MB可以存储30W条ConsumerQueue记录
ConsumerQueue记录固定为20B,8B记录消息在CommitLog上的偏移量,4B记录消息大小,8B记录tag哈希值,通过前两个字段可以快速在CommitLog中找到对应消息,tag哈希值用于消息过滤,要拉取指定tag消息时,如果tag哈希值不对则过滤
IndexFile索引文件,可以看成消息Key和消息在CommitLog上偏移量的哈希索引文件,key由topic和消息唯一标识组成,通过key的哈希值模上哈希槽数量得到对应的哈希槽,根据哈希槽找到对应索引项,索引项上存储消息偏移量,能够快速找到消息(如果冲突则根据指针寻找下一个索引项)
默认刷盘和主从复制的方式都是异步,性能好但可靠性不好,同步虽然性能差但可靠性较好
Broker会使用Netty接收请求,再通过各种Processor对各种请求进行处理,如果是发送消息的请求最终会使用MessageStore进行存储
MessageStore复制消息的存储,读、写、清理、管理各种消息持久化相关文件
MessageStore会调用CommitLog进行写消息,CommitLog则是通过它的MappedFile进行写数据,在此期间可能多个线程需要写同一个MappedFile,因此需要加锁
如果没有或需要扩容,就要创建MappedFile,CommitLog将创建MappedFile的操作异步交给AllocateMappedFileService初始化MappedFile
MappedFile写完数据,数据就被映射在内核缓冲区,但此时还没有刷入磁盘,写完数据,还需要提交刷盘请求和同步请求
如果使用的是异步刷盘,则唤醒FlushRealTimeService的线程,根据配置的频率、页数、间隔等参数进行异步刷盘
如果使用的是同步刷盘,则将请求放入GroupCommitService的写队列中(需要使用同步手段防止并发),后续如果没返回值会阻塞
GroupCommitService会每隔10ms循环消费读队列中的请求进行刷盘,消费前等待时会将读写队列转换(因为写队列可能多线程写,而读队列只有一个线程读)
同步请求会交给HAService进行处理,流程类型刷盘,只不过过程是网络通信
最后主流程会等待刷盘、同步请求执行完,同步的话会阻塞,如果期间超时则会返回对应的响应状态,标识消息持久化可能失败(CommitLog持久化流程结束)
消费被消费后则不需要再存储,MessageStore会使用CleanCommitLogService定时清理
写完主要的CommitLog文件后,MessageStore会异步使用ReputMessageService重投消息,根据它的偏移量获取CommitLog文件,然后封装每一条消息交给CommitLogDispatcher调度器执行
写ConsumerQueue文件由CommitLogDispatcherBuildConsumeQueue进行调度,通过消息的Topic和队列ID找到对应的ConsumerQueue,再往对应MappedFile上追加数据
对于ConsumerQueue文件的刷盘,MessageStore会使用FlushConsumeQueueService读取配置异步进行刷盘,由于ConsumerQueue根据Topic、队列ID进行多级分类,因此刷盘时也要多层遍历刷盘
同时MessageStore会使用CleanConsumeQueueService定时清理过期的ConsumerQueue、IndexFile文件
写IndexFile文件由CommitLogDispatcherBuildIndex调度,它会调用IndexFileService对IndexFile文件进行追加数据,期间创建新的IndexFile才对上一个文件进行异步刷盘
为了高性能,在消息持久化的过程中使用顺序写、MMap、最终一致性等特点,节点宕机可能导致文件数据未写入,因此启动时会检查文件是否写入成功,如果写入文件不齐全,还会涉及到恢复文件,消息重投,可能导致消息重复消费
最后(点赞、收藏、关注求求啦~)
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
- 点赞
- 收藏
- 关注作者
评论(0)