Hadoop MapReduce 02 [A Custom WordCount Example]
Create the MapperTask
Create a Java class that extends the Mapper parent class.
The generic type parameters are as follows:
Parameter | Description |
---|---|
K1 | the type of the offset at which each line is read (input is read line by line by default) |
V1 | the type of the line that is read (default) |
K2 | the type of the key returned after the user's processing |
V2 | the type of the value returned after the user's processing |
Note that this data is sent over the network between tasks, so it must use Hadoop's serializable (Writable) types (a small round-trip sketch follows the table below):

Java type | Serialized (Writable) type |
---|---|
Integer | IntWritable |
Long | LongWritable |
Double | DoubleWritable |
Float | FloatWritable |
String | Text |
null | NullWritable |
Boolean | BooleanWritable |
… | … |
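If the Writable types are unfamiliar, the sketch below (not from the original article; the class name `WritableRoundTrip` is made up for illustration) shows the essential property they all share: each one can write itself to and read itself back from a raw byte stream, which is exactly what happens when map output is shipped to the reducers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize a Text and an IntWritable into an in-memory byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("hello").write(out);
        new IntWritable(1).write(out);

        // read them back from the same bytes
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + " -> " + count.get()); // prints: hello -> 1
    }
}
```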
package com.bobo.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Note: this data is sent over the network, so the serializable Writable types are used.
 *
 * KEYIN:    the offset at which each line is read            long    -> LongWritable
 * VALUEIN:  the type of the line that is read                String  -> Text
 *
 * KEYOUT:   the key returned after the user's processing     String  -> Text
 * VALUEOUT: the value returned after the user's processing   integer -> IntWritable
 * @author 波波烤鸭
 * dengpbs@163.com
 */
public class MyMapperTask extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * The business logic of the Map phase goes in the map method.
     * By default the method is called once for every line that is read.
     * @param key   the offset of the line that was read
     * @param value the line of data that was read
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // split the line into words on spaces
        String[] words = line.split(" ");
        for (String word : words) {
            // emit the word as the key and 1 as the value,
            // so the following shuffle can group the data by word
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
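A small optional refinement that is not in the original post: Hadoop serializes the key and value at the moment `context.write(...)` is called, so the per-record `new Text(...)` and `new IntWritable(1)` allocations can be hoisted into reusable fields. The variant below keeps the behaviour identical (package and imports are the same as above).

```java
public class MyMapperTask extends Mapper<LongWritable, Text, Text, IntWritable> {

    // reused across all records processed by this map task
    private final Text outKey = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            outKey.set(word);           // overwrite the same Text instance
            context.write(outKey, ONE); // the framework copies the bytes it needs
        }
    }
}
```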
Create the ReducerTask
Create a Java class that extends the Reducer parent class.
Parameter | Description |
---|---|
KEYIN | corresponds to KEYOUT of the map phase |
VALUEIN | corresponds to VALUEOUT of the map phase |
KEYOUT | the output key type of the reduce logic |
VALUEOUT | the output value type of the reduce logic |
package com.bobo.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * KEYIN and VALUEIN correspond to KEYOUT and VALUEOUT of the map phase.
 *
 * KEYOUT:   the output key type of the reduce logic
 * VALUEOUT: the output value type of the reduce logic
 * @author 波波烤鸭
 * dengpbs@163.com
 */
public class MyReducerTask extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * @param key     a key emitted by the map phase
     * @param values  all values emitted by the map phase for that same key
     * @param context the job context used to emit the result
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // add up the 1s emitted for this word
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
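To make the "same key, collected values" idea concrete, here is a tiny framework-free sketch (not part of the MapReduce job; the class name `ShuffleSketch` is invented for illustration) of what the shuffle does between the two phases: the (word, 1) pairs emitted by the mappers are grouped by key, and the reduce step then sums each key's list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    public static void main(String[] args) {
        // pretend these words were emitted by the mappers, each paired with a 1
        String[] mapOutputKeys = {"hello", "java", "hello"};

        // the shuffle phase groups the values by key before reduce() is called
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String word : mapOutputKeys) {
            if (!grouped.containsKey(word)) {
                grouped.put(word, new ArrayList<Integer>());
            }
            grouped.get(word).add(1);
        }

        // the "reduce" step: sum each key's list, exactly like MyReducerTask does
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int count = 0;
            for (int one : entry.getValue()) {
                count += one;
            }
            System.out.println(entry.getKey() + "\t" + count); // hello 2, java 1
        }
    }
}
```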
Create the driver (launcher) class
package com.bobo.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcTest {

    public static void main(String[] args) throws Exception {
        // create the configuration object
        Configuration conf = new Configuration(true);
        // get the Job object
        Job job = Job.getInstance(conf);
        // register the jar that contains this job
        job.setJarByClass(WcTest.class);

        // set the classes that handle the Map and Reduce phases
        job.setMapperClass(MyMapperTask.class);
        job.setReducerClass(MyReducerTask.class);

        // declare the output types of the Map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // the input and output paths of the job are passed in as arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
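As a hedged sketch (none of this is in the original code), the driver can be hardened a little: declare the final reducer output types explicitly, reuse the reducer as a combiner to pre-aggregate counts on the map side, delete an already-existing output directory (MapReduce refuses to overwrite one), and turn the job status into the process exit code.

```java
package com.bobo.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WcTest.class);

        job.setMapperClass(MyMapperTask.class);
        job.setReducerClass(MyReducerTask.class);
        // optional: pre-aggregate with the same summing logic on the map side
        job.setCombinerClass(MyReducerTask.class);

        // map output types and the final (reducer) output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // remove an existing output directory first
        // (convenient while testing, dangerous in production)
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // propagate success/failure as the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```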
Package and deploy
Package the project as a jar with Maven.
Upload and test
Create the wordcount example directory in HDFS and upload the test input files:
hadoop fs -mkdir -p /hdfs/wordcount/input
hadoop fs -put a.txt b.txt /hdfs/wordcount/input/
Run the program:
hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/
The run succeeds:
[root@hadoop-node01 ~]# hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/
19/04/03 16:56:43 INFO client.RMProxy: Connecting to ResourceManager at hadoop-node01/192.168.88.61:8032
19/04/03 16:56:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/04/03 16:56:48 INFO input.FileInputFormat: Total input paths to process : 2
19/04/03 16:56:49 INFO mapreduce.JobSubmitter: number of splits:2
19/04/03 16:56:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1554281786018_0001
19/04/03 16:56:52 INFO impl.YarnClientImpl: Submitted application application_1554281786018_0001
19/04/03 16:56:53 INFO mapreduce.Job: The url to track the job: http://hadoop-node01:8088/proxy/application_1554281786018_0001/
19/04/03 16:56:53 INFO mapreduce.Job: Running job: job_1554281786018_0001
19/04/03 16:57:14 INFO mapreduce.Job: Job job_1554281786018_0001 running in uber mode : false
19/04/03 16:57:14 INFO mapreduce.Job: map 0% reduce 0%
19/04/03 16:57:38 INFO mapreduce.Job: map 100% reduce 0%
19/04/03 16:57:56 INFO mapreduce.Job: map 100% reduce 100%
19/04/03 16:57:57 INFO mapreduce.Job: Job job_1554281786018_0001 completed successfully
19/04/03 16:57:57 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=181
FILE: Number of bytes written=321388
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=325
HDFS: Number of bytes written=87
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=46511
Total time spent by all reduces in occupied slots (ms)=12763
Total time spent by all map tasks (ms)=46511
Total time spent by all reduce tasks (ms)=12763
Total vcore-milliseconds taken by all map tasks=46511
Total vcore-milliseconds taken by all reduce tasks=12763
Total megabyte-milliseconds taken by all map tasks=47627264
Total megabyte-milliseconds taken by all reduce tasks=13069312
Map-Reduce Framework
Map input records=14
Map output records=14
Map output bytes=147
Map output materialized bytes=187
Input split bytes=234
Combine input records=0
Combine output records=0
Reduce input groups=10
Reduce shuffle bytes=187
Reduce input records=14
Reduce output records=10
Spilled Records=28
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=1049
CPU time spent (ms)=5040
Physical memory (bytes) snapshot=343056384
Virtual memory (bytes) snapshot=6182891520
Total committed heap usage (bytes)=251813888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=91
File Output Format Counters
Bytes Written=87
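All of the counters above are built in. As an aside that is not part of the original code, user-defined counters can be incremented from inside `map()` or `reduce()` via the `Context`, and they then appear in this same job summary. A hypothetical snippet for the start of `MyMapperTask.map()` (the counter group and name are made up):

```java
// skip blank lines and count how many the mappers saw
if (line.trim().isEmpty()) {
    context.getCounter("WordCount", "EMPTY_LINES").increment(1);
    return;
}
```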
View the result:
[root@hadoop-node01 ~]# hadoop fs -cat /hdfs/wordcount/output/part-r-00000
ajax 1
bobo烤鸭 1
hello 2
java 2
mybatis 1
name 1
php 1
shell 2
spring 2
springmvc 1
OK~
Source: dpb-bobokaoya-sm.blog.csdn.net, by 波波烤鸭. Copyright belongs to the original author; please contact the author before reposting.
Original link: dpb-bobokaoya-sm.blog.csdn.net/article/details/88999814