《进击大数据》系列教程之MapReduce篇

举报
小米粒-biubiubiu 发表于 2020/12/25 22:53:09 2020/12/25
【摘要】 一、MapReduce 安装 (1)分布式计算概述   访问 master:8088 查看yarn 是否启动成功。 (2)验证mapreduce 是否安装成功 运行 hadoop 安装包中 自带的 mapreduce 正则匹配例子。  看到控制台有如下输出说明mapReduce 任务正在运行中,同时可以在yarn 监控界面上看到任务执行记录 二、ha...

一、MapReduce 安装

(1)分布式计算概述

 

访问 master:8088 查看yarn 是否启动成功。

(2)验证mapreduce 是否安装成功

运行 hadoop 安装包中 自带的 mapreduce 正则匹配例子。

 看到控制台有如下输出说明mapReduce 任务正在运行中,同时可以在yarn 监控界面上看到任务执行记录

二、hadoop 序列化机制


使用 hadoop 的 writeable 接口 实现序列化


  
  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-client</artifactId>
  4. <version>2.7.5</version>
  5. <dependency>

  
  1. @Data
  2. @NoArgsConstructor
  3. @AllArgsConstructor
  4. @ToString
  5. public class BlockWritable implements Writable {
  6. private long blockId;
  7. private long numBytes;
  8. private long generationStamp;
  9. @Override
  10. public void write(DataOutput out) throws IOException {
  11. out.writeLong(blockId);
  12. out.writeLong(numBytes);
  13. out.writeLong(generationStamp);
  14. }
  15. @Override
  16. public void readFields(DataInput in) throws IOException {
  17. this.blockId = in.readLong();
  18. this.numBytes = in.readLong();
  19. this.generationStamp = in.readLong();
  20. }
  21. public static void main(String[] args) throws IOException {
  22. //序列化
  23. BlockWritable blockWritable = new BlockWritable(34234L, 234324345L, System.currentTimeMillis());
  24. DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream("D:/block.txt"));
  25. blockWritable.write(dataOutputStream);
  26. //反序列化
  27. Writable writable = WritableFactories.newInstance(BlockWritable.class);
  28. DataInputStream dataInputStream = new DataInputStream(new FileInputStream("D:/block.txt"));
  29. writable.readFields(dataInputStream);
  30. System.out.println((BlockWritable) writable);
  31. }
  32. }

hadoop 封装的一套序列化机制,序列化之后文件大小比java 的序列化要小很多,在大数据量的情况下,对性能有很大的提升。

三、使用mapReduce 实现分布式文本行数计算

(1)分布式文本行数计算

(2)项目添加mapReduce 依赖


  
  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-mapreduce-client-core</artifactId>
  4. <version>2.7.5</version>
  5. <dependency>

 (3)编写mapReduce代码


  
  1. package com.dzx.hadoopdemo.mapred;
  2. import org.apache.hadoop.conf.Configurable;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapred.FileInputFormat;
  8. import org.apache.hadoop.mapred.FileOutputFormat;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapred.JobContext;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import java.io.FileInputStream;
  15. import java.io.FileOutputStream;
  16. import java.io.IOException;
  17. /**
  18. * @author duanzhaoxu
  19. * @ClassName:
  20. * @Description:
  21. * @date 2020年12月24日 14:28:59
  22. */
  23. public class DistributeCount {
  24. public static class ToOneMapper extends Mapper<Object, Text, Text, IntWritable> {
  25. private final static IntWritable ONE = new IntWritable(1);
  26. private Text text = new Text();
  27. @Override
  28. protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  29. this.text.set("count");
  30. context.write(this.text, ONE);
  31. }
  32. }
  33. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  34. private IntWritable result = new IntWritable(0);
  35. @Override
  36. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  37. int sum = 0;
  38. for (IntWritable val : values) {
  39. sum += val.get();
  40. }
  41. result.set(sum);
  42. context.write(key, result);
  43. }
  44. }
  45. public static void main(String[] args) throws Exception {
  46. Configuration configuration = new Configuration();
  47. //创建JOB
  48. Job job = Job.getInstance(configuration, "distribute-count");
  49. //设置启动类
  50. job.setJarByClass(DistributeCount.class);
  51. //设置mapper类
  52. job.setMapperClass(ToOneMapper.class);
  53. // job.setCombinerClass(IntSumReducer.class);
  54. //设置reduce类
  55. job.setReducerClass(IntSumReducer.class);
  56. //设置输出结果key类型
  57. job.setOutputKeyClass(Text.class);
  58. //设置输出结果value类型
  59. job.setOutputValueClass(IntWritable.class);
  60. JobConf jobConf = new JobConf(configuration);
  61. //设置文件输入路径
  62. FileInputFormat.addInputPath(jobConf, new Path(args[0]));
  63. //设置结果输出文件路径
  64. FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
  65. //等待任务执行完成之后结束进程,设置为true会打印一些日志
  66. System.exit(job.waitForCompletion(true) ? 0 : 1);
  67. }
  68. }

