海量小文件处理方式——合并算法介绍

举报
敏敏君主 发表于 2021/01/21 19:58:09 2021/01/21
【摘要】       大数据分析平台以下面的格式存储设备文件:/var/work/${file type}/year/month/day,形象的图如下:比如/var/work/pcap/2018/10/02/file1,file2......      大数据分析平台小文件合并算法是基于上述目录分层的,算法输入是上面的某个目录,比如/var/work/pcap/2018/10/02,算法输出是一个或者...

      大数据分析平台以下面的格式存储设备文件:/var/work/${file type}/year/month/day,形象的图如下:

比如/var/work/pcap/2018/10/02/file1,file2......

      大数据分析平台小文件合并算法是基于上述目录分层的,算法输入是上面的某个目录,比如/var/work/pcap/2018/10/02,算法输出是一个或者多个包含数据和index文件的子目录。

一、主要输入参数

1)输入目录 Input directory

      包含很多文件类型相似的文件的目录。

2)阈值 Threshold

      HDFS块大小的阈值,范围(0,1],默认值0.8,意味着HDFS块大小的80%。如果文件大小超过HDFS块大小80%则需要分开存储。

3)最大块数量 maxBlockNumber

      每一个输出文件包含的HDFS块最大个数,需要为正整数。

      包含很多小文件的目录经过合并算法之后会被分成多个子目录,子目录包含一个数据文件和一个索引文件。默认一个子目录包含的文件个数是10个,100个小文件的话,会被分成10个子目录存储,如果是101个小文件,会被分成11个子目录存储。

4)压缩格式

      常见的压缩格式,比如deflate、snappy、bzip2,如果压缩个数输出为null,则表示不需要压缩。

5)索引间隔 Indexinterval

      为写入数据文件的每个' indexInterval '文件记录存储一个索引键。后面看算法代码大家可以继续理解这个参数的意思。

二、合并算法描述

      下面是合并算法的伪代码:

(一)

  hdfsBlockSize = configured HDFS block size
  maxEligibleSize = hdfsBlockSize * threshold
  mergableBytes = 0
  mergeableFileList = new ArrayList<String> ()

(二)
  
  for (currentFile: files in input directory) {
      fileLength = currentFile.length()
      
      if (fileLength <= maxEligibleSize) {
          mergableBytes = mergableBytes + fileLength
          mergeableFileList.add(currentFile)
      }
  }
  (三)
  //块个数
  numBlocks = mergableBytes / hdfsBlockSize
  if (numBlocks * hdfsBlockSize != mergableBytes) {
      numBlocks = numBlocks + 1
  }
  
  //子目录个数
  mergedFileNum = numBlocks / maxBlockNumber
  If (mergedFileNum * maxBlockNumber != 
      numBlocks){
      mergedFileNum = mergedFileNum + 1
  }


(四)
  mergeableFileList.sort() by file size in ascending order
  

(五)
  fileLists = new ArrayList<List<String>>(mergedFileNum);
  
  for (i=0; i< mergedFileNum; i++) {
      count = i
      while (count < mergeableFileList.size()) {
          currentFile = mergeableFileList.get(count)
          fileLists.get(i).add(currentFile)
          count = count + mergedFileNum
      }
  }
  

(六)
  for (currentList: file Lists) {
      currentList.sort() by file name in ascending order
      
      subDir = new File(“inputDirectory/subDir”).mkdirs()
      data = new File((“inputDirectory/subDir/data”)
      index = new File((“inputDirectory/subDir/index”)
      
      writtenFileRecord = 0
      
      for (currentFile: currentList) {
          data.append(currentFile)
          writtenFileRecord++
          
          If (writtenFileRecord %% indexInterval == 0) {
              fileSize = data.sync()
              index.append(currentFile.getName(),fileSize)
          }
      }
  }
  
  for (currentFile: mergeableFileList)
      currentFile.delete()

我们一起来走读下上述伪代码吧。上面我们根据颜色和标号将代码分成了六大部分。

第一段代码,就做了算法参数初始化的事。hdfsBlockSize代表HDFS块大小,mergableBytes代表一个目录下是和的合并文件大小,maxEligibleSize代表目录下最大合并文件大小。

第二段代码,计算了mergableBytes大小,并且将文件加入合并列表。

第三段代码,获取小文件合并成大文件的个数

第四段代码,按照文件大小升序将mergableFileList排序。

第五段代码,将文件列表分配成不同的文件列表,比如将file0,file3,file6,file9分配成子列0中,将file1,file4.file7,file10分配成子列1中。以此类推。

第六段代码,对于下面的子列:

  1.                       按照文件名称升序排列;
  2.                       创建包含一个数据文件和一个索引文件的子目录。

    -------------------------------------------------------------------

算法就是上面的了,下期我们尝试画个结合HDFS架构图来理解下

【版权声明】本文为华为云社区用户翻译文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容, 举报邮箱:cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。