《Hadoop权威指南:大数据的存储与分析》—5.3.3 实现定制的Writable集合
5.3.3 实现定制的Writable集合
Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在有些情况下,我们需要根据自己的需求构造一个新的实现。有了定制的Writable类型,就可以完全控制二进制表示和排序顺序。由于Writable是MapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。虽然Hadoop自带的Writable实现已经过很好的性能调优,但如果希望将结构调整得更好,更好的做法往往是新建一个Writable类型(而不是组合打包的类型)。
如果你正考虑写一个定制的Writable,值得尝试另一种序列化框架,例如Avro,允许你以声明方式定义定制的类型。详情可以参见5.3.4节有关序列化框架的内容及第12章。
为了演示如何新建一个定制的Writable,我们写一个表示一对字符串的实现,名为TextPair。范例5-7展示了最基本的实现。
范例5-7. 存储一对Text对象的Writable实现
import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}
这个定制Writable实现的第一部分非常直观:包括两个Text实例变量(first和second)和相关的构造函数,以及setter和getter(即设置函数和提取函数)。所有Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行实例化,然后再调用readFields()函数查看(填充)各个字段的值。Writable实例是可变的并且通常可以重用,所以应该尽量避免在write()或readFields()方法中分配对象。
通过让Text对象自我表示,TextPair类的write()方法依次将每个Text对象序列化到输出流中。类似的,通过每个Text对象的表示,readFields()方法对来自输入流的字节进行反序列化。DataOutput和DataInput接口有一套丰富的方法可以用于对Java基本类型进行序列化和反序列化,所以,在通常情况下,你可以完全控制Writable对象在线上传输/交换(的数据)的格式(数据传输格式)。
就像针对Java语言构造的任何值对象那样,需要重写java.lang.Object中的hashCode()、equals()和toString()方法。HashPartitioner (MapReduce中的默认分区类)通常用hashCode()方法来选择reduce分区,所以应该确保有一个比较好的哈希函数来保证每个reduce分区的大小相似。
即便计划结合使用TextOutputFormat和定制的Writable,也得自己动手实现toString()方法。TextOutputFormat对键和值调用toString()方法,将键和值转换为相应的输出表示。针对TextPair,我们将原始的Text对象作为字符串写到输出,各个字符串之间要用制表符来分隔。
TextPair是WritableComparable的一个实现,所以它提供了compareTo()方法,该方法可以强制数据排序:先按第一个字符排序,如果第一个字符相同,则按照第二个字符排序。注意,除了可存储的Text对象数目, TextPair不同于TextArrayWritable(前一小节中已经提到),因为TextArrayWritable只继承Writable,并没有继承WritableComparable。
1. 为提高速度实现一个RawComparator
范例5-7中的TextPair代码可以按照其描述的基本方式运行;但我们也可以进一步优化。按照5.3.1节的说明,当TextPair被用作MapReduce中的键时,需要将数据流反序列化为对象,然后再调用compareTo()方法进行比较。那么有没有可能看看它们的序列化表示就可以比较两个TextPair对象呢?
事实证明,我们可以这样做,因为TextPair是两个Text对象连接而成的,而Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。详细过程参见范例5-8 (注意,这段代码已嵌入TextPair类中)。
范例5-8. 用于比较TextPair字节表示的RawComparator
public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
事实上,我们采取的做法是继承WritableComparable类,而非实现RawComparator接口,因为它提供了一些比较好用的方法和默认实现。这段代码最本质的部分是计算firstL1和firstL2,这两个参数表示每个字节流中第一个Text字段的长度。两者分别由变长整数的长度(由WritableUtils的decodeVIntSize()方法返回)和编码值(由readVInt()方法返回)组成。
2. 定制的comparator
从TextPair可以看出,编写原始的comparator需要谨慎,因为必须要处理字节级别的细节。如果真的需要自己写comparator,有必要参考org.apache.hadoop.io包中对Writable接口的实现。WritableUtils提供的方法也比较好用。
如果可能,定制的comparator也应该继承自RawComparator。这些comparator定义的排列顺序不同于默认comparator定义的自然排列顺序。范例5-9显示了一个针对TextPair类型的comparator,称为FirstCompartator,它只考虑TextPair对象的第一个字符串。注意,我们重载了针对该类对象的compare()方法,使两个compare()方法有相同的语法。
范例5-9. 定制的RawComparator用于比较TextPair对象字节表示的第一个字段
public static class FirstComparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt (b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt (b2, s2);
return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
第9章在介绍MapReduce的连接操作和辅助排序(参见9.3节)的时候,将使用这个comparator。
- 点赞
- 收藏
- 关注作者
评论(0)