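Kafka Source Code Analysis, Part 3: Reading and Writing in the Log Module (4)
FileMessageSet.append is straightforward. It writes the messages directly into its FileChannel, then adds the number of bytes written to `_size`: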
/**
* Append these messages to the message set
*/
def append(messages: ByteBufferMessageSet) {
val written = messages.writeTo(channel, 0, messages.sizeInBytes)
_size.getAndAdd(written)
}
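The write path can be made concrete with a minimal sketch. It appends bytes to a FileChannel and tracks the set's size in an AtomicInteger, the way FileMessageSet does with `_size`. The class SimpleFileMessageSet is hypothetical. It takes a raw ByteBuffer instead of a ByteBufferMessageSet and assumes a single writer.

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, stripped-down stand-in for FileMessageSet: it appends raw bytes only.
class SimpleFileMessageSet(file: File) {
  private val channel: FileChannel = FileChannel.open(
    file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)
  private val _size = new AtomicInteger(channel.size().toInt)
  channel.position(channel.size()) // start writing at the current end of the file

  /** Writes all remaining bytes at the channel position and returns the byte count. */
  def append(bytes: ByteBuffer): Int = {
    var written = 0
    while (bytes.hasRemaining) written += channel.write(bytes) // a write may be partial, so loop
    _size.getAndAdd(written) // same bookkeeping as FileMessageSet.append
    written
  }

  def sizeInBytes: Int = _size.get
  def close(): Unit = channel.close()
}
```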
The code of OffsetIndex.append is as follows:
/**
* Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
*/
def append(offset: Long, position: Int) {
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + size + ").")
if (size.get == 0 || offset > lastOffset) {
debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
this.mmap.putInt((offset - baseOffset).toInt)
this.mmap.putInt(position)
this.size.incrementAndGet()
this.lastOffset = offset
require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, entries, lastOffset, file.getAbsolutePath))
}
}
}
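The code is easy to follow. As described for the OffsetIndex format in Section 2, each entry first stores the 4-byte relative offset (offset - baseOffset), followed by the 4-byte physical position of the message in the log file.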
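The entry layout can be illustrated with a small in-memory sketch. It uses a heap ByteBuffer in place of the memory-mapped index file. The class name SketchOffsetIndex and its `entry` accessor are hypothetical. The append rule follows the code above: the first entry is always accepted, and any later offset must be larger than the last one.

```scala
import java.nio.ByteBuffer

// Hypothetical in-memory model of the OffsetIndex entry layout (8 bytes per entry).
class SketchOffsetIndex(val baseOffset: Long, maxEntries: Int) {
  private val buf = ByteBuffer.allocate(maxEntries * 8) // stands in for the mmap
  private var lastOffset = baseOffset

  def entries: Int = buf.position() / 8

  def append(offset: Long, position: Int): Unit = {
    require(entries < maxEntries, s"Attempt to append to a full index (size = $entries).")
    if (entries == 0 || offset > lastOffset) {
      buf.putInt((offset - baseOffset).toInt) // 4-byte relative offset
      buf.putInt(position)                    // 4-byte physical position in the log file
      lastOffset = offset
    } else {
      throw new IllegalArgumentException(s"offset $offset is not larger than last offset $lastOffset")
    }
  }

  /** Reads entry n back as (absolute offset, physical position). */
  def entry(n: Int): (Long, Int) = (baseOffset + buf.getInt(n * 8), buf.getInt(n * 8 + 4))
}
```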
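5. The Log.read flow
The flowchart is as follows:
Given a startOffset at which to begin reading, a maximum read length and other parameters, this function returns a FetchDataInfo. The idea is simple. In the segments map, it finds the segment with the largest baseOffset that is less than or equal to startOffset. It then searches for and reads the data starting from that segment.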
/**
* Read messages from the log
*
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
* @return The fetch data information including fetch starting offset metadata and messages read
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// check if the offset is valid and in range
// If startOffset equals nextOffset, no message exists at that offset yet, so return an empty message set
val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
// Returns the Map.Entry(baseOffset, LogSegment) with the largest baseOffset <= startOffset
var entry = segments.floorEntry(startOffset)
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
// do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
// Call LogSegment.read to get fetchInfo
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
// If it is null, move on to the next LogSegment and read from that one
if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
}
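The segment-walking logic of Log.read can be modeled on any java.util.NavigableMap, because floorEntry and higherEntry are standard NavigableMap methods. This sketch uses a ConcurrentSkipListMap keyed by java.lang.Long. StubSegment is a hypothetical stand-in for LogSegment that only records which offsets it still holds. Option replaces the null/FetchDataInfo results, and IndexOutOfBoundsException stands in for OffsetOutOfRangeException.

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical stand-in for LogSegment: only the offsets it still contains.
final case class StubSegment(baseOffset: Long, offsets: Vector[Long]) {
  // Like LogSegment.read: the first offset >= start, or None if there is none.
  def read(start: Long): Option[Long] = offsets.find(_ >= start)
}

object LogReadSketch {
  def read(segments: ConcurrentSkipListMap[java.lang.Long, StubSegment],
           startOffset: Long, nextOffset: Long): Option[Long] = {
    if (startOffset == nextOffset) return None   // nothing has been written at that offset yet
    var entry = segments.floorEntry(startOffset) // largest baseOffset <= startOffset
    if (startOffset > nextOffset || entry == null)
      throw new IndexOutOfBoundsException(s"offset $startOffset is out of range")
    while (entry != null) {
      entry.getValue.read(startOffset) match {
        case found @ Some(_) => return found
        case None            => entry = segments.higherEntry(entry.getKey) // try the next segment
      }
    }
    None // every later message was deleted: empty result
  }
}
```

In the usage below, segment 0 holds nothing at or above offset 5, so the read falls through to the segment starting at 10.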
5.1 LogSegment.read
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
* @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
* or null if the startOffset is larger than the largest offset in this log
*/
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
// Get the size of this LogSegment
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
// Translate the logical offset into a physical file position, returning an OffsetPosition whose offset is >= startOffset (this function is covered in detail below)
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if(startPosition == null)
return null
// Build a LogOffsetMetadata object holding the logical offset, the segment's base offset and the physical position
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// if the size is zero, still return a log segment but with zero size
if(maxSize == 0)
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// Because callers may pass a maxOffset to read up to, compute the actual maximum number of bytes to read
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
case None =>
// no max offset, just use the max size they gave unmolested
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
// Translate maxOffset into a physical position as well, then read the smaller of maxSize and the byte distance from startOffset to maxOffset
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
// Return the FetchDataInfo
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
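The length calculation is easy to check with numbers. The helper below is a hypothetical extraction of the `length` expression. The parameter `endPositionOfMaxOffset` stands for the result of `translateOffset(offset, startPosition.position)`, with None playing the role of null. For example, suppose startPosition = 100, maxOffset maps to position 600 and maxSize = 4096. The read length is then min(600 - 100, 4096) = 500 bytes.

```scala
object ReadLengthSketch {
  /** Hypothetical helper mirroring the `length` computation in LogSegment.read.
    * endPositionOfMaxOffset is None when maxOffset lies beyond the end of the segment. */
  def readLength(startPosition: Int, maxSize: Int, maxOffset: Option[Long], startOffset: Long,
                 endPositionOfMaxOffset: Option[Int], logSize: Int): Int =
    maxOffset match {
      case None => maxSize // no upper offset: use maxSize unchanged
      case Some(offset) =>
        require(offset >= startOffset, s"maxOffset $offset is less than startOffset $startOffset")
        val endPosition = endPositionOfMaxOffset.getOrElse(logSize) // off the end: use file end
        math.min(endPosition - startPosition, maxSize)
    }
}
```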
A key function in this code is translateOffset, which converts a logical offset into a physical file position:
/**
* Find the physical file position for the first message with offset >= the requested offset.
*
* The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
*
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
// Look up, in the index buffer, the OffsetPosition entry with the largest offset <= offset
val mapping = index.lookup(offset)
// Starting from the physical position found in the index (or startingFilePosition, if larger), scan the log for the first message with offset >= the target offset
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
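The two-phase lookup in translateOffset can be modeled on plain collections: a coarse index lookup, then a forward scan from the larger of the two lower bounds. This sketch makes simplifying assumptions. The sparse index and the log are both Vectors of (offset, position) pairs sorted by position. None replaces a null return, and TranslateOffsetSketch is a hypothetical name.

```scala
object TranslateOffsetSketch {
  def translateOffset(index: Vector[(Long, Int)], log: Vector[(Long, Int)],
                      offset: Long, startingFilePosition: Int = 0): Option[(Long, Int)] = {
    // Phase 1: coarse lookup, position of the last index entry with entryOffset <= offset
    // (the segment start, position 0, if no entry qualifies).
    val floorPos = index.takeWhile(_._1 <= offset).lastOption.map(_._2).getOrElse(0)
    // Phase 2: scan forward from the larger of the two known lower bounds.
    val from = math.max(floorPos, startingFilePosition)
    log.find { case (off, pos) => pos >= from && off >= offset }
  }
}
```

With the index holding only offsets 0 and 2, a lookup for offset 3 jumps to position 120 and scans just one step. A caller-supplied startingFilePosition skips even further, which is why it must be a position known to be valid.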
The code of OffsetIndex.lookup:
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
// Duplicate the ByteBuffer: the copy shares its content with mmap, and its position, limit, mark and capacity start out equal to mmap's, but the two buffers track these values independently
val idx = mmap.duplicate
// Binary-search the index buffer for the slot holding the largest offset <= targetOffset
val slot = indexSlotFor(idx, targetOffset)
// Build an OffsetPosition from the slot found; slot == -1 means no entry is <= targetOffset, so fall back to (baseOffset, 0)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
}
}
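The relativeOffset and physical functions take a slot number n and read the relative logical offset at byte n * 8 and the physical position at byte n * 8 + 4 of the ByteBuffer: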
/* return the nth offset relative to the base offset */
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
/* return the nth physical position */
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
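The binary search behind lookup can be sketched as follows. This is an illustrative re-implementation of what the comments describe: find the largest entry whose offset is <= targetOffset, or return -1. It is not Kafka's exact indexSlotFor. In this sketch, `entries` is passed in explicitly and the result is a tuple instead of an OffsetPosition.

```scala
import java.nio.ByteBuffer

object IndexSearchSketch {
  def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
  def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

  /** Slot of the largest entry whose offset <= targetOffset, or -1 if there is none. */
  def indexSlotFor(idx: ByteBuffer, entries: Int, baseOffset: Long, targetOffset: Long): Int = {
    val rel = targetOffset - baseOffset
    if (entries == 0 || relativeOffset(idx, 0) > rel) return -1
    var lo = 0
    var hi = entries - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) >>> 1 // round up so that `lo = mid` always makes progress
      if (relativeOffset(idx, mid) <= rel) lo = mid else hi = mid - 1
    }
    lo
  }

  def lookup(mmap: ByteBuffer, entries: Int, baseOffset: Long, targetOffset: Long): (Long, Int) = {
    val idx = mmap.duplicate() // shared content, independent position/limit/mark
    val slot = indexSlotFor(idx, entries, baseOffset, targetOffset)
    if (slot == -1) (baseOffset, 0)
    else (baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}
```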
Next, FileMessageSet.searchFor. Starting at startingPosition, it scans the messages in order until it finds the first one whose offset is >= the target offset, and returns that message's OffsetPosition.
/**
* Search forward for the file position of the first offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
// Allocate a LogOverhead-sized buffer for reading each entry header; LogOverhead is 12 bytes: an 8-byte offset followed by a 4-byte message size
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = sizeInBytes()
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()
channel.read(buffer, position)
// If not even the full header could be read, throw an exception
if(buffer.hasRemaining)
throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
.format(targetOffset, startingPosition, file.getAbsolutePath))
buffer.rewind()
val offset = buffer.getLong()
// Return the OffsetPosition of the first message whose offset >= targetOffset
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
if(messageSize < Message.MessageOverhead)
throw new IllegalStateException("Invalid message size: " + messageSize)
// Advance position to the next message; see the earlier sections for the detailed message layout
position += MessageSet.LogOverhead + messageSize
}
null
}
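The scan in searchFor can be replayed over an in-memory ByteBuffer with the same entry layout: an 8-byte offset, a 4-byte size, then the payload. Reading from a ByteBuffer instead of a FileChannel is an assumption made to keep the sketch self-contained, and `build` is a hypothetical helper. The loop condition `position + LogOverhead < size` is copied from the code above.

```scala
import java.nio.ByteBuffer

object SearchForSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte message size

  def searchFor(log: ByteBuffer, targetOffset: Long, startingPosition: Int): Option[(Long, Int)] = {
    var position = startingPosition
    val size = log.limit()
    while (position + LogOverhead < size) {
      val offset = log.getLong(position)         // absolute read of the 8-byte offset
      if (offset >= targetOffset) return Some((offset, position))
      val messageSize = log.getInt(position + 8) // absolute read of the 4-byte size
      require(messageSize >= 0, s"Invalid message size: $messageSize")
      position += LogOverhead + messageSize      // jump to the next entry
    }
    None
  }

  /** Hypothetical helper: lays out (offset, payload) pairs as [offset][size][payload]. */
  def build(entries: Seq[(Long, Array[Byte])]): ByteBuffer = {
    val buf = ByteBuffer.allocate(entries.map(e => LogOverhead + e._2.length).sum)
    entries.foreach { case (off, payload) => buf.putLong(off).putInt(payload.length).put(payload) }
    buf.flip()
    buf
  }
}
```

With payloads of 3, 1 and 2 bytes, the entries start at positions 0, 15 and 28. A search for offset 12 therefore skips two headers and lands on offset 13 at position 28.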
This completes the analysis of the basic read and write functions of the Log module.