《Hadoop权威指南:大数据的存储与分析》—5.3.3 实现定制的Writable集合

举报
清华大学出版社 发表于 2019/10/12 22:09:52 2019/10/12
【摘要】 本节书摘来自清华大学出版社《Hadoop权威指南:大数据的存储与分析》一书中第五章,第5.3.3节,作者是Tom White , 王 海 华 东 刘 喻 吕粤海 译。

5.3.3  实现定制的Writable集合

Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在有些情况下,我们需要根据自己的需求构造一个新的实现。有了定制的Writable类型,就可以完全控制二进制表示和排序顺序。由于WritableMapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。虽然Hadoop自带的Writable实现已经过很好的性能调优,但如果希望将结构调整得更好,更好的做法往往是新建一个Writable类型(而不是组合打包的类型)

如果你正考虑写一个定制的Writable,值得尝试另一种序列化框架,例如Avro,允许你以声明方式定义定制的类型。详情可以参见5.3.4节有关序列化框架的内容及第12章。

 

为了演示如何新建一个定制的Writable,我们写一个表示一对字符串的实现,名为TextPair。范例5-7展示了最基本的实现。

范例5-7. 存储一对Text对象的Writable实现

import java.io.*;

 

import org.apache.hadoop.io.*;

 

public class TextPair implements WritableComparable<TextPair> {

 

  private Text first;

  private Text second;

 

  public TextPair() {

    set(new Text(), new Text());

  }

 

  public TextPair(String first, String second) {

    set(new Text(first), new Text(second));

  }

 

  public TextPair(Text first, Text second) {

    set(first, second);

  }

 

  public void set(Text first, Text second) {

    this.first = first;

    this.second = second;

  }

 

  public Text getFirst() {

    return first;

  }

 

  public Text getSecond() {

    return second;

  }

 

  @Override

  public void write(DataOutput out) throws IOException {

    first.write(out);

    second.write(out);

  }

 

@Override

  public void readFields(DataInput in) throws IOException {

    first.readFields(in);

    second.readFields(in);

  }

 

  @Override

  public int hashCode() {

    return first.hashCode() * 163 + second.hashCode();

  }

 

  @Override

  public boolean equals(Object o) {

    if (o instanceof TextPair) {

      TextPair tp = (TextPair) o;

      return first.equals(tp.first) && second.equals(tp.second);

    }

    return false;

  }

 

@Override

  public String toString() {

    return first + "\t" + second;

  }

 

  @Override

  public int compareTo(TextPair tp) {

    int cmp = first.compareTo(tp.first);

    if (cmp != 0) {

      return cmp;

    }

    return second.compareTo(tp.second);

  }

}

 

这个定制Writable实现的第一部分非常直观:包括两个Text实例变量(firstsecond)和相关的构造函数,以及settergetter(即设置函数和提取函数)。所有Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行实例化,然后再调用readFields()函数查看(填充)各个字段的值。Writable实例是可变的并且通常可以重用,所以应该尽量避免在write()readFields()方法中分配对象。

通过让Text对象自我表示,TextPair类的write()方法依次将每个Text对象序列化到输出流中。类似的,通过每个Text对象的表示,readFields()方法对来自输入流的字节进行反序列化。DataOutputDataInput接口有一套丰富的方法可以用于对Java基本类型进行序列化和反序列化,所以,在通常情况下,你可以完全控制Writable对象在线上传输/交换(的数据)的格式(数据传输格)

就像针对Java语言构造的任何值对象那样,需要重写java.lang.Object中的hashCode()equals()toString()方法。HashPartitioner (MapReduce中的默认分区类)通常用hashCode()方法来选择reduce分区,所以应该确保有一个比较好的哈希函数来保证每个reduce分区的大小相似。

即便计划结合使用TextOutputFormat和定制的Writable,也得自己动手实现toString()方法。TextOutputFormat对键和值调用toString()方法,将键和值转换为相应的输出表示。针对TextPair,我们将原始的Text对象作为字符串写到输出,各个字符串之间要用制表符来分隔。

TextPairWritableComparable的一个实现,所以它提供了compareTo()方法,该方法可以强制数据排序:先按第一个字符排序,如果第一个字符相同,则按照第二个字符排序。注意,除了可存储的Text对象数目, TextPair不同TextArrayWritable(前一小节中已经提到),因TextArrayWritable只继承Writable,并没有继承WritableComparable

1. 为提高速度实现一个RawComparator

范例5-7中的TextPair代码可以按照其描述的基本方式运行;但我们也可以进一步优化。按照5.3.1节的说明,当TextPair被用作MapReduce中的键时,需要将数据流反序列化为对象,然后再调用compareTo()方法进行比较。那么有没有可能看看它们的序列化表示就可以比较两个TextPair对象呢?

事实证明,我们可以这样做,因为TextPair两个Text对象连接而成的,而Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。详细过程参见范例5-8 (注意,这段代码已嵌入TextPair类中)

范例5-8. 用于比较TextPair字节表示的RawComparator

public static class Comparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {

      super(TextPair.class);

    }

 

    @Override

    public int compare(byte[] b1, int s1, int l1,

                         byte[] b2, int s2, int l2) {

       try {

        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);

        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);

        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);

        if (cmp != 0) {

          return cmp;

        }

        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,

                                          b2, s2 + firstL2, l2 - firstL2);

      } catch (IOException e) {

        throw new IllegalArgumentException(e);

      }

    }

  }

 

  static {

    WritableComparator.define(TextPair.class, new Comparator());

  }

 

事实上,我们采取的做法是继承WritableComparable类,而非实现RawComparator接口,因为它提供了一些比较好用的方法和默认实现。这段代码最本质的部分是计算firstL1firstL2,这两个参数表示每个字节流中第一个Text字段的长度。两者分别由变长整数的长度(WritableUtilsdecodeVIntSize()方法返回)和编码值(readVInt()方法返回)组成。

2. 定制的comparator

TextPair可以看出,编写原始的comparator需要谨慎,因为必须要处理字节级别的细节。如果真的需要自己写comparator,有必要参考org.apache.hadoop.io包中对Writable接口的实现。WritableUtils提供的方法也比较好用。

如果可能,定制的comparator也应该继承自RawComparator。这些comparator定义的排列顺序不同于默认comparator定义的自然排列顺序。范例5-9显示了一个针对TextPair类型的comparator,称为FirstCompartator,它只考虑TextPair对象的第一个字符串。注意,我们重载了针对该类对象的compare()方法,使两个compare()方法有相同的语法。

范例5-9. 定制的RawComparator用于比较TextPair对象字节表示的第一个字段

public static class FirstComparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public FirstComparator() {

      super(TextPair.class);

    }

 

    @Override

    public int compare(byte[] b1, int s1, int l1,

                        byte[] b2, int s2, int l2) {

      try {

        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt (b1, s1);

        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt (b2, s2);

        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);

      } catch (IOException e) {

        throw new IllegalArgumentException(e);

      }

    }

   

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

      if (a instanceof TextPair && b instanceof TextPair) {

        return ((TextPair) a).first.compareTo(((TextPair) b).first);

      }

      return super.compare(a, b);

    }

  }

 

9章在介绍MapReduce的连接操作和辅助排序(参见9.3)的时候,将使用这个comparator


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。