Kafka Source Code Analysis, Part 3: Log Module Read/Write Analysis (III)

步步清风, posted on 2017/11/21 09:43:02

That is, a MessageSet is a collection of MessageAndOffset entries.

case class MessageAndOffset(message: Message, offset: Long)

MessageAndOffset is a case class with two members: a Message and its offset.


As the name suggests, ByteBufferMessageSet is a MessageSet backed by a ByteBuffer. Its constructor calls the create function, which allocates a ByteBuffer:

      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))

      for(message <- messages)

        writeMessage(buffer, message, offsetCounter.getAndIncrement)

      buffer.rewind()

      buffer
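
As a side note, MessageSet.messageSetSize essentially sizes this buffer as the sum of each message's size plus a fixed 12-byte framing overhead per entry (8 bytes for the offset plus 4 bytes for the size field written by writeMessage). A minimal sketch of that calculation, using hypothetical message sizes:

object MessageSetSizeSketch {
  // per-entry framing written by writeMessage: int64 offset + int32 size
  val OffsetLength = 8
  val MessageSizeLength = 4
  val LogOverhead = OffsetLength + MessageSizeLength

  // messageSizes stands in for message.size of each message in the set
  def messageSetSize(messageSizes: Seq[Int]): Int =
    messageSizes.map(LogOverhead + _).sum

  def main(args: Array[String]): Unit =
    println(messageSetSize(Seq(100, 250, 57))) // 443 = 407 bytes of messages + 3 * 12 bytes of framing
}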


The writeMessage called above looks like this:

  private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {

    buffer.putLong(offset)

    buffer.putInt(message.size)

    buffer.put(message.buffer)

    message.buffer.rewind()

  }

From this function we can see that the offset and message.size are written into the buffer first, followed by the message itself. So the on-disk storage format of an uncompressed message set is:

Entry format of a ByteBufferMessageSet:


MessageSet => [Offset MessageSize Message]

  Offset => int64

  MessageSize => int32


Format of a Message:


Message => Crc MagicByte Attributes Key Value

  Crc => int32

  MagicByte => int8

  Attributes => int8

  Key => bytes

  Value => bytes
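
To make the layout concrete, here is a small self-contained sketch that frames one uncompressed entry the same way writeMessage does — offset, then size, then the message bytes — and reads it back. The payload here is just a stand-in for a serialized Message (which would itself start with the Crc/MagicByte/Attributes header described above):

import java.nio.ByteBuffer

object MessageFramingSketch {
  // [Offset => int64][MessageSize => int32][Message => bytes]
  def writeEntry(buffer: ByteBuffer, offset: Long, messageBytes: Array[Byte]): Unit = {
    buffer.putLong(offset)
    buffer.putInt(messageBytes.length)
    buffer.put(messageBytes)
  }

  def main(args: Array[String]): Unit = {
    val payload = "hello".getBytes("UTF-8") // hypothetical serialized Message
    val buffer = ByteBuffer.allocate(8 + 4 + payload.length)
    writeEntry(buffer, 42L, payload)
    buffer.rewind()
    println(s"offset=${buffer.getLong} size=${buffer.getInt}") // offset=42 size=5
  }
}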



4.    The Log.append flow

The overall flow is shown in the flowchart below.

(flowchart: Log.append flow — image not reproduced)

This function writes the message set into the actual log file and returns a LogAppendInfo:


def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {


    val appendInfo = analyzeAndValidateMessageSet(messages)

   

    // if we have any valid messages, append them to the log

    if(appendInfo.shallowCount == 0)

      return appendInfo

     

    // strip out any invalid trailing bytes; they are detected by verifying each message's CRC

    // trim any invalid bytes or partial messages before appending it to the on-disk log

    var validMessages = trimInvalidBytes(messages, appendInfo)


    try {

      // they are valid, insert them in the log

      lock synchronized {

        // nextOffsetMetadata is a LogOffsetMetadata; its messageOffset field is refreshed by updateLogEndOffset on every append, so it always holds the log's last offset, and the next append reads it to know which offset to write next

        appendInfo.firstOffset = nextOffsetMetadata.messageOffset


        if(assignOffsets) {

          // assign offsets to the message set

          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)

          try {

              // assign the leading offset field of every message in validMessages (a ByteBufferMessageSet), so each message written to the log carries its offset in its header

              validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

          } catch {

            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)

          }

          // offset is incremented once per message inside validMessages.assignOffsets, so lastOffset is the counter value minus one

          appendInfo.lastOffset = offset.get - 1

        } else {

          // we are taking the offsets we are given

          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)

            throw new IllegalArgumentException("Out of order offsets found in " + messages)

        }

        // assignOffsets may re-compress the messages, so re-check that no message is too large

        // re-validate message sizes since after re-compression some may exceed the limit

        for(messageAndOffset <- validMessages.shallowIterator) {

          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {

            // we record the original message set size instead of trimmed size

            // to be consistent with pre-compression bytesRejectedRate recording

            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)

            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)

            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."

              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))

          }

        }

       

        // check whether the message set size exceeds config.segmentSize

        if(validMessages.sizeInBytes > config.segmentSize) {

          throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."

            .format(validMessages.sizeInBytes, config.segmentSize))

        }


        // if appending these messages would push the active segment past its size limit, roll a new log file first and then append

        // maybe roll the log if this segment is full

        val segment = maybeRoll(validMessages.sizeInBytes)

      

        // append the messages to the active segment; once enough bytes have accumulated, OffsetIndex.append is called to add an index entry

        // now append to the log

        segment.append(appendInfo.firstOffset, validMessages)


       // advance nextOffsetMetadata to the latest offset

       // increment the log end offset

        updateLogEndOffset(appendInfo.lastOffset + 1)


        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"

                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))


        if(unflushedMessages >= config.flushInterval)

          flush()


        appendInfo

      }

    } catch {

      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)

    }

  }
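
The offset bookkeeping above is easy to reproduce in isolation: firstOffset is the counter's value before any assignment, the counter is incremented once per (shallow) message, and lastOffset ends up being the counter minus one. A minimal sketch, with a hypothetical list standing in for the ByteBufferMessageSet:

import java.util.concurrent.atomic.AtomicLong

object OffsetAssignmentSketch {
  def main(args: Array[String]): Unit = {
    val nextOffset = new AtomicLong(100L)  // stands in for nextOffsetMetadata.messageOffset
    val messages = Seq("m0", "m1", "m2")   // hypothetical batch of three messages

    val firstOffset = nextOffset.get
    val assigned = messages.map(m => (nextOffset.getAndIncrement(), m))
    val lastOffset = nextOffset.get - 1    // mirrors appendInfo.lastOffset = offset.get - 1

    println(assigned)                                           // List((100,m0), (101,m1), (102,m2))
    println(s"firstOffset=$firstOffset lastOffset=$lastOffset") // firstOffset=100 lastOffset=102
  }
}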

4.1     maybeRoll

Let's now take a look at maybeRoll:

private def maybeRoll(messagesSize: Int): LogSegment = {

    val segment = activeSegment

    if (segment.size > config.segmentSize - messagesSize ||

        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||

        segment.index.isFull) {

      debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."

            .format(name,

                    segment.size,

                    config.segmentSize,

                    segment.index.entries,

                    segment.index.maxEntries,

                    time.milliseconds - segment.created,

                    config.segmentMs - segment.rollJitterMs))

      roll()

    } else {

      segment

    }

  }

If appending the messages would push the active segment past its maximum file size, or the segment has existed for too long (or its offset index is full), the log rolls to a new file; otherwise the current active segment is returned directly.
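
In other words, the roll decision is an OR of three conditions. Written out as a standalone predicate (parameter names here are illustrative, not the real config/segment fields):

object RollDecisionSketch {
  // 1. the active segment has no room left for the incoming messages
  // 2. the segment is non-empty and older than segmentMs (minus the per-segment jitter)
  // 3. the offset index is full
  def shouldRoll(segmentSize: Long, segmentAgeMs: Long, indexIsFull: Boolean,
                 messagesSize: Int, maxSegmentSize: Long, segmentMs: Long,
                 rollJitterMs: Long): Boolean =
    segmentSize > maxSegmentSize - messagesSize ||
      (segmentSize > 0 && segmentAgeMs > segmentMs - rollJitterMs) ||
      indexIsFull
}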

The code for roll and its comments are as follows:

/**

   * Roll the log over to a new active segment starting with the current logEndOffset.

   * This will trim the index to the exact size of the number of entries it currently contains.

   * @return The newly rolled segment

   */

  def roll(): LogSegment = {

    val start = time.nanoseconds

    lock synchronized {

      val newOffset = logEndOffset

      val logFile = logFilename(dir, newOffset)

      val indexFile = indexFilename(dir, newOffset)

      // if the files already exist, delete them first

      for(file <- List(logFile, indexFile); if file.exists) {

        warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")

        file.delete()

      }

   

      segments.lastEntry() match {

        case null =>

        case entry => entry.getValue.index.trimToValidSize()

      }

      // create a new LogSegment

      val segment = new LogSegment(dir,

                                   startOffset = newOffset,

                                   indexIntervalBytes = config.indexInterval,

                                   maxIndexSize = config.maxIndexSize,

                                   rollJitterMs = config.randomSegmentJitter,

                                   time = time)

      // add it to the segments list

      val prev = addSegment(segment)

      if(prev != null)

        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))

   

      // schedule an asynchronous flush to disk

      // schedule an asynchronous flush of the old segment

      scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

     

      info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))

     

      segment

    }

  }
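
For reference, the logFile and indexFile created when rolling are named after the new segment's base offset, zero-padded to 20 digits, which is what logFilename/indexFilename produce. A quick sketch of that naming scheme, with an arbitrary directory and offset:

import java.io.File

object SegmentNamingSketch {
  // segment files are named by their base offset, padded to 20 digits
  def filenamePrefixFromOffset(offset: Long): String = "%020d".format(offset)

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/kafka-logs/my-topic-0") // hypothetical partition directory
    val newOffset = 9005L
    println(new File(dir, filenamePrefixFromOffset(newOffset) + ".log").getName)   // 00000000000000009005.log
    println(new File(dir, filenamePrefixFromOffset(newOffset) + ".index").getName) // 00000000000000009005.index
  }
}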


Let's now see what the asynchronous flush actually does:

/**

   * Flush log segments for all offsets up to offset-1

   * @param offset The offset to flush up to (non-inclusive); the new recovery point

   */

  def flush(offset: Long) : Unit = {

    if (offset <= this.recoveryPoint)

      return

    debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +

          time.milliseconds + " unflushed = " + unflushedMessages)

    for(segment <- logSegments(this.recoveryPoint, offset))

      segment.flush()

    lock synchronized {

      if(offset > this.recoveryPoint) {

        this.recoveryPoint = offset

        lastflushedTime.set(time.milliseconds)

      }

    }

  }

This function flushes the log segments between recoveryPoint and the given offset. Segment.flush in turn calls the flush functions of FileMessageSet and OffsetIndex to sync the log data and the index to disk. After the flush completes, recoveryPoint is advanced to offset.
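
At the lowest level, flushing a segment boils down to forcing the log file's channel and the memory-mapped index out to disk. A rough sketch of that idea using plain NIO calls (the real FileMessageSet and OffsetIndex wrap these with additional bookkeeping; file paths here are hypothetical):

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

object SegmentFlushSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical log file: flushing the data means forcing its channel
    val logRaf = new RandomAccessFile("/tmp/example-segment.log", "rw")
    val logChannel = logRaf.getChannel
    logChannel.write(ByteBuffer.wrap("some log data".getBytes("UTF-8")))
    logChannel.force(true) // push file content and metadata to the device
    logRaf.close()

    // hypothetical index file: the offset index is memory-mapped,
    // so flushing it means forcing the mapped buffer
    val indexRaf = new RandomAccessFile("/tmp/example-segment.index", "rw")
    val mappedIndex = indexRaf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096)
    mappedIndex.force()
    indexRaf.close()
  }
}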

4.2     Segment.append

This is the append provided by LogSegment; it appends a set of messages to the log segment starting at the given offset. If more than 4KB (the default value of the log.index.interval.bytes property) has been written since the last index entry, an additional index entry is written to the index file. This method is not thread-safe, so callers must protect it with a lock.

/**

   * Append the given messages starting with the given offset. Add

   * an entry to the index if needed.

   *

   * It is assumed this method is being called from within a lock.

   *

   * @param offset The first offset in the message set.

   * @param messages The messages to append.

   */

  @nonthreadsafe

  def append(offset: Long, messages: ByteBufferMessageSet) {

    if (messages.sizeInBytes > 0) {

      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))

      // append an entry to the index (if needed)

      // if more bytes than the configured indexIntervalBytes have been written to the log since the last index entry, write the current offset into the index file

      if(bytesSinceLastIndexEntry > indexIntervalBytes) {

        index.append(offset, log.sizeInBytes())

        this.bytesSinceLastIndexEntry = 0

      }

      // call FileMessageSet.append to write the messages to the file channel

      // append the messages

      log.append(messages)

      this.bytesSinceLastIndexEntry += messages.sizeInBytes

    }

  }
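
Because an index entry is only appended once more than indexIntervalBytes of log data has accumulated since the previous entry, the offset index stays sparse — roughly one entry per 4KB of log with the default setting. A small sketch of that gating logic with hypothetical 1KB appends:

object SparseIndexSketch {
  def main(args: Array[String]): Unit = {
    val indexIntervalBytes = 4096   // default log.index.interval.bytes
    var bytesSinceLastIndexEntry = 0
    var indexEntries = 0

    // simulate 20 appends of 1 KB message sets
    for (_ <- 1 to 20) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        indexEntries += 1           // index.append(offset, position) in the real code
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += 1024
    }
    println(s"index entries written: $indexEntries") // 3 entries for 20 KB of appends
  }
}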

