Hadoop Streaming完成大数据处理详解(下)

举报
最后一个好人 发表于 2020/12/26 10:21:41 2020/12/26
【摘要】 书接上回

Hadoop Streaming完成大数据处理详解(下)


为作业指定附加配置参数

用户可以使用“-jobconf =”增加一些配置变量。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.IdentityMapper\

    -reducer /bin/wc \

    -jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用两个reducer完成作业。
-jobconf mapred.map.tasks=2表明用两个mapper完成作业。

其他选项

Streaming 作业的其他选项如下表:

选项

可选/必须

描述

-cluster name

可选

在本地Hadoop集群与一个或多个远程集群间切换

-dfs host:port or local

可选

覆盖作业的HDFS配置

-jt host:port or local

可选

覆盖作业的JobTracker配置

-additionalconfspec specfile

可选

用一个类似于hadoop-site.xml的XML文件保存所有配置,从而不需要用多个"-jobconf name=value"类型的选项单独为每个配置变量赋值

-cmdenv name=value

可选

传递环境变量给streaming命令

-cacheFile fileNameURI

可选

指定一个上传到HDFS的文件

-cacheArchive fileNameURI

可选

指定一个上传到HDFS的jar文件,这个jar文件会被自动解压缩到当前工作目录下

-inputreader JavaClassName

可选

为了向下兼容:指定一个record reader类(而不是input format类)

-verbose

可选

详细输出

使用-cluster 实现“本地”Hadoop和一个或多个远程Hadoop集群间切换。默认情况下,使用hadoop-default.xml和hadoop-site.xml;当使用-cluster 选项时,会使用$HADOOP_HOME/conf/hadoop-.xml。

下面的选项改变temp目录:

-jobconf dfs.data.dir=/tmp

下面的选项指定其他本地temp目录:

-jobconf mapred.local.dir=/tmp/local

-jobconf mapred.system.dir=/tmp/system

-jobconf mapred.temp.dir=/tmp/temp

更多有关jobconf的细节请参考:http://wiki.apache.org/hadoop/JobConfFile

streaming命令中设置环境变量:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

高级功能与其他例子

使用自定义的方法切分行来形成Key/Value对

之前已经提到,当Map/Reduce框架从mapper的标准输入读取一行时,它把这一行切分为key/value对。 在默认情况下,每行第一个tab符之前的部分作为key,之后的部分作为value(不包括tab符)。

但是,用户可以自定义,可以指定分隔符是其他字符而不是默认的tab符,或者指定在第n(n>=1)个分割符处分割而不是默认的第一个。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \

    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \

    -jobconf stream.map.output.field.separator=. \

    -jobconf stream.num.map.output.key.fields=4

在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作为map输出内容的分隔符,并且从在第四个“.”之前的部分作为key,之后的部分作为value(不包括这第四个“.”)。 如果一行中的“.”少于四个,则整行的内容作为key,value设为空的Text对象(就像这样创建了一个Text:new Text(""))。

同样,用户可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”来指定reduce输出的行中,第几个分隔符处分割key和value。

一个实用的Partitioner类 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 选项)

Hadoop有一个工具类org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在应用程序中很有用。Map/reduce框架用这个类切分map的输出, 切分是基于key值的前缀,而不是整个key。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \

    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \

    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

    -jobconf stream.map.output.field.separator=. \

    -jobconf stream.num.map.output.key.fields=4 \

    -jobconf map.output.key.field.separator=. \

    -jobconf num.key.fields.for.partition=2 \

    -jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用这两个变量来得到mapper的key/value对。

上面的Map/Reduce 作业中map输出的key一般是由“.”分割成的四块。但是因为使用了 -jobconf num.key.fields.for.partition=2 选项,所以Map/Reduce框架使用key的前两块来切分map的输出。其中,-jobconf map.output.key.field.separator=. 指定了这次切分使用的key的分隔符。这样可以保证在所有key/value对中, key值前两个块值相同的所有key被分到一组,分配给一个reducer。

这种高效的方法等价于指定前两块作为主键,后两块作为副键。 主键用于切分块,主键和副键的组合用于排序。一个简单的示例如下:

Map的输出(key)

11.12.1.2

11.14.2.3

11.11.4.1

11.12.1.1

11.14.2.2

切分给3个reducer(前两块的值用于切分)

11.11.4.1

-----------

11.12.1.2

11.12.1.1

-----------

11.14.2.3

11.14.2.2

在每个切分后的组内排序(四个块的值都用于排序)

11.11.4.1

-----------

11.12.1.1

11.12.1.2

-----------

11.14.2.2

11.14.2.3

Hadoop聚合功能包的使用(-reduce aggregate 选项)

Hadoop有一个工具包“Aggregate”( https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。 “Aggregate”提供一个特殊的reducer类和一个特殊的combiner类, 并且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用于聚合一组value的序列。 用户可以使用Aggregate定义一个mapper插件类, 这个类用于为mapper输入的每个key/value对产生“可聚合项”。 combiner/reducer利用适当的聚合器聚合这些可聚合项。

要使用Aggregate,只需指定“-reducer aggregate”:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper myAggregatorForKeyCount.py \

    -reducer aggregate \

    -file myAggregatorForKeyCount.py \

    -jobconf mapred.reduce.tasks=12

python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/python

import sys;

def generateLongCountToken(id):

    return "LongValueSum:" + id + "\t" + "1"

def main(argv):

    line = sys.stdin.readline();

    try:

        while line:

            line = line[:-1];

            fields = line.split("\t");

            print generateLongCountToken(fields[0]);

            line = sys.stdin.readline();

    except "end of file":

        return Noneif __name__ == "__main__":

     main(sys.argv)

字段的选取(类似于unix中的 'cut' 命令)

Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduce帮助用户高效处理文本数据, 就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab), 可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\

    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\

    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

    -jobconf map.output.key.field.separa=. \

    -jobconf num.key.fields.for.partition=2 \

    -jobconf mapred.data.field.separator=. \

    -jobconf map.output.key.value.fields.spec=6,5,1-3:0- \

    -jobconf reduce.output.key.value.fields.spec=0-2:5- \

    -jobconf mapred.reduce.tasks=12

选项“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何为map的输出选取key和value。Key选取规则和value选取规则由“:”分割。 在这个例子中,map输出的key由字段6,5,1,2和3组成。输出的value由所有字段组成(“0-”指字段0以及之后所有字段)。

选项“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(译者注:此处应为”0-2:5-“)指定如何为reduce的输出选取value。 本例中,reduce的输出的key将包含字段0,1,2(对应于原始的字段6,5,1)。 reduce输出的value将包含起自字段5的所有字段(对应于所有的原始字段)。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。