RocketMQ High-Availability Design: Asynchronous Flushing
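Compared with synchronous flushing, asynchronous flushing is more efficient and is the preferred flush strategy in production. RocketMQ uses asynchronous flushing by default, and it comes in two modes: with the buffer pool enabled and with the buffer pool disabled.
The handleDiskFlush method of CommitLog: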
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
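As this method shows, CommitLog's handleDiskFlush() has a synchronous branch and an asynchronous branch. Synchronous flushing was analyzed in the previous article, so here we focus on the asynchronous logic.
Buffer pool enabled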
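The branching in handleDiskFlush can be condensed into a small decision function. This is only an illustrative sketch: the class FlushDispatchSketch and the method selectService are hypothetical names, and the returned strings simply name the service each branch hands off to according to the listing above.

```java
// Illustrative sketch only: FlushDispatchSketch and selectService are
// hypothetical names that mirror the branching in handleDiskFlush.
final class FlushDispatchSketch {

    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    /** Returns the name of the service that the corresponding branch wakes up or hands a request to. */
    static String selectService(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            // Synchronous branch: the flush service is a GroupCommitService.
            return "GroupCommitService";
        }
        // Asynchronous branch: with the buffer pool the commit thread is woken first,
        // without it the flush thread is woken directly.
        return transientStorePoolEnable ? "CommitRealTimeService" : "FlushRealTimeService";
    }

    public static void main(String[] args) {
        System.out.println(selectService(FlushDiskType.ASYNC_FLUSH, false));
        System.out.println(selectService(FlushDiskType.ASYNC_FLUSH, true));
        System.out.println(selectService(FlushDiskType.SYNC_FLUSH, false));
    }
}
```

Note that in synchronous mode the buffer pool flag plays no part in the decision, which matches the listing: only the asynchronous branch reads isTransientStorePoolEnable().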
class CommitRealTimeService extends FlushCommitLogService {
    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Interval between commit attempts: 200 ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Commit at least 4 memory pages (16 KB) of data each time
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Threshold since the last commit: 200 ms
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }
                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
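RocketMQ allocates off-heap memory the same size as a CommitLog file to serve as the buffer pool. Data is first written to the buffer pool, and the commit thread CommitRealTimeService tries every 200 ms to commit it to the file channel, where it waits to be flushed. The flush itself is ultimately performed by FlushRealTimeService, exactly as when the buffer pool is disabled. The purpose of the buffer pool is to merge multiple messages into a single write and so improve I/O performance.
- Check whether more than 200 ms have passed since the last commit; if so, force a commit.
- Commit the data to the MappedFile; at this point it has not yet been flushed to disk.
- Then wake up the flush thread.
- Before the Broker shuts down normally, commit the data still held in memory pages.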
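The write, commit, flush sequence described above can be reproduced with plain Java NIO. The sketch below is not RocketMQ code: BufferPoolCommitSketch and writeCommitFlush are hypothetical names, the 4096-byte buffer and the temporary file are arbitrary choices for the demonstration, and everything runs on one thread instead of separate commit and flush threads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: hypothetical names, single-threaded, temporary file.
final class BufferPoolCommitSketch {

    /** Buffers all messages off-heap, commits them in one write, flushes, and returns the file size in bytes. */
    static long writeCommitFlush(String... messages) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".data");
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                // Step 1: append messages to an off-heap (direct) buffer; nothing reaches the file yet.
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(4096);
                for (String message : messages) {
                    writeBuffer.put(message.getBytes(StandardCharsets.UTF_8));
                }
                // Step 2: "commit" the accumulated bytes to the file channel in one merged write.
                writeBuffer.flip();
                while (writeBuffer.hasRemaining()) {
                    channel.write(writeBuffer);
                }
                // Step 3: "flush" by forcing the channel's content to the storage device.
                channel.force(false);
                return channel.size();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeCommitFlush("msg-1", "msg-2", "msg-3") + " bytes committed and flushed");
    }
}
```

The point of the intermediate buffer is visible in step 2: three separate appends become a single channel write, which is the merging effect the buffer pool is meant to provide.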
Buffer pool disabled
The buffer pool is disabled by default. In this mode the flush thread FlushRealTimeService tries to flush every 500 ms.
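Without the buffer pool, data goes straight into a memory-mapped file and flushing means forcing that mapping to disk. Here is a minimal sketch of that idea using standard NIO; MappedFlushSketch and writeAndFlush are hypothetical names, and the 4096-byte mapping and temporary file are assumptions made for the demonstration rather than anything RocketMQ prescribes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: hypothetical names, one small mapping, temporary file.
final class MappedFlushSketch {

    /** Writes the message through a memory mapping, forces it to disk, and reads it back from the file. */
    static String writeAndFlush(String message) {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".data");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping 4096 bytes in READ_WRITE mode also grows the empty file to that size.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                // The write lands in the mapped region (page cache), not necessarily on disk.
                mapped.put(payload);
                // The flush: force the mapped region's changes to the storage device.
                mapped.force();
                // Read the bytes back through the channel to confirm they are in the file.
                ByteBuffer readBack = ByteBuffer.allocate(payload.length);
                while (readBack.hasRemaining()
                        && channel.read(readBack, readBack.position()) > 0) {
                    // keep reading until the payload is complete
                }
                return new String(readBack.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndFlush("hello commitlog"));
    }
}
```

Because force() is the expensive call, running it on a timer from a background thread rather than on every message is what makes this path asynchronous.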
FlushRealTimeService
class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // Interval between flush attempts: 500 ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // Flush at least 4 memory pages (16 KB) of data each time
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // Threshold since the last flush: 10 seconds
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
            boolean printFlushProgress = false;
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        this.printFlushProgress();
        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
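- Check whether more than 10 seconds have passed since the last flush; if so, force a flush.
- Wait for the 500 ms flush interval.
- Flush through the MappedFile.
- Record the flush timestamp in StoreCheckpoint.
- Log any flush that takes longer than 500 ms.
- Before the Broker shuts down normally, flush the data still held in memory pages.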
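The interplay between the "least pages" setting and the "thorough interval" in the steps above can be sketched as two small functions. The names FlushThresholdSketch, effectiveLeastPages, and isAbleToFlush are hypothetical, the 4 KB page size follows the "4 pages (16 KB)" comment in the listing, and the page-count comparison is an assumption about how such a threshold would typically be checked, not a quote of the MappedFile source.

```java
// Illustrative sketch only: hypothetical names; the page comparison is an assumed rule.
final class FlushThresholdSketch {

    static final int OS_PAGE_SIZE = 4 * 1024;

    /**
     * Returns 0 (meaning "flush whatever is pending") once the thorough interval
     * has elapsed since the last flush, otherwise the configured minimum page count.
     */
    static int effectiveLeastPages(long nowMs, long lastFlushMs, long thoroughIntervalMs, int leastPages) {
        return nowMs >= lastFlushMs + thoroughIntervalMs ? 0 : leastPages;
    }

    /** Decides whether enough unflushed data has accumulated to justify a flush. */
    static boolean isAbleToFlush(long flushedPosition, long writePosition, int leastPages) {
        if (leastPages > 0) {
            // Compare whole pages: flush only when at least leastPages pages are dirty.
            return (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages;
        }
        // Forced mode: any unflushed byte is enough.
        return writePosition > flushedPosition;
    }

    public static void main(String[] args) {
        // 8 KB pending, 3 s since the last flush, 10 s threshold: below 4 pages, so no flush yet.
        int pages = effectiveLeastPages(3_000, 0, 10_000, 4);
        System.out.println(isAbleToFlush(0, 8 * 1024, pages));
        // Same data after 10 s: the threshold drops to 0 and the flush goes ahead.
        pages = effectiveLeastPages(10_000, 0, 10_000, 4);
        System.out.println(isAbleToFlush(0, 8 * 1024, pages));
    }
}
```

This is why a lightly loaded Broker still persists its data: small writes that never reach 16 KB are picked up by the forced flush once the 10-second threshold is crossed.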
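Summary
This article walked through RocketMQ's asynchronous flushing logic. It comes in two strategies, one with the buffer pool enabled and one with it disabled; the buffer pool is disabled by default.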
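[Copyright notice] This article is original content by a Huawei Cloud community user. Reposts must credit the source (Huawei Cloud community), the article link, the author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence. Once verified, the community will remove the infringing content immediately. Report email:
cloudbbs@huaweicloud.com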