《Hadoop权威指南:大数据的存储与分析》—5.3 序列化

举报
清华大学出版社 发表于 2019/10/12 18:48:56 2019/10/12
【摘要】 本节书摘来自清华大学出版社《Hadoop权威指南:大数据的存储与分析》一书中第五章,第5.3.1节,作者是Tom White , 王 海 华 东 刘 喻 吕粤海 译。

5.3  序列化

序列化(serialization)是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。反序列化(deserialization)是指将字节流转回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储。

Hadoop中,系统中多个节点上进程间的通信是通过远程过程调用”(RPCremote procedure call)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下。

l   紧凑
紧凑格式能充分利用网络带宽(数据中心中最稀缺的资源)

 

l   快速
进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是最基本的。

 

l   可扩展
为了满足新的需求,协议不断变化。所以在控制客户端和服务器的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增添新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)

 

l   支持互操作
对于某些系统来说,希望能支持以不同语言写的客户端与服务器交互,所以需要设计需要一种特定的格式来满足这一需求。

 

表面看来,序列化框架对选择用于数据持久存储的数据格式应该会有不同的要求。毕竟,RPC的存活时间不到1秒钟,持久存储的数据却可能在写到磁盘若干年后才会被读取。但结果是,RPC序列化格式的四大理想属性对持久存储格式而言也很重要。我们希望存储格式比较紧凑(进而高效使用存储空间)、快速(/写数据的额外开销比较小)、可扩展(可以透明地读取老格式的数据)且可以互操作(以可以使用不同的语言读/写永久存储的数据)

Hadoop使用的是自己的序列化格式Writable,它绝对紧凑、速度快,但不太容易用Java以外的语言进行扩展或使用。因为WritableHadoop的核心(大多数MapReduce程序都会为键和值类型使用它),所以在接下来的三个小节中,我们要进行深入探讨,然后再介绍Hadoop支持的其他序列化框架。Avro(一个克服了Writable部分不足的序列化系统)将在第12章中讨论。

5.3.1  Writable接口

Writable接口定义了两个方法:一个将其状态写入DataOutput二进制流,另一个从DataInput二进制流读取状态:

package org.apache.hadoop.io;

 

import java.io.DataOutput;

import java.io.DataInput;

import java.io.IOException;

 

public interface Writable {

  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;

}

 

让我们通过一个特殊的Writable类来看看它的具体用途。我们将使用IntWritable来封装Java int类型。我们可以新建一个对象并使用set()方法来设置它的值:

IntWritable writable = new IntWritable();

writable.set(163);

 

也可以通过使用一个整数值作为输入参数的构造函数来新建一个对象:

IntWritable writable = new IntWritable(163);

 

为了检查IntWritable的序列化形式,我们在java.io.DataOutputStream (java.io.DataOutput的一个实现)中加入一个帮助函数来封装java.io.ByteArrayOutputSteam,以便在序列化流中捕捉字节:

public static byte[] serialize(Writable writable) throws IOException {

  ByteArrayOutputStream out = new ByteArrayOutputStream();

  DataOutputStream dataOut = new DataOutputStream(out);

  writable.write(dataOut);

  dataOut.close();

  return out.toByteArray();

}

 

一个整数占用4个字节(因为我们使用JUnit4进行声明)

byte[] bytes = serialize(writable);

assertThat(bytes.length, is(4));

 

每个字节是按照大端顺序写入的(按照java.io.DataOutput接口中的声明,最重要的字节先写入流),并且通过HadoopStringUtils我们可以看到这些字节的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

 

让我们试试反序列化。我们再次新建一个辅助方法,从一个字节数组中读取一个Writable对象:

public static byte[] deserialize(Writable writable, byte[] bytes)

    throws IOException {

  ByteArrayInputStream in = new ByteArrayInputStream(bytes);

  DataInputStream dataIn = new DataInputStream(in);

  writable.readFields(dataIn);

  dataIn.close();

  return bytes;

}

 

我们构建了一个新的、空值的 IntWritable对象,然后调用deserialize()方法从我们刚写的输出数据中读取数据。最后,我们看到该值(通过get()方法获取)是原始的数值163

IntWritable newWritable = new IntWritable();

deserialize(newWritable, bytes);

assertThat(newWritable.get(), is(163));

WritableComparable接口和comparator

IntWritable实现原始的WritableComparable接口,该接口继承自Writablejava.lang.Comparable接口:

package org.apache.hadoop.io;

 

public interface WritableComparable<T> extends Writable, Comparable<T> {

}

 

MapReduce来说,类型比较非常重要,因为中间有个基于键的排序阶段。Hadoop提供的一个优化接口是继承自Java ComparatorRawComparator口:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

 

}

 

该接口允许其实现直接比较数据流中的记录,无须先把数据流反序列化为对象,这样便避免了新建对象的额外开销。例如,我们根据IntWritable接口实现的comparator实现原始的compare()方法,该方法可以从每个字节数组b1b2中读取给定起始位置(s1s2)以及长度(l1l2)的一个整数进而直接进行比较。

WritableComparator是对继承自WritableComparable类的RawComparator类的一个通用实现。它提供两个主要功能。第一,它提供了对原始compare()方法的一个默认实现,该方法能够反序列化将在流中进行比较的对象,并调用对象的compare()方法。第二,它充当的是RawComparator实例的工厂(已注册Writable的实现)。例如,为了获得IntWritablecomparator,我们直接如下调用:

RawComparator<IntWritable> comparator = WritableComparator.get (IntWritable.class);

 

这个comparator可以用于比较两个IntWritable对象:

IntWritable w1 = new IntWritable(163);

IntWritable w2 = new IntWritable(67);

assertThat(comparator.compare(w1, w2), greaterThan(0));

 

或其序列化表示:

byte[] b1 = serialize(w1);

byte[] b2 = serialize(w2);

assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),

greaterThan(0));


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。