Hadoop Streaming完成大数据处理详解(上)
Hadoop Streaming完成大数据处理详解(上)
Hadoop生态有非常多的工具可以用于大数据的管理和数据处理。这里我们给大家详细介绍一下,如何使用Hadoop Streaming这个方式,对大数据进行处理。
Hadoop Streaming
Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如在hadoop环境下的命令行可以执行:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
看不懂别着急,咱们马上来分析一下上述的代码。
Streaming工作原理
在上面的例子里,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。
mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。
默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。不过,这可以定制,在下文中将会讨论如何自定义key和value的切分方式。
如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。
Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。
默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。在下文中将会讨论如何自定义key和value的切分方式。
这是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。
用户也可以使用java类作为mapper或者reducer。上面的例子与这里的代码等价:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
用户可以设定stream.non.zero.exit.is.failure true 或false 来表明streaming task的返回值非零时是 Failure 还是Success。默认情况,streaming task返回非零时表示失败。
将文件打包到提交的作业中
我们要开始讲关键点了,并不是每位同学都对java熟悉程度这么高。没关系,hadoop允许我们用脚本语言完成处理过程,并把文件打包提交到作业中,完成大数据的处理。
任何可执行文件都可以被指定为mapper/reducer。这些可执行文件不需要事先存放在集群上;如果在集群上还没有,则需要用-file选项让framework把可执行文件作为作业的一部分,一起打包提交。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py
上面的例子描述了一个用户把可执行python文件作为mapper。 其中的选项“-file myPythonScirpt.py”使可执行python文件作为作业提交的一部分被上传到集群的机器上。
除了可执行文件外,其他mapper或reducer需要用到的辅助文件(比如字典,配置文件等)也可以用这种方式打包上传。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py \
-file myDictionary.txt
Streaming选项与用法
只使用Mapper的作业
有时只需要map函数处理输入数据。这时只需把mapred.reduce.tasks设置为零,Map/reduce框架就不会创建reducer任务,mapper任务的输出就是整个作业的最终输出。
为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-jobconf mapred.reduce.tasks=0”等价。
为作业指定其他插件
和其他普通的Map/Reduce作业一样,用户可以为streaming作业指定其他插件:
-inputformat JavaClassName -outputformat JavaClassName -partitioner JavaClassName -combiner JavaClassName 用于处理输入格式的类要能返回Text类型的key/value对。如果不指定输入格式,则默认会使用TextInputFormat。 因为TextInputFormat得到的key值是LongWritable类型的(其实key值并不是输入文件中的内容,而是value偏移量), 所以key会被丢弃,只把value用管道方式发给mapper。
用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。
Hadoop Streaming中的大文件和档案
任务使用-cacheFile和-cacheArchive选项在集群中分发文件和档案,选项的参数是用户已上传至HDFS的文件或档案的URI。这些文件和档案在不同的作业间缓存。用户可以通过fs.default.name.config配置参数的值得到文件所在的host和fs_port。
这个是使用-cacheFile选项的例子:
-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink
在上面的例子里,url中#后面的部分是建立在任务当前工作目录下的符号链接的名字。这里的任务的当前工作目录下有一个“testlink”符号链接,它指向testfile.txt文件在本地的拷贝。如果有多个文件,选项可以写成:
-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2
-cacheArchive选项用于把jar文件拷贝到任务当前工作目录并自动把jar文件解压缩。例如:
-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3
在上面的例子中,testlink3是当前工作目录下的符号链接,它指向testfile.jar解压后的目录。
下面是使用-cacheArchive选项的另一个例子。其中,input.txt文件有两行内容,分别是两个文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向档案目录(jar文件解压后的目录)的符号链接,这个目录下有“cache.txt”和“cache2.txt”两个文件。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input "/user/me/samples/cachefile/input.txt" \
-mapper "xargs cat" \
-reducer "cat" \
-output "/user/me/samples/cachefile/out" \
-cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \
-jobconf mapred.map.tasks=1 \
-jobconf mapred.reduce.tasks=1 \
-jobconf mapred.job.name="Experiment"
再来看一样过程和内容
$ ls test_jar/
cache.txt cache2.txt
$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)
$ hadoop dfs -put cachedir.jar samples/cachefile
$ hadoop dfs -cat /user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt
$ cat test_jar/cache.txt
This is just the cache string
$ cat test_jar/cache2.txt
This is just the second cache string
$ hadoop dfs -ls /user/me/samples/cachefile/out
Found 1 items
/user/me/samples/cachefile/out/part-00000 <r 3> 69
$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string
This is just the second cache string
- 点赞
- 收藏
- 关注作者
评论(0)