(4)执行job

将写好的mapReduce 代码打包成 mapreduce-course-1.0-SNAPSHOT.jar

准备一个较大的文本文件 big.txt 上传到 hdfs 上

hadoop  fs  -mkdir  -p  /user/hadoop-twq/mr/count/input

hadoop  fs -put  bih.txt   /user/hadoop-twq/mr/count/input/

yarn jar  mapreduce-course-1.0-SNAPSHOT.jar  com.dzx.hadoopdemo.mapred.DistributeCount   /user/hadoop-twq/mr/count/input/big.txt     /user/hadoop-twq/mr/count/output 

 如图所示任务执行完成之后 会在 output 下面 生成一个文件 ,查看文件内容 里面 显示 count  21000104 ,说明big.txt 文本文件有2100多万行数据

如果再次执行job,会报 输出 文件已存在的错误,要先删除原输出文件 

四、block与map的input  split的关系

一个block -》一个 input  split

一个不足一个block大小的文件 -》 一个input split

假设每个block块的大小是256M,那么一个 326M的big.txt 文件会被分成两个block存储,所以在运行job的时候,就可以在yarn监控界面上看到对应的map 任务有两个。从以下的日志输出也能看出这一点。

五、MapReduce在yarn上运行的原理


  
  1. //设置reduce任务数
  2. job.setNumReduceTasks(2)

RM代指yarn 的 ResourceManager

六、MapReduce 内存cpu资源配置

在 mapred-site.xml 增加如下配置

然后将以上 配置 同步到 slave1 和 slave2 

scp  mapred-site.xml hadoopq-twq@slave1:~/bigdata/hadoop-2.7.5/etc/hadoop/

scp  mapred-site.xml hadoopq-twq@slave2:~/bigdata/hadoop-2.7.5/etc/hadoop/

七、MapReduce 中的 Combiner 

(1)Combiner 讲解

使用combiner 提前在 每台机器上面 reduce 数据,减少最终数据的网络传输,提升性能。

代码中的 实现: job.setCombinerClass(IntSumReduce.class);

八、使用mapReduce 实现 wordCount 

