《Hadoop权威指南:大数据的存储与分析》—5.2 压缩
5.2 压缩
文件压缩有两大好处:减少存储文件所需要的磁盘空间,并加速数据在网络和磁盘上的传输。这两大好处在处理大量数据时相当重要,所以我们值得仔细考虑在Hadoop中文件压缩的用法。
有很多种不同的压缩格式、工具和算法,它们各有千秋。表5-1列出了与Hadoop结合使用的常见压缩方法。
表5-1. 压缩格式总结
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 |
DEFLATE① | 无 | DEFLATE | .deflate | 否 |
gzip | gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | bzip2 | .bz2 | 是 |
LZO | lzop | LZO | .lzo | 否② |
LZ4 | 无 | LZ4 | .lz4 | 否 |
Snappy | 无 | Snappy | .snappy | 否 |
① DEFLATE是一个标准压缩算法,该算法的标准实现是zlib。没有可用于生成DEFLATE文件的常用命令行工具,因为通常都用gzip格式。注意,gzip文件格式只是在DEFLATE格式上增加了文件头和一个文件尾。.deflate文件扩展名是Hadoop约定的。
② 但是如果LZO文件已经在预处理过程中被索引了,那么LZO文件是可切分的。详情参见5.2.2节。
所有压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。表5-1列出的所有压缩工具都提供9个不同的选项来控制压缩时必须考虑的权衡:选项-1为优化压缩速度,-9为优化压缩空间。例如,下述命令通过最快的压缩方法创建一个名为file.gz的压缩文件:
%gzip -1 file
不同压缩工具有不同的压缩特性。gzip是一个通用的压缩工具,在空间/时间性能的权衡中,居于其他两个压缩方法之间。bzip2的压缩能力强于gzip,但压缩速度更慢一点。尽管bzip2的解压速度比压缩速度快,但仍比其他压缩格式要慢一些。另一方面,LZO、LZ4和Snappy均优化压缩速度,其速度比gzip快一个数量级,但压缩效率稍逊一筹。Snappy和LZ4的解压缩速度比LZO高出很多。[1]
表5-1中的“是否可切分”列表示对应的压缩算法是否支持切分(splitable),也就是说,是否可以搜索数据流的任意位置并进一步往下读取数据。可切分压缩格式尤其适合MapReduce,更多讨论,可以参见5.2.2节。
5.2.1 codec
Codec是压缩-解压缩算法的一种实现。在Hadoop中,一个对CompressionCodec接口的实现代表一个codec。例如,GzipCodec包装了gzip的压缩和解压缩算法。表5-2列举了Hadoop实现的codec。
表5-2. Hadoop的压缩codec
压缩格式 | HadoopCompressionCodec |
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
LZO代码库拥有GPL许可,因而可能没有包含在Apache的发行版本中,因此,Hadoop的codec需要单独从Google(http://code.google.com/p/hadoop-gpl-compression)或GitHub(http://github.com/kevinweil/hadoop-lzo)下载,该代码库包含有修正的软件错误及其他一些工具。LzopCodec与lzop工具兼容,LzopCodec基本上是LZO格式的但包含额外的文件头,因此这通常就是你想要的。也有针对纯LZO格式的LzoCodec,并使用.lzo_deflate作为文件扩展名(类似于DEFLATE,是gzip格式但不包含文件头)。
1. 通过CompressionCodec对数据流进行压缩和解压缩
CompressionCodec包含两个函数,可以轻松用于压缩和解压缩数据。如果要对写入输出数据流的数据进行压缩,可用createOutputStream (OutputStream out)方法在底层的数据流中对需要以压缩格式写入在此之前尚未压缩的数据新建一个CompressionOutputStream对象。相反,对输入数据流中读取的数据进行解压缩的时候,则调用createInputStream(InputStream in)获取CompressionInputStream,可以通过该方法从底层数据流读取解压缩后的数据。
CompressionOutputStream和CompressionInputStream,类似于java.util. zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream,只不过前两者能够重置其底层的压缩或解压缩方法,对于某些将部分数据流(section of data stream)压缩为单独数据块(block)的应用,例如SequenceFile(详情参见5.4.1节对SequenceFile类的讨论),这个能力是非常重要的。
范例5-1显示了如何用API来压缩从标准输入中读取的数据并将其写到标准输出。
范例5-1. 该程序压缩从标准输入读取的数据,然后将其写到标准输出
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}
该应用希望将符合CompressionCodec实现的完全合格名称作为第一个命令行参数。我们使用ReflectionUtils新建一个codec实例,并由此获得在System.out上支持压缩的一个包裹方法。然后,对IOUtils对象调用copyBytes()方法将输入数据复制到输出,(输出由CompressionOutputStream对象压缩)。最后,我们对CompressionOutputStream对象调用finish()方法,要求压缩方法完成到压缩数据流的写操作,但不关闭这个数据流。我们可以用下面这行命令做一个测试,通过GzipCodec的StreamCompressor对象对字符串“Text”进行压缩,然后使用gunzip从标准输入中对它进行读取并解压缩操作:
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io. compress.GzipCodec \
| gunzip
Text
2. 通过CompressionCodecFactory推断CompressionCodec
在读取一个压缩文件时,通常可以通过文件扩展名推断需要使用哪个codec。如果文件以.gz结尾,则可以用GzipCodec来读取,如此等等。前面的表5-1为每一种压缩格式列举了文件扩展名。
通过使用其getCodec()方法,CompressionCodecFactory提供了一种可以将文件扩展名映射到一个CompressionCodec的方法,该方法取文件的Path对象作为参数。范例5-2所示的应用便使用这个特性来对文件进行解压缩。
范例5-2. 该应用根据文件扩展名选取codec解压缩文件
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri =
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
一旦找到对应的codec,便去除文件扩展名形成输出文件名,这是通过CompressionCodecFactory对象的静态方法removeSuffix()来实现的。按照这种方法,一个名为file.gz的文件可以通过调用该程序解压为名为file的文件:
% hadoop FileDecompressor file.gz
CompressionCodecFactory加载表5-2中除LZO之外的所有codec,同样也加载io.compression.codecs配置属性(参见表5-3)列表中的所有codec。在默认情况下,该属性列表是空的,你可能只有在你拥有一个希望注册的定制codec(例如外部管理的LZO codec)时才需要加以修改。每个codec都知道自己默认的文件扩展名,因此CompressionCodecFactory可通过搜索注册的codec找到匹配指定文件扩展名的codec(如果有的话)。
表5-3. 压缩codec的属性
属性名称 | 类型 | 默认值 | 描述 |
io.compression.codecs | 逗号分隔的类名 | 用于压缩/解压缩的额外的CompressionCodec类的列表 |
3. 原生类库
为了提高性能,最好使用“原生”(native)类库来实现压缩和解压缩。例如,在一个测试中,使用原生gzip类库可以减少约一半的解压缩时间和约10%的压缩时间(与内置的Java实现相比)。表5-4说明了每种压缩格式是否有Java实现和原生类库实现。所有的格式都有原生类库实现,但是并非所有格式都有Java实现(如LZO)。
表5-4. 压缩代码库的实现
压缩格式 | 是否有Java实现 | 是否有原生实现 |
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
LZ4 | 否 | 是 |
Snappy | 否 | 是 |
Apache Hadoop 二进制压缩包本身包含有为64位Linux构建的原生压缩二进制代码,称为libhadoop.so。对于其他平台,需要自己根据位于源文件树最顶层的BUILDING.txt指令编译代码库。
可以通过Java系统的java.library.path属性指定原生代码库。etc/hadoop文件夹中的hadoop脚本可以帮你设置该属性,但如果不用这个脚本,则需要在应用中手动设置该属性。
默认情况下,Hadoop会根据自身运行的平台搜索原生代码库,如果找到相应的代码库就会自动加载。这意味着,你无需为了使用原生代码库而修改任何设置。但是,在某些情况下,例如调试一个压缩相关问题时,可能需要禁用原生代码库。将属性io.native.lib.available的值设置成false即可,这可确保使用内置的Java代码库(如果有的话)。
4. CodecPool
如果使用的是原生代码库并且需要在应用中执行大量压缩和解压缩操作,可以考虑使用CodecPool,它支持反复使用压缩和解压缩,以分摊创建这些对象的开销。
范例5-3中的代码显示了API函数,不过在这个程序中,它只新建了一个Compressor,并不需要使用压缩/解压缩池。
范例5-3. 使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out =
codec.createOutputStream(System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
} finally {
CodecPool.returnCompressor(compressor);
}
}
}
在codec的重载方法createOutputStream()中,对于指定的CompressionCodec,我们从池中获取一个Compressor实例。通过使用finally数据块,我们在不同的数据流之间来回复制数据,即使出现IOException异常,也可以确保compressor可以返回池中。
- 点赞
- 收藏
- 关注作者
评论(0)