Hadoop Streaming完成大数据处理详解(上)

举报
最后一个好人 发表于 2020/12/26 10:15:36 2020/12/26
【摘要】 Hadoop生态有非常多的工具可以用于大数据的管理和数据处理。这里我们给大家详细介绍一下,如何使用Hadoop Streaming这个方式,对大数据进行处理。

Hadoop Streaming完成大数据处理详解

Hadoop生态有非常多的工具可以用于大数据的管理和数据处理。这里我们给大家详细介绍一下,如何使用Hadoop Streaming这个方式,对大数据进行处理。

Hadoop Streaming

Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如在hadoop环境下的命令行可以执行:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper /bin/cat \

    -reducer /bin/wc

看不懂别着急,咱们马上来分析一下上述的代码。

Streaming工作原理

在上面的例子里,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。

如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。
mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。
默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。不过,这可以定制,在下文中将会讨论如何自定义key和value的切分方式。

如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。
Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。
默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。在下文中将会讨论如何自定义key和value的切分方式。

这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

用户也可以使用java类作为mapper或者reducer。上面的例子与这里的代码等价:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \

    -reducer /bin/wc

用户可以设定stream.non.zero.exit.is.failure true 或false 来表明streaming task的返回值非零时是 Failure 还是Success。默认情况,streaming task返回非零时表示失败。

将文件打包到提交的作业中

我们要开始讲关键点了,并不是每位同学都对java熟悉程度这么高。没关系,hadoop允许我们用脚本语言完成处理过程,并把文件打包提交到作业中,完成大数据的处理。

任何可执行文件都可以被指定为mapper/reducer。这些可执行文件不需要事先存放在集群上;如果在集群上还没有,则需要用-file选项让framework把可执行文件作为作业的一部分,一起打包提交。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper myPythonScript.py \

    -reducer /bin/wc \

    -file myPythonScript.py

上面的例子描述了一个用户把可执行python文件作为mapper。 其中的选项“-file myPythonScirpt.py”使可执行python文件作为作业提交的一部分被上传到集群的机器上。

除了可执行文件外,其他mapper或reducer需要用到的辅助文件(比如字典,配置文件等)也可以用这种方式打包上传。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper myPythonScript.py \

    -reducer /bin/wc \

    -file myPythonScript.py \

    -file myDictionary.txt

Streaming选项与用法

只使用Mapper的作业

有时只需要map函数处理输入数据。这时只需把mapred.reduce.tasks设置为零,Map/reduce框架就不会创建reducer任务,mapper任务的输出就是整个作业的最终输出。

为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-jobconf mapred.reduce.tasks=0”等价。

为作业指定其他插件

和其他普通的Map/Reduce作业一样,用户可以为streaming作业指定其他插件:

-inputformat JavaClassName -outputformat JavaClassName -partitioner JavaClassName -combiner JavaClassName 用于处理输入格式的类要能返回Text类型的key/value对。如果不指定输入格式,则默认会使用TextInputFormat。 因为TextInputFormat得到的key值是LongWritable类型的(其实key值并不是输入文件中的内容,而是value偏移量), 所以key会被丢弃,只把value用管道方式发给mapper。

用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。

Hadoop Streaming中的大文件和档案

任务使用-cacheFile和-cacheArchive选项在集群中分发文件和档案,选项的参数是用户已上传至HDFS的文件或档案的URI。这些文件和档案在不同的作业间缓存。用户可以通过fs.default.name.config配置参数的值得到文件所在的host和fs_port。

这个是使用-cacheFile选项的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

在上面的例子里,url中#后面的部分是建立在任务当前工作目录下的符号链接的名字。这里的任务的当前工作目录下有一个“testlink”符号链接,它指向testfile.txt文件在本地的拷贝。如果有多个文件,选项可以写成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

-cacheArchive选项用于把jar文件拷贝到任务当前工作目录并自动把jar文件解压缩。例如:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

在上面的例子中,testlink3是当前工作目录下的符号链接,它指向testfile.jar解压后的目录。

下面是使用-cacheArchive选项的另一个例子。其中,input.txt文件有两行内容,分别是两个文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向档案目录(jar文件解压后的目录)的符号链接,这个目录下有“cache.txt”和“cache2.txt”两个文件。

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

                  -input "/user/me/samples/cachefile/input.txt"  \

                  -mapper "xargs cat"  \

                  -reducer "cat"  \

                  -output "/user/me/samples/cachefile/out" \  

                  -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \  

                  -jobconf mapred.map.tasks=1 \

                  -jobconf mapred.reduce.tasks=1 \

                  -jobconf mapred.job.name="Experiment"

再来看一样过程和内容

$ ls test_jar/

cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .

added manifest

adding: cache.txt(in = 30) (out= 29)(deflated 3%)

adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hadoop dfs -put cachedir.jar samples/cachefile

$ hadoop dfs -cat /user/me/samples/cachefile/input.txt

testlink/cache.txt

testlink/cache2.txt

$ cat test_jar/cache.txt

This is just the cache string

$ cat test_jar/cache2.txt

This is just the second cache string

$ hadoop dfs -ls /user/me/samples/cachefile/out      

Found 1 items

/user/me/samples/cachefile/out/part-00000  <r 3>   69

$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000

This is just the cache string   

This is just the second cache string

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。