hdfs源码解析之客户端写数据(二)

举报
dayu_dls 发表于 2021/04/26 16:20:48 2021/04/26
【摘要】 接着hdfs源码解析之客户端写数据(一)继续。fs.write(a.getBytes());上面write方法郑振调用的是FSOutputSummer.write,FSOutputSummer维护了一个本地缓冲区buf,大小初始为9*chunkSize,append文件时初始化方法不同。循环写buf.length字节数据,buf满了就开始调用writeChecksumChunks写packe...

接着hdfs源码解析之客户端写数据(一)继续。

fs.write(a.getBytes());

上面write方法郑振调用的是FSOutputSummer.write,FSOutputSummer维护了一个本地缓冲区buf,大小初始为9*chunkSize,append文件时初始化方法不同。循环写buf.length字节数据,buf满了就开始调用writeChecksumChunks写packet。

@Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
//循环写buf.length字节数据,buf满了就开始写packet
    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
    }
  }
 
private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;
      writeChecksumChunks(b, off, length);
      return length;
    }
    
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }


创建文件时,是每次写getBytesPerChecksum,刚好一个chunk的大小,追加文件时第一次写文件最后一个block的最后一个chunk空的部分,这样就可以组成一个完整的chunk,后面就按照create文件一样每次写chunk大小。所以每次写的大小是根据create还是append区别的。 

//创建文件时,是每次写getBytesPerChecksum,刚好一个chunk的大小,追加文件时第一次写文件最后一个block的最后一个chunk空的部分,这样就可以组成一个完整的chunk,后面就按照create文件一样每次写chunk大小。所以每次写的大小是根据create还是append区别的。

private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }


最核心的方法是writeChunk() 

 /**
   * cklen :校验和大小
   * 写数据到packet,每次只写一个chunk大小的数据
   */
  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();//检查DFSClient对象的状态
    checkClosed();//检查DFSOutputStream对象的状态
//输出的数据比一个校验块(chunk)还大
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    //要写入的校验和大小与给定的大小不一致
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }
//当前要写入的packet为空,则新建
    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
//开始写数据,先写校验和checksum,再写chunkdata
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    //chunk个数自增
    currentPacket.numChunks++;
    //block中的偏移量
    bytesCurBlock += len;
 
    // If packet is full, enqueue it for transmission
    //packet的chunk个数等于packet设置的最大chunk个数,则packet满了,就开始传输,如果bytesCurBlock大于blockSize呢?
    //如何处理
    //已解决:通过computePacketChunkSize我们知道,
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      //当前packet放入到队列,等待消费
      waitAndQueueCurrentPacket();
 
      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      //如果重新打开的文件没有在chunk块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的crc块。
      //block中刚好存储整数个完整的chunk块,如果分配的block中已经存在数据
      //通过对文件进行追加操作,然后逐步调试,终于明白了appendChunk的含义,在对已经存在的文件进行append操作时,会构建DFSOutputStream对象,而这个对象的初始化和新建
      //文件时的方法是不同的。append操作的对象初始化会从namenode把文件最后一个block(block存在一个list中)的信息拿到,然后把这个block的信息初始化给DFSOutputStream
      //本地缓冲区buf就是blockSize-bytesCurBlock,且当前packet的chunksize=blockSize-bytesCurBlock
      //如果是追加数据,且追加后构成一个完整的chunk块,那么就需要把之前指定的buf重置成正常值
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        // 为何这个操作?buf置空
        //使用指定大小的新缓冲区重置现有缓冲区。
        resetChecksumBufSize();
      }
 
      if (!appendChunk) {
          //计算下一个packet的大小,保证写入时不会超过blocksize
        /*  就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,
          以保证新的Packet大小不会超过Block的剩余大小
          如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:
          blockSize-bytesCurBlock,也就是3kb*/
 
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      //如果block满了,发送空包,重置变量
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }

如果重新打开的文件没有在chunk块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的crc块。block中刚好存储整数个完整的chunk块,如果分配的block中已经存在数据。通过对文件进行追加操作,然后逐步调试,终于明白了appendChunk的含义,在对已经存在的文件进行append操作时,会构建DFSOutputStream对象,而这个对象的初始化和新建文件时的方法是不同的。append操作的对象初始化会从namenode把文件最后一个block(block存在一个list中)的信息拿到,然后把这个block的信息初始化给DFSOutputStream。本地缓冲区buf就是blockSize-bytesCurBlock,且当前packet的chunksize=blockSize-bytesCurBlock
。如果是追加数据,且追加后构成一个完整的chunk块,那么就需要把之前指定的buf重置成正常值。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。