(1)代码编写


  
  1. package com.dzx.hadoopdemo.mapred;
  2. import org.apache.commons.io.FileUtils;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FileUtil;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapred.FileInputFormat;
  9. import org.apache.hadoop.mapred.FileOutputFormat;
  10. import org.apache.hadoop.mapred.JobConf;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import java.io.File;
  15. import java.io.IOException;
  16. import java.util.StringTokenizer;
  17. /**
  18. * @author duanzhaoxu
  19. * @ClassName:
  20. * @Description:
  21. * @date 2020年12月25日 11:06:53
  22. */
  23. public class WordCount {
  24. public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
  25. private Text text = new Text();
  26. private final static IntWritable ONE = new IntWritable(1);
  27. @Override
  28. protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  29. // String s = value.toString();
  30. // String[] strArray = s.split(" ");
  31. // for (String item : strArray) {
  32. // text.set(item);
  33. // context.write(text, ONE);
  34. // }
  35. StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
  36. while (stringTokenizer.hasMoreTokens()) {
  37. text.set(stringTokenizer.nextToken());
  38. context.write(text, ONE);
  39. }
  40. }
  41. }
  42. public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  43. private IntWritable res = new IntWritable(0);
  44. @Override
  45. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  46. int sum = 0;
  47. for (IntWritable val : values) {
  48. sum += val.get();
  49. }
  50. res.set(sum);
  51. context.write(key, res);
  52. }
  53. }
  54. public static void main(String[] args) throws Exception {
  55. File file = new File(args[1]);
  56. if (file.exists()) {
  57. FileUtils.deleteQuietly(file);
  58. }
  59. Configuration configuration = new Configuration();
  60. Job job = Job.getInstance(configuration, "word-count");
  61. job.setJarByClass(WordCount.class);
  62. job.setMapperClass(WordCountMapper.class);
  63. job.setCombinerClass(WordCountReduce.class);
  64. job.setReducerClass(WordCountReduce.class);
  65. job.setOutputKeyClass(Text.class);
  66. job.setOutputValueClass(IntWritable.class);
  67. job.getConfiguration().set("yarn.app.mapreduce.am.resource.mb", "512");
  68. job.getConfiguration().set("yarn.app.mapreduce.am.command-opts", "-Xmx250m");
  69. job.getConfiguration().set("yarn.app.mapreduce.am.resource.cpu-vcores", "1");
  70. job.getConfiguration().set("mapreduce.map.memory.mb", "400");
  71. job.getConfiguration().set("mapreduce.map.java.opts", "-Xmx200m");
  72. job.getConfiguration().set("mapreduce.map.cpu.vcores", "1");
  73. job.getConfiguration().set("mapreduce.reduce.memory.mb", "400");
  74. job.getConfiguration().set("mapreduce.reduce.java.opts", "-Xmx200m");
  75. job.getConfiguration().set("mapreduce.reduce.cpu.vcores", "1");
  76. JobConf jobConf = new JobConf(configuration);
  77. FileInputFormat.addInputPath(jobConf, new Path(args[0]));
  78. FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
  79. System.out.println(job.waitForCompletion(true) ? 0 : 1);
  80. }
  81. }

 将编写好的代码打成mapreduce-wordcount.jar包上传到服务器上,执行如下命令:

hadoop jar  mapreduce-wordcount.jar  com.dzx.hadoopdemo.mapred.WordCount  /user/hadoop-twq/mr/count/input/big_word.txt     /user/hadoop-twq/mr/count/output  

等待任务执行完成之后查看 结果输出文件,看到如下内容,说明完成了单词数量的统计

提高虚拟内存配置 

 重启yarn 之后可以看到 虚拟内存被放大了 4倍。

(2)word count 程序详解  - shuffle

job设置了 reduceTask 为 2  的情况下

 如图可看到,在maptask的 combine 阶段会将 map的结果按照 key 的 字母自然顺序进行排序。

(3)自定义分区器

在设置了 reduceTask 为2 的情况下,最终的任务输出文件会产出两个结果集文件,那么这个数据的分区是如何实现的,这里面就涉及到分区的规则 了。

hadoop 默认是按照 key  的哈希值进行分区的。

实际上就是 每个单词的hash值 对2 进行取模 

自定义分区器


  
  1. package com.dzx.hadoopdemo.mapred;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. * @author duanzhaoxu
  7. * @ClassName:
  8. * @Description:
  9. * @date 2020年12月25日 14:34:47
  10. */
  11. public class CustomPartitiner extends Partitioner<Text, IntWritable> {
  12. //自定义分区器
  13. @Override
  14. public int getPartition(Text text, IntWritable intWritable, int i) {
  15. if (text.toString().contains("s")) {
  16. return 0;
  17. }
  18. return 1;
  19. }
  20. }

重新打包上传到服务器,运行任务,发现key包含 s的结果输出到了part0 文件,key不包含s的结果输出到part1文件。

(4)MapReduce 应用

1. distinct 问题

利用mapReduce的key天然去重,把map输入的value作为reduce的key,即可自动去重

2.distcp

将hdfs  nn1 节点数据拷贝到 nn2节点

distcp  hdfs://nn1:8020/source/first   hdfs://nn1:8020/source/second   hdfs://nn2:8020/target

九、hadoop 压缩机制

通过一定的算法对数据进行特殊编码,使得数据占用的存储空间比较小,这个过程我们称之为压缩,反之为解压缩。

不管哪种压缩工具都需要权衡时间和空间,在大数据领域内还要考虑压缩文件的可分割性

Hadoop 支持的压缩工具有 ,DEFAULT,bzip,Snappy

十、avro 行式存储 和 parquet列式存储(暂不更新)

十一、avro文件和parquet文件(重要)的读写(暂不更新)

十二、sequenceFile 文件的读写(暂不更新)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/111608474

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。