hdfs源码解析之客户端写数据(二)
接着hdfs源码解析之客户端写数据(一)继续。
fs.write(a.getBytes());
上面write方法郑振调用的是FSOutputSummer.write,FSOutputSummer维护了一个本地缓冲区buf,大小初始为9*chunkSize,append文件时初始化方法不同。循环写buf.length字节数据,buf满了就开始调用writeChecksumChunks写packet。
@Override
public synchronized void write(byte b[], int off, int len)
throws IOException {
checkClosed();
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
//循环写buf.length字节数据,buf满了就开始写packet
for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
}
}
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length;
writeChecksumChunks(b, off, length);
return length;
}
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
// local buffer is full
flushBuffer();
}
return bytesToCopy;
}
创建文件时,是每次写getBytesPerChecksum,刚好一个chunk的大小,追加文件时第一次写文件最后一个block的最后一个chunk空的部分,这样就可以组成一个完整的chunk,后面就按照create文件一样每次写chunk大小。所以每次写的大小是根据create还是append区别的。
//创建文件时,是每次写getBytesPerChecksum,刚好一个chunk的大小,追加文件时第一次写文件最后一个block的最后一个chunk空的部分,这样就可以组成一个完整的chunk,后面就按照create文件一样每次写chunk大小。所以每次写的大小是根据create还是append区别的。
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}
最核心的方法是writeChunk()
/**
* cklen :校验和大小
* 写数据到packet,每次只写一个chunk大小的数据
*/
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();//检查DFSClient对象的状态
checkClosed();//检查DFSOutputStream对象的状态
//输出的数据比一个校验块(chunk)还大
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
//要写入的校验和大小与给定的大小不一致
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
getChecksumSize() + " but found to be " + cklen);
}
//当前要写入的packet为空,则新建
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + bytesCurBlock);
}
}
//开始写数据,先写校验和checksum,再写chunkdata
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
//chunk个数自增
currentPacket.numChunks++;
//block中的偏移量
bytesCurBlock += len;
// If packet is full, enqueue it for transmission
//packet的chunk个数等于packet设置的最大chunk个数,则packet满了,就开始传输,如果bytesCurBlock大于blockSize呢?
//如何处理
//已解决:通过computePacketChunkSize我们知道,
if (currentPacket.numChunks == currentPacket.maxChunks ||
bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.seqno +
", src=" + src +
", bytesCurBlock=" + bytesCurBlock +
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
//当前packet放入到队列,等待消费
waitAndQueueCurrentPacket();
// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
//如果重新打开的文件没有在chunk块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的crc块。
//block中刚好存储整数个完整的chunk块,如果分配的block中已经存在数据
//通过对文件进行追加操作,然后逐步调试,终于明白了appendChunk的含义,在对已经存在的文件进行append操作时,会构建DFSOutputStream对象,而这个对象的初始化和新建
//文件时的方法是不同的。append操作的对象初始化会从namenode把文件最后一个block(block存在一个list中)的信息拿到,然后把这个block的信息初始化给DFSOutputStream
//本地缓冲区buf就是blockSize-bytesCurBlock,且当前packet的chunksize=blockSize-bytesCurBlock
//如果是追加数据,且追加后构成一个完整的chunk块,那么就需要把之前指定的buf重置成正常值
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
// 为何这个操作?buf置空
//使用指定大小的新缓冲区重置现有缓冲区。
resetChecksumBufSize();
}
if (!appendChunk) {
//计算下一个packet的大小,保证写入时不会超过blocksize
/* 就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,
以保证新的Packet大小不会超过Block的剩余大小
如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:
blockSize-bytesCurBlock,也就是3kb*/
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
//
//如果block满了,发送空包,重置变量
if (bytesCurBlock == blockSize) {
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
}
}
}
如果重新打开的文件没有在chunk块边界结束,并且上面的写入填满了它的部分块。 告诉夏天从现在开始生成完整的crc块。block中刚好存储整数个完整的chunk块,如果分配的block中已经存在数据。通过对文件进行追加操作,然后逐步调试,终于明白了appendChunk的含义,在对已经存在的文件进行append操作时,会构建DFSOutputStream对象,而这个对象的初始化和新建文件时的方法是不同的。append操作的对象初始化会从namenode把文件最后一个block(block存在一个list中)的信息拿到,然后把这个block的信息初始化给DFSOutputStream。本地缓冲区buf就是blockSize-bytesCurBlock,且当前packet的chunksize=blockSize-bytesCurBlock
。如果是追加数据,且追加后构成一个完整的chunk块,那么就需要把之前指定的buf重置成正常值。
- 点赞
- 收藏
- 关注作者
评论(0)