海量小文件处理方式——合并算法介绍
大数据分析平台以下面的格式存储设备文件:/var/work/${file type}/year/month/day,形象的图如下:
比如/var/work/pcap/2018/10/02/file1,file2......
大数据分析平台小文件合并算法是基于上述目录分层的,算法输入是上面的某个目录,比如/var/work/pcap/2018/10/02,算法输出是一个或者多个包含数据和index文件的子目录。
一、主要输入参数
1)输入目录 Input directory
包含很多文件类型相似的文件的目录。
2)阈值 Threshold
HDFS块大小的阈值,范围(0,1],默认值0.8,意味着HDFS块大小的80%。如果文件大小超过HDFS块大小80%则需要分开存储。
3)最大块数量 maxBlockNumber
每一个输出文件包含的HDFS块最大个数,需要为正整数。
包含很多小文件的目录经过合并算法之后会被分成多个子目录,子目录包含一个数据文件和一个索引文件。默认一个子目录包含的文件个数是10个,100个小文件的话,会被分成10个子目录存储,如果是101个小文件,会被分成11个子目录存储。
4)压缩格式
常见的压缩格式,比如deflate、snappy、bzip2,如果压缩个数输出为null,则表示不需要压缩。
5)索引间隔 Indexinterval
为写入数据文件的每个' indexInterval '文件记录存储一个索引键。后面看算法代码大家可以继续理解这个参数的意思。
二、合并算法描述
下面是合并算法的伪代码:
(一)
hdfsBlockSize = configured HDFS block size
maxEligibleSize = hdfsBlockSize * threshold
mergableBytes = 0
mergeableFileList = new ArrayList<String> ()
(二)
for (currentFile: files in input directory) {
fileLength = currentFile.length()
if (fileLength <= maxEligibleSize) {
mergableBytes = mergableBytes + fileLength
mergeableFileList.add(currentFile)
}
}
(三)
//块个数
numBlocks = mergableBytes / hdfsBlockSize
if (numBlocks * hdfsBlockSize != mergableBytes) {
numBlocks = numBlocks + 1
}
//子目录个数
mergedFileNum = numBlocks / maxBlockNumber
If (mergedFileNum * maxBlockNumber !=
numBlocks){
mergedFileNum = mergedFileNum + 1
}
(四)
mergeableFileList.sort() by file size in ascending order
(五)
fileLists = new ArrayList<List<String>>(mergedFileNum);
for (i=0; i< mergedFileNum; i++) {
count = i
while (count < mergeableFileList.size()) {
currentFile = mergeableFileList.get(count)
fileLists.get(i).add(currentFile)
count = count + mergedFileNum
}
}
(六)
for (currentList: file Lists) {
currentList.sort() by file name in ascending order
subDir = new File(“inputDirectory/subDir”).mkdirs()
data = new File((“inputDirectory/subDir/data”)
index = new File((“inputDirectory/subDir/index”)
writtenFileRecord = 0
for (currentFile: currentList) {
data.append(currentFile)
writtenFileRecord++
If (writtenFileRecord %% indexInterval == 0) {
fileSize = data.sync()
index.append(currentFile.getName(),fileSize)
}
}
}
for (currentFile: mergeableFileList)
currentFile.delete()
我们一起来走读下上述伪代码吧。上面我们根据颜色和标号将代码分成了六大部分。
第一段代码,就做了算法参数初始化的事。hdfsBlockSize代表HDFS块大小,mergableBytes代表一个目录下是和的合并文件大小,maxEligibleSize代表目录下最大合并文件大小。
第二段代码,计算了mergableBytes大小,并且将文件加入合并列表。
第三段代码,获取小文件合并成大文件的个数
第四段代码,按照文件大小升序将mergableFileList排序。
第五段代码,将文件列表分配成不同的文件列表,比如将file0,file3,file6,file9分配成子列0中,将file1,file4.file7,file10分配成子列1中。以此类推。
第六段代码,对于下面的子列:
- 按照文件名称升序排列;
- 创建包含一个数据文件和一个索引文件的子目录。
-------------------------------------------------------------------
算法就是上面的了,下期我们尝试画个结合HDFS架构图来理解下
- 点赞
- 收藏
- 关注作者
评论(0)