kafka源码解析之三:Log模块读写源码分析——(一)
Log模块读写源码分析
1. 基本原理
注:本文部分内容摘自互联网
1.1 Kafka消息存储方式
首先深入的了解一下Kafka中的Topic.Topic是发布的消息的类别或者种子Feed名。对于每一个Topic, Kafka集群维护这一个分区的log,就像下图中的示例:
每一个分区都是一个顺序的、不可变的消息队列,并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了。实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。
可以看到这种设计对消费者来说操作自如,一个消费者的操作不会影响其它消费者对此log的处理。
再说说分区。Kafka中采用分区的设计有几个目的。一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。第二,分区可以作为并行处理的单元。
Kafka每个分区会创建一个文件夹,文件夹的命名为topicName-分区序号,例如如下图:
而分区是有由多个segment组成,分成多个可以方便做日志清理、恢复等工作。每个segment以该segment第一条消息的offset命名并以“.log”为后缀。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,前面文件名命名也是一样的,以“.index”为后缀。如下图:
我们再来看看索引和日志文件内部的关系是什么,如下图:
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
segment 的log文件由许多message组成,下面详细说明message物理结构如下:
参数说明:
1.2 如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
· 第一步查找segment file
以上面为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
· 第二步通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
2. Log包相关组件
2.1 LogManager
LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录)。这个类应该说是log包中最重要的类了,也是kafka日志管理子系统的入口。日志管理器(log manager)负责创建日志、获取日志、清理日志。所有的日志读写操作都交给具体的日志实例来完成。日志管理器维护多个路径下的日志文件,并且它会自动地比较不同路径下的文件数目,选择在最少的日志路径下创建新的日志。Log manager不会尝试去移动分区,另外专门有一个后台线程定期地裁剪过量的日志段。下面我们先看看这个类的构造函数参数
1. logDirs: log manager管理的多组日志目录
2. topicConfigs: topic=>topic的LogConfig的映射
3. defaultConfig: 一些全局性的默认日志配置
4. cleanerConfig: 日志压缩清理的配置
5. ioThreads: 每个数据目录都可以创建一组线程执行日志恢复和写入磁盘,这个参数就是这组线程的数目,由num.recovery.threads.per.data.dir属性指定
6. flushCheckMs: 日志磁盘写入线程检查日志是否可以写入磁盘的间隔,默认是毫秒,由属性log.flush.scheduler.interval.ms指定。
7. flushCheckpointMs: Kafka标记上一次写入磁盘结束点为一个检查点用于日志恢复的间隔,由属性log.flush.offset.checkpoint.interval.ms指定,默认是1分钟,Kafka强烈建议不要修改此值。
8. retentionCheckMs: 检查日志段是否可以被删除的时间间隔,由属性log.retention.check.interval.ms指定,默认是5分钟
9. scheduler: 任务调度器,用于指定日志删除、写入、恢复等任务
10. brokerState: Kafka broker的状态类(kafka.server包中)。Broker的状态默认有未运行(Not Running),启动中(Starting),从上次未正常关闭恢复中(Recovering from unclean shutdown),作为Broker运行中(running as broker),作为Controller运行中(running as controller),挂起中(pending)以及关闭中(shutting down)。当然Kafka允许用于自定制状态。
11. time: 和很多类的构造函数参数一样,提供时间服务的变量
Kafka在恢复日志的时候是借助检查点文件来做的,因此每一个需要做日志恢复的路径下都需要有这么一个检查点文件,名字也固定叫"recovery-point-offset-checkpoint"。另外由于在做一些操作时需要将目录下的文件锁住,因此Kafka还创建了一个后缀是.lock的文件标识这个目录当前是被锁住的。
下面我们针对具体的方法一一分析:
1. createAndValidateLogDirs: 创建并验证给定日志路径的合法性,特别要保证不能出现重复路径,并且要创建那些不存在的路径,而且还要检查每个目录都是可读的
2. lockLogDirs: 在给定的所有路径下创建一个.lock文件,如果某个路径下已经有.lock文件说明Kafka的另一个进程或线程正在使用这个路径
3. loadLogs: 恢复并加载给定路径下的所有日志。具体做法是为每个路径创建一个线程池。为了向后兼容,该方法要在路径下寻找是否存在一个.kafka_cleanshutdown的文件,如果存在的话就跳过这个恢复阶段,否则的话就将broker的状态设置为恢复中状态,真正的恢复工作是由Log实例来完成的。然后读取对应路径下的recovery-point-offset-checkpoint文件读出要恢复检查点。前面说到了检查点文件的格式大概类似于以下的内容:
第一行必须是版本0,第二行数是topic/分区数,以下每行都是三个字段: topic partition offset,读完这个文件之后会创建一个TopicAndPartition => offset的map。
========================
0
9
log-topic 0 48
kafkatopic 1 0
abcd 1 0
abcd 0 0
autocreated 0 2
abc 0 0
accesslog_topic 0 0
kafkatopic 0 0
autocreated 1 1
========================
之后为每个目录下的子目录都构建一个Log实例然后使用线程池调度执行清理任务。之后删除这些任务对应的cleanShutdown文件。至此日志加载过程结束
4.startup: 开启后台线程做日志冲刷(flush)和日志清理。主要是使用调度器安排3个调度任务: cleanupLogs、flushDirtyLogs和checkpointRecoveryPointOffsets——自然地有3个对应的方法来实现对应的方法。同时判断一下是否启用了日志压缩,如果启用了调用cleaner的startup方法开启日志清理。
def startup() {
/* Schedule the cleanup task
to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
5. shutdown:
关闭所有日志。首先关闭所有清理者线程,然后为每个日志目录创建一个线程池执行目录下日志文件的写入磁盘与关闭操作同时更新外层文件中检查点文件的对应记录
6. logsByTopicPartition:
返回一个map保存TopicAndPartition => log的映射
7. allLogs:
返回所有topic所有分区的日志
8. logsByDir:
日志路径 => 路径下所有日志的映射
9. flushDirtyLogs:
将任何超过了写入间隔且有未写入消息的日志全部冲刷到磁盘上
10. checkpointLogsInDir:
在给定的路径中标记一个检查点
11. checkpointRecoveryPointOffsets:
将日志路径下所有日志的检查点写入到一个文本文件中(recovery-point-offset-checkpoint)
12. truncateTo:
截断分区日志到指定的offset并使用这个offset作为新的检查点(恢复点)。具体做法就是遍历给定的map集合,获取对应的分区的日志,如果要截断的offset比该日志当前正在使用的日志段的基础位移还小的话(也就是说要截断一部分当前日志段),就需要暂停清理者线程了。之后开始执行阶段操作,最后再恢复清理者线程。
13. tuncateFullyAndStartAt:
删除一个分区所有的数据并在新offset处开启日志。操作前后需分别需要暂停和恢复清理者线程
14. getLog:
返回某个分区的日志
15. createLog:
为给定分区创建一个新的日志。如果日志已经存在则返回
16. deleteLog:
删除一个日志
17. nextLogDir:
创建日志时选择下一个路径,目前的实现是计算每个路径下的分区数然后选择最少的那个。
18. cleanupExpiredSegments:
删除那些过期的日志段,就是当前时间减去最近修改时间超出了规定的那些日志段,并且返回被删除日志段的个数
19. cleanupSegmentsToMaintainSize:
如果没有设定log.retention.bytes,那么直接返回0,表示不需要清理任何日志段(这也是默认情况,因为log.retention.bytes默认是-1);否则计算出该属性值与日志大小的差值。如果这个差值弄够容纳某个日志段的大小,那么这个日志段就需要被删除。
20. cleanupLogs
: 删除所有满足条件的日志,返回被删除的日志数
2.2
Log
日志类,个人认为是这个包最重要的两个类之一(另一个是LogManager)。以伴生对象的方式提供。先说Log object,既然是object,就定义了一些类级别的变量,比如定义了一个日志文件的后缀名是.log; 索引文件的后缀名是.index; 要被删除的文件的后缀名是.deleted; 要被执行日志清理的临时文件后缀名是.cleaned; 在做swap过程中的临时文件后缀名是.swap。还有一个后缀名.kafka_cleanshutdown,这个在0.8.2版本的Kafka中已经不使用了。除了这些后缀名,这个object还定义了一些常用的方法:
1. filenamePrefixFromOffset:
其实就是为给定的offset前面补至20位生成日志段文件名用于文件名排序(使用ls命令)
2. logFilename:
使用给定的基础offset在给定的路径下生成对应的日志文件,比如位移是1,那么生成的文件名是0...(总共19个)1.log
3. indexFilename:
与logFilename类似,只不过生成的索引文件名是0...(共19个)1.index
4. parseTopicPartitionName:
将日志所在路径的名称解析成topic+分区封装到一个TopicPartition对象返回。比如路径名字是log-topic-0,那么topic名字就是log-topic,分区号就是0
说完了Log object,现在说说Log class了。这里要说的是日志是只能在尾部追加消息的,一个日志对象就是一组日志段(LogSegment)对象,每一个日志段都有一个基础offset标识该日志段中的第一条消息的位置。Kafka支持基于大小和时间的规则创建新的日志段。
Log
类有5个构造器参数,分别是:
1. dir:
日志段被创建在哪个目录下
2. config:
日志配置信息
3. recoveryPoint:
恢复的起始offset——即尚未被写入磁盘的第一个offset
4. scheduler:
用于后台操作的一个调度器线程池。主要用于异步地删除日志段和日志段切分时使用
5. time:
提供时间服务的对象实例
该类被标记为是线程安全的,因此一定需要一个锁对象来保护对日志的并发修改。另外还定义了一个字段专门保存该日志上次被写入磁盘的时间:lastflushedTime。前面说过一个日志对象是由多个日志段组成的,所以该类定义了一个一组日志段Map对象(key就是日志段的基础offset,value就是该日志段)表示该日志包含的所有日志段:segments。值得注意的是,这个map对象使用了java.concurrent包中的ConcurrentNavigableMap——该类提供了很多对于Map的便捷的导航方法。另外,Log类还定义了一个nextOffsetMetadata变量用于计算下一条消息的位移,主要计算的方法就是调用LogSegment的nextOffset方法。同时,根据给定的路径名,该类还定义了一个TopicAndPartition对象,把从路径名中提取出来的topic和分区信息保存起来表明这个日志是属于哪个topic,哪个分区的。最后类定义了4个度量元分别统计日志的起始位移、结束位移、日志大小以及日志段的数目。
okay
,分析完了所有类成员字段,我们对于类定义的方法进行逐一分析:
1. name:
返回日志路径名称
2. updateLogEndOffset:
使用给定的offset创建一个新的LogOffsetMetadata对象更新到nextOffsetMetadata变量
3. hasCleanShutdownFile:
判断是否存在clean shutown文件——这个功能在Kafka0.8.2以后就不再使用了,主要是为了后向兼容。
4. numberOfSegments:
日志的日志段的数目——主要调用Map.size方法实现,该方法时间复杂度是O(n),所以在日志段数目比较多时要慎用
5. close:
以同步的方式将日志段map中的所有日志段都关闭
6. size:
所有日志段字节数总和
7. logStartOffset:
日志段集合中第一个日志段的基础位移,也就是这个日志对象的基础位移
8. logEndOffsetMetadata:
下一条将要被加入到日志的消息的位移元数据,直接返回nextOffsetMetadata字段
9. logEndOffset:
下一条将要被加入到日志的消息的位移
10. unflushedMessage:
已加入日志但未写入磁盘的消息数
11. flush:
写入所有日志段中尚未持久化的消息
12. delete:
完全删除该日志文件以及目录,并清空日志段map
13. lastFlushTime:
返回最近一次写入磁盘的时间
14. activeSegment:
日志段map中的最后一个日志段表示当前活跃的日志段
15. logSegments
无参版: 返回按照offset递增排序的日志段集合
16. addSegment:
将给定的日志段对象加入到现有的日志段map中
17. asyncDeleteSegement:
异步删除给定的日志段对象,通常是请求发起1分钟之后开始执行
18. replaceSegments:
该方法只在启用了日志清理(log cleaning)时候才会被用到。主要逻辑就是将.cleaned文件换成.swap文件,然后再将.swap加入到现有的日志段map中。之后,遍历整个要删除的日志段集合,如果要删除的日志段的基础offset与要新加的日志段的基础位移不一样,说明该日志段的确要被删除,那么调用异步删除日志段的方法将其删除。遍历之后,再将那个新加的日志段的后缀名.swap去掉
19. deleteSegment:
异步删除日志段,在日志段map中将日志段记录去除,并且将对应索引文件改名为*.deleted
20. logSegments
带参版: 首先获取不大于from的最大位移,如果不存在这样的offset,那么直接返回小于to的所有位移对应的日志段; 否则返回[floor, to)范围内的日志段
21. truncateFullyAndStartAt:
删除日志所有数据,创建一个空的日志段并重设新的offset(设置起始offset和结束offset为newOffset),最后更新恢复位移点
22. truncateTo:
将日志截断使之保存的最大offset不会超过给定的targetOffset。当然,如果targetOffset就比现有日志的结束位移还要大自然什么都不做。另外在截断的过程中,还需要判断该log的最小位移(也就是第一个日志段的基础位移)如果比targetOffset大的话,那么直接调用truncateFullyAndStartAt方法删除所有日志数据并设置新的位移点,否则逐一删除那些起始位移比targetOffset大的日志段。此时activeSegment会自动变成当前删除之后最新的那个日志段,所以还要对activeSegment进行截断操作。这些做完之后更新下一条消息offset并重设恢复点位移
23. roll:
日志段的切分。使用当前结束位移作为新日志段的起始位移并把新日志段加入到日志段map中,然后发起一个异步调度任务将旧有日志段的数据同步到磁盘上,最后返回新创建的日志段
24. maybeRoll:
也是做日志段的切分,不过是有条件的: 1. 日志段已经满了; 2. 已过最大时间; 3. 索引文件已经满了。三个条件满足一个就会触发切分操作。
25. deleteOldSegments:
删除掉那些满足条件的日志段——所谓的条件无非就是满足大小或时间方面的要求,而这个函数返回的就是删除日志段的个数。具体做法就是筛选出那些同时满足给定条件并且不能是当前激活日志段或大小不为空的日志段,然后比较一下看看要删除的是否所有日志段,如果是的话直接调用roll方法进行切分,因为Kafka至少要保留一个日志段,如果否的话直接遍历该候选日志段集合,然后删除之。
26. convertToOffsetMetadata:
将给定的位移转化成对应的LogOffsetMetadat对象,这个方法主要用于副本管理使用
- 点赞
- 收藏
- 关注作者
评论(0)