【摘要】 FileMessageSet.append的代码比较简单,直接把消息写入其FileChannel中: /** * Append these messages to the message set */ def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) …



  /**
   * Append these messages to the message set
   */


  def append(messages: ByteBufferMessageSet) {

    val written = messages.writeTo(channel, 0, messages.sizeInBytes)





  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   */


  def append(offset: Long, position: Int) {

    inLock(lock) {

      require(!isFull, "Attempt to append to a full index (size = " + size + ").")

      if (size.get == 0 || offset > lastOffset) {

        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))

        this.mmap.putInt((offset - baseOffset).toInt)



        this.lastOffset = offset

        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")

      } else {

        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."

          .format(offset, entries, lastOffset, file.getAbsolutePath))





5. Log.read 读取日志的流程



这个函数根据指定的起始偏移量startOffset和最大读取长度等参数,返回FetchDataInfo信息。原理很简单:先从保存的segments map中找到baseOffset小于等于startOffset的最大的那个segment(floorEntry),从它开始查找和读取数据;如果这个segment中没有offset大于等于startOffset的消息,就依次尝试后面的segment。


  /**
   * Read messages from the log
   *
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param maxOffset The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
   *
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
   * @return The fetch data information including fetch starting offset metadata and messages read
   */


  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {

    trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

    // check if the offset is valid and in range


    val next = nextOffsetMetadata.messageOffset

    if(startOffset == next)

      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)



    var entry = segments.floorEntry(startOffset)


    // attempt to read beyond the log end offset is an error

    if(startOffset > next || entry == null)

      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))


    // do the read on the segment with a base offset less than the target offset

    // but if that segment doesn't contain any messages with an offset greater than that

    // continue to read from successive segments until we get some messages or we reach the end of the log

    while(entry != null) {


      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)


      if(fetchInfo == null) {

        entry = segments.higherEntry(entry.getKey)

      } else {

        return fetchInfo




    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,

    // this can happen when all messages with offset larger than start offsets have been deleted.

    // In this case, we will return the empty set with log end offset metadata

    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)


5.1   LogSegment.read


  /**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxOffset An optional maximum offset for the message set we read
   *
   * @throws IllegalArgumentException If maxSize is negative, or maxOffset is smaller than startOffset.
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this segment
   */



  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {

    if(maxSize < 0)

      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))


    val logSize = log.sizeInBytes // this may change, need to save a consistent copy


    val startPosition = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null

    if(startPosition == null)

      return null


    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

    // if the size is zero, still return a log segment but with zero size

    if(maxSize == 0)

      return FetchDataInfo(offsetMetadata, MessageSet.Empty)


    // calculate the length of the message set to read based on whether or not they gave us a maxOffset

    val length =

      maxOffset match {

        case None =>

          // no max offset, just use the max size they gave unmolested


        case Some(offset) => {

          // there is a max offset, translate it to a file position and use that to calculate the max read size

          if(offset < startOffset)

            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))

          //再一次把max offset转为物理地址,然后取maxSizemaxOffsetstartOffset之间的小值作为读取长度

          val mapping = translateOffset(offset, startPosition.position)

          val endPosition =

            if(mapping == null)

              logSize // the max offset is off the end of the log, use the end of the file



          min(endPosition - startPosition.position, maxSize)




    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))




  /**
   * Find the physical file position for the first message with offset >= the requested offset.
   *
   * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
   * in the file higher than the greatest-lower-bound from the index.
   *
   * @param offset The offset we want to translate
   * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
   * when omitted, the search will begin at the position in the offset index.
   *
   * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criterion.
   */



  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {

    //index buffer中查找最大的小于等于offsetOffsetPosition关系

    val mapping = index.lookup(offset)


    log.searchFor(offset, max(mapping.position, startingFilePosition))



/**
 * Find the largest indexed offset less than or equal to targetOffset and return it together with its
 * physical file position. When indexSlotFor reports no such slot (-1), fall back to (baseOffset, 0).
 */
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    // Search a duplicate so this lookup does not move the shared mmap's position.
    val idx = mmap.duplicate
    // Binary-search the index buffer for the slot of the largest entry whose offset is <= targetOffset.
    val slot = indexSlotFor(idx, targetOffset)
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}




  /* return the nth offset relative to the base offset */

  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)


  /* return the nth physical position */

  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

再看FileMessageSet.searchFor:它从startingPosition开始按顺序逐条读取消息头,找到第一个offset大于等于目标offset的消息,返回包含该offset及其物理位置的OffsetPosition对象;如果找不到,则返回null。


  /**
   * Search forward for the file position of the first offset that is greater than or equal to the target offset
   * and return an OffsetPosition holding that offset and its physical position. If no such offsets are found, return null.
   *
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */


  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {

    var position = startingPosition

    //分配一个LogOverHead大小的buffer,用于读取消息头,LogOverHead大小为12 byte(MessageSize 4, OffsetLength 8)

    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)

    val size = sizeInBytes()

    while(position + MessageSet.LogOverhead < size) {


      channel.read(buffer, position)



        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"

                                        .format(targetOffset, startingPosition, file.getAbsolutePath))


      val offset = buffer.getLong()


      if(offset >= targetOffset)

        return OffsetPosition(offset, position)

      val messageSize = buffer.getInt()

      if(messageSize < Message.MessageOverhead)

        throw new IllegalStateException("Invalid message size: " + messageSize)


      position += MessageSet.LogOverhead + messageSize





