Kafka Source Code Analysis, Part 3: Log Module Read/Write Source Analysis (4)

Posted by 步步清风 on 2017/11/21 11:18:39

The code of FileMessageSet.append is simple: it writes the messages directly to the underlying FileChannel.

  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }
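
The same pattern can be tried outside Kafka. The following is a minimal sketch, not Kafka's implementation: AppendOnlyFile is a hypothetical stand-in for FileMessageSet, and a plain ByteBuffer replaces ByteBufferMessageSet. It shows the two steps of append: write the bytes to the channel, then add the number of bytes written to an atomic size counter.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.atomic.AtomicInteger

object AppendSketch {
  // Hypothetical stand-in for FileMessageSet: an append-only file plus a size counter.
  final class AppendOnlyFile(channel: FileChannel) {
    private val _size = new AtomicInteger(0)

    def sizeInBytes: Int = _size.get

    def append(bytes: ByteBuffer): Unit = {
      var written = 0
      // FileChannel.write may write fewer bytes than requested, so loop until drained.
      while (bytes.hasRemaining) written += channel.write(bytes)
      _size.getAndAdd(written)
    }
  }

  // Appends two 5-byte chunks to a temporary file and returns the tracked size.
  def demo(): Int = {
    val path = Files.createTempFile("append-sketch", ".log")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.APPEND)
    try {
      val file = new AppendOnlyFile(channel)
      file.append(ByteBuffer.wrap("hello".getBytes("UTF-8")))
      file.append(ByteBuffer.wrap("kafka".getBytes("UTF-8")))
      file.sizeInBytes
    } finally {
      channel.close()
      Files.delete(path)
    }
  }
}
```

Keeping the size in an atomic counter lets readers obtain a consistent size without touching the file system, which is the role _size plays in the code above.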


The code of OffsetIndex.append is as follows:

  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        // 4 bytes: the offset relative to this index's baseOffset
        this.mmap.putInt((offset - baseOffset).toInt)
        // 4 bytes: the physical position of the message in the log file
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }

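The code makes this explicit: as described for OffsetIndex writes in Section 2, it first writes the 4-byte value offset - baseOffset, and then writes the 4-byte physical position of the message in the log file.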
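
To make the 8-byte entry layout concrete, here is a minimal sketch. It assumes a heap ByteBuffer in place of the memory-mapped file, and TinyIndex is a hypothetical name rather than a Kafka class; locking and file handling are left out.

```scala
import java.nio.ByteBuffer

object IndexEntrySketch {
  // Hypothetical, simplified index: a heap ByteBuffer stands in for the mmap.
  final class TinyIndex(val baseOffset: Long, maxEntries: Int) {
    private val buf = ByteBuffer.allocate(maxEntries * 8)
    private var entries = 0
    private var lastOffset = baseOffset

    def append(offset: Long, position: Int): Unit = {
      require(entries < maxEntries, "index is full")
      require(entries == 0 || offset > lastOffset,
        s"offset $offset is not larger than the last offset $lastOffset")
      buf.putInt((offset - baseOffset).toInt) // 4 bytes: offset relative to baseOffset
      buf.putInt(position)                    // 4 bytes: physical position in the log file
      entries += 1
      lastOffset = offset
    }

    // Absolute reads: entry n starts at byte n * 8.
    def relativeOffset(n: Int): Int = buf.getInt(n * 8)
    def physical(n: Int): Int = buf.getInt(n * 8 + 4)
    def sizeInBytes: Int = buf.position()
  }
}
```

Storing the offset relative to baseOffset is what allows a 64-bit logical offset to fit in 4 bytes, so each entry costs 8 bytes instead of 12.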

5.    The Log.read flow

The flowchart is as follows:

[Figure: Log.read flowchart (image.png)]

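This function takes the startOffset to begin reading at, the maximum number of bytes to read, and related parameters, and returns a FetchDataInfo. The idea is simple: in the segments map, find the segment whose baseOffset is the largest one less than or equal to startOffset, then search and read starting from that segment.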

  /**
   * Read messages from the log
   *
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param maxOffset The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
   *
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
   * @return The fetch data information including fetch starting offset metadata and messages read
   */
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
    trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

    // check if the offset is valid and in range
    // If startOffset equals the next offset (the log end offset), no such message exists yet,
    // so return an empty message set.
    val next = nextOffsetMetadata.messageOffset
    if(startOffset == next)
      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)

    // Returns the Map.Entry (baseOffset, LogSegment) with the largest baseOffset that is <= startOffset.
    var entry = segments.floorEntry(startOffset)

    // attempt to read beyond the log end offset is an error
    if(startOffset > next || entry == null)
      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))

    // do the read on the segment with a base offset less than the target offset
    // but if that segment doesn't contain any messages with an offset greater than that
    // continue to read from successive segments until we get some messages or we reach the end of the log
    while(entry != null) {
      // Call LogSegment.read to get the fetchInfo
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
      // If it is null, move on to the next consecutive LogSegment and read from it
      if(fetchInfo == null) {
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo
      }
    }

    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
    // this can happen when all messages with offset larger than start offsets have been deleted.
    // In this case, we will return the empty set with log end offset metadata
    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
  }
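
The segment lookup relies on two navigation methods of java.util.concurrent.ConcurrentSkipListMap. The sketch below illustrates them in isolation. It assumes string placeholders instead of real LogSegment objects, and the base offsets 50, 100, and 250 are made up for the example.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  type Segments = ConcurrentSkipListMap[java.lang.Long, String]

  // Segment names are hypothetical placeholders for LogSegment objects.
  def build(): Segments = {
    val segments = new Segments()
    segments.put(Long.box(50L), "segment-50")
    segments.put(Long.box(100L), "segment-100")
    segments.put(Long.box(250L), "segment-250")
    segments
  }

  // floorEntry: the entry with the largest key <= startOffset, or null if there is none.
  def segmentFor(segments: Segments, startOffset: Long): Option[String] =
    Option(segments.floorEntry(Long.box(startOffset))).map(_.getValue)

  // higherEntry: the entry with the smallest key strictly greater than baseOffset, or null.
  def nextSegment(segments: Segments, baseOffset: Long): Option[String] =
    Option(segments.higherEntry(Long.box(baseOffset))).map(_.getValue)
}
```

With these sample offsets, a request for offset 150 lands in the segment with base offset 100, and a request for offset 10 finds no segment at all, which corresponds to the entry == null branch that throws OffsetOutOfRangeException.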


5.1   LogSegment.read

  /**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxOffset An optional maximum offset for the message set we read
   *
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this log
   */
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
    if(maxSize < 0)
      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
    // Get the size of this log segment
    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    // Translate the logical offset into a physical file position. The returned OffsetPosition
    // has an offset >= startOffset. This function is described in detail below.
    val startPosition = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if(startPosition == null)
      return null
    // Build a LogOffsetMetadata object holding the logical offset, the physical position, and so on
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)
      return FetchDataInfo(offsetMetadata, MessageSet.Empty)

    // Because the caller may specify a maxOffset to read up to, compute the actual number of bytes that can be read
    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length =
      maxOffset match {
        case None =>
          // no max offset, just use the max size they gave unmolested
          maxSize
        case Some(offset) => {
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          // Translate the max offset into a physical position as well, then take the smaller of maxSize
          // and the number of bytes between the positions of startOffset and maxOffset as the read length
          val mapping = translateOffset(offset, startPosition.position)
          val endPosition =
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(endPosition - startPosition.position, maxSize)
        }
      }
    // Return the FetchDataInfo object
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
  }
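
The read-length calculation can be isolated into a few lines. This is a sketch under simplifying assumptions: readLength and resolveEnd are hypothetical helper names, and the file positions are passed in directly instead of being obtained from translateOffset.

```scala
object ReadLengthSketch {
  // End position when a maxOffset was given: the translated position of maxOffset,
  // or the segment size when maxOffset lies beyond the end of the segment (mapping is None).
  def resolveEnd(mapping: Option[Int], logSize: Int): Int = mapping.getOrElse(logSize)

  // endPosition is None when the caller gave no maxOffset at all.
  def readLength(startPosition: Int, maxSize: Int, endPosition: Option[Int]): Int =
    endPosition match {
      case None      => maxSize
      case Some(end) => math.min(end - startPosition, maxSize)
    }
}
```

For example, with a start position of 100 and maxSize of 4096, a maxOffset that translates to position 600 limits the read to 500 bytes, while a smaller maxSize of 300 wins over that bound.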


A key function in this code is translateOffset, which translates a logical offset into a physical file position:

  /**
   * Find the physical file position for the first message with offset >= the requested offset.
   *
   * The lowerBound argument is an optimization that can be used if we already know a valid starting position
   * in the file higher than the greatest-lower-bound from the index.
   *
   * @param offset The offset we want to translate
   * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
   * when omitted, the search will begin at the position in the offset index.
   *
   * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
   */
  @threadsafe
  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
    // Look up in the index buffer the OffsetPosition entry with the largest offset <= the requested offset
    val mapping = index.lookup(offset)
    // Starting from the physical position of that entry, scan the log file for the exact
    // OffsetPosition (the first message whose offset is >= the requested offset)
    log.searchFor(offset, max(mapping.position, startingFilePosition))
  }


The code of OffsetIndex.lookup:

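  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {
      // Duplicate the byte buffer. The duplicate shares its content with mmap. The new buffer's
      // position, limit, mark, and capacity are initialized to those of the original buffer,
      // but from then on the two sets of values are independent of each other.
      val idx = mmap.duplicate
      // Binary search the index buffer for the slot of the largest offset <= targetOffset
      val slot = indexSlotFor(idx, targetOffset)
      // Once the slot is known, return the OffsetPosition stored there
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
    }
  }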

The relativeOffset and physical functions return the relative logical offset and the physical position stored at slot n of the ByteBuffer:

  /* return the nth offset relative to the base offset */
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

  /* return the nth physical position */
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
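
The body of indexSlotFor is not shown in this article. The following is a sketch of how such a binary search over 8-byte entries could look. It is an illustration under assumptions, not Kafka's code: the entry count is passed in explicitly, a tuple replaces OffsetPosition, and a heap ByteBuffer replaces the mmap.

```scala
import java.nio.ByteBuffer

object IndexLookupSketch {
  // Returns the slot of the largest entry whose relative offset is <= relTarget, or -1 if none.
  def indexSlotFor(idx: ByteBuffer, entries: Int, relTarget: Long): Int =
    if (entries == 0 || idx.getInt(0) > relTarget) -1
    else {
      var lo = 0
      var hi = entries - 1
      while (lo < hi) {
        val mid = (lo + hi + 1) >>> 1 // round up so the loop always makes progress
        if (idx.getInt(mid * 8) <= relTarget) lo = mid
        else hi = mid - 1
      }
      lo
    }

  // Returns (logical offset, physical position); falls back to (baseOffset, 0) like lookup above.
  def lookup(idx: ByteBuffer, entries: Int, baseOffset: Long, targetOffset: Long): (Long, Int) = {
    val slot = indexSlotFor(idx, entries, targetOffset - baseOffset)
    if (slot == -1) (baseOffset, 0)
    else (baseOffset + idx.getInt(slot * 8), idx.getInt(slot * 8 + 4))
  }

  // Three made-up entries: (relative offset, physical position).
  def sample(): ByteBuffer = {
    val buf = ByteBuffer.allocate(3 * 8)
    Seq((5, 400), (20, 1600), (37, 3000)).foreach { case (rel, pos) =>
      buf.putInt(rel)
      buf.putInt(pos)
    }
    buf
  }
}
```

Because the index is sparse, the result is only a lower bound: looking up offset 1025 with base offset 1000 yields the entry for offset 1020, and the remaining distance has to be covered by a forward scan of the log file.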


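Now look at FileMessageSet.searchFor. Starting from startingPosition, it scans forward sequentially for the first message whose offset is greater than or equal to the target offset, and returns its location as an OffsetPosition.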

  /**
   * Search forward for the file position of the first offset that is greater than or equal to the target offset
   * and return its physical position. If no such offsets are found, return null.
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    // Allocate a buffer of LogOverhead bytes to read the message header.
    // LogOverhead is 12 bytes (MessageSize 4, OffsetLength 8).
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      // If even the message header cannot be read completely, throw an exception
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      // Return the OffsetPosition of the first message whose offset is >= targetOffset
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      if(messageSize < Message.MessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      // Advance position to the next message; see the earlier sections for the message layout.
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }
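
The forward scan can be reproduced on an in-memory buffer. The sketch below is a simplification under stated assumptions: a ByteBuffer replaces the FileChannel, Option replaces the null return, the record payloads are arbitrary bytes, and the check against Message.MessageOverhead is omitted. The encode helper is hypothetical and exists only to build test data in the [offset: 8][size: 4][payload] layout.

```scala
import java.nio.ByteBuffer

object SearchForSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte message size

  // Builds a buffer of [offset: 8][size: 4][payload: size] records.
  def encode(records: Seq[(Long, Array[Byte])]): ByteBuffer = {
    val buf = ByteBuffer.allocate(records.map(r => LogOverhead + r._2.length).sum)
    records.foreach { case (offset, payload) =>
      buf.putLong(offset)
      buf.putInt(payload.length)
      buf.put(payload)
    }
    buf.flip()
    buf
  }

  // Scans forward from startingPosition; returns (offset, position) of the first
  // record whose offset is >= targetOffset, or None if the scan runs off the end.
  def searchFor(log: ByteBuffer, targetOffset: Long, startingPosition: Int): Option[(Long, Int)] = {
    var position = startingPosition
    val size = log.limit()
    while (position + LogOverhead < size) {
      val offset = log.getLong(position) // absolute read, does not move the buffer position
      if (offset >= targetOffset) return Some((offset, position))
      val messageSize = log.getInt(position + 8)
      position += LogOverhead + messageSize
    }
    None
  }
}
```

Only the 12-byte header of each record is inspected, and the payload is skipped by adding messageSize to the position. This is why starting the scan from the position found in the index keeps the number of headers read small.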


This completes the analysis of the basic read and write functions of the Log module.

