行为抽象和Lambda分区

举报
xcc-2022 发表于 2022/07/22 14:48:56 2022/07/22
【摘要】 MapReduce功能实现系列:      MapReduce功能实现一—Hbase和Hdfs之间数据相互转换      MapReduce功能实现二—排序      MapReduce功能实现三—Top N      MapReduce功能实现四—小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)      MapReduce功能实现五—去重(Distinct)、计数(C...

MapReduce功能实现系列
      MapReduce功能实现一—Hbase和Hdfs之间数据相互转换
      MapReduce功能实现二—排序
      MapReduce功能实现三—Top N
      MapReduce功能实现四—小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)
      MapReduce功能实现五—去重(Distinct)、计数(Count)
      MapReduce功能实现六—最大值(Max)、求和(Sum)、平均值(Avg)
      MapReduce功能实现七—小综合(多个job串行处理计算平均值)
      MapReduce功能实现八—分区(Partition)
      MapReduce功能实现九—Pv、Uv
      MapReduce功能实现十—倒排索引(Inverted Index)
      MapReduce功能实现十一—join
 

一、背景

  在Hadoop的MapReduce过程中,每个map task处理完数据后,如果存在自定义Combiner类,会先进行一次本地的reduce操作,然后把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

上面的getPartition函数的作用:

  1. 获取key的哈希值
  2. 使用key的哈希值对reduce任务数求模
  3. 这样做的目的是可以把(key,value)对均匀的分发到各个对应编号的reduce task节点上,达到reduce task节点的负载均衡。

上面的代码只是实现了(key,value)键值对的均匀分布,但是无法实现如下需求:

  1. 假设输入的数据文件有4个,里面包含各个部门各个季度的销售额
  2. 使用mapreduce程序进行统计各个部门全年销售额,同时每个部门对应一个输出文件

  由于输出的文件是区分数据类型的(部门类型),所以这个时候就需要我们自定义partition,分别把各个部门的数据分发到各自的reduce task上。

  在面试的时候我能够以WorldCount为例将MapReduce的过程说清楚,但是面试官可能会问如何把想要的数据放到一个reduce中呢?一开我还有点懵,后来面试结束后觉得这是在问自定义分区啊。(在hive中可以通过distribute by实现)
 

二、自定义分区

  自定义分区很简单,我们只需要继承抽象类Partitioner,重写getPartition方法即可,另外还要给任务设置分区:job.setPartitionerClass(),就可以了。

注意:自定义分区的数量需要和reduce task的数量保持一致。设置分区数:job.setNumReduceTasks(3);

1.模拟数据:

[hadoop@h71 q1]$ vi aa.txt 
aa 1 2
bb 2 22
cc 11
dd 1
ee 99 99999
ff 12 23123
注意:这里的分隔符是/t(Tab键)而不是空格
[hadoop@h71 q1]$ hadoop fs -put aa.txt /input

2.java代码:

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Partitioner;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  
 
public class MyPartitioner {  
 
    public static class MyPartitionerMap extends Mapper<LongWritable, Text, Text, Text> {  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)  
        throws java.io.IOException, InterruptedException {  
            String arr_value[] = value.toString().split("\t");  
            if (arr_value.length > 3) {  
                context.write(new Text("long"), value);  
            } else if (arr_value.length < 3) {  
                context.write(new Text("short"), value);  
            } else {  
                context.write(new Text("right"), value);  
            }  
        }  
    }  
  
    /** 
    * partitioner的输入就是map的输出 
    */  
    public static class MyPartitionerPar extends Partitioner<Text, Text> {  
        @Override  
        public int getPartition(Text key, Text value, int numPartitions) {  
            int result = 0;  
            /*********************************************************************/  
            /***key.toString().equals("long")  must use toString()!!!!  ***/  
            /***开始的时候我没有用 ,导致都在一个区里,结果也在一个reduce输出文件中。  ***/  
            /********************************************************************/  
            if (key.toString().equals("long")) {  
                result = 0 % numPartitions;  
            } else if (key.toString().equals("short")) {  
                result = 1 % numPartitions;  
            } else if (key.toString().equals("right")) {  
                result = 2 % numPartitions;  
            }  
            return result;  
        }  
    }  
  
    public static class MyPartitionerReduce extends Reducer<Text, Text, Text, Text> {  
        protected void reduce(Text key, java.lang.Iterable<Text> value, Context context) throws java.io.IOException,  
        InterruptedException {  
            for (Text val : value) {  
                context.write(key, val);  
                //context.write(key, val);  
            }  
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        Configuration conf = new Configuration();  
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: MyPartitioner <in> <out>");  
            System.exit(2);  
        }
        conf.set("mapred.jar","mp1.jar");
        Job job = new Job(conf, "MyPartitioner");  
        job.setNumReduceTasks(3);  
          
        job.setJarByClass(MyPartitioner.class);  
          
        job.setMapperClass(MyPartitionerMap.class);  
          
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
          
        job.setPartitionerClass(MyPartitionerPar.class);  
        job.setReducerClass(MyPartitionerReduce.class);  
          
        job.setOutputKeyClass(NullWritable.class);  
        job.setOutputValueClass(Text.class);  
          
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}  

3.执行:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MyPartitioner.java 
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MyPartitioner*class
[hadoop@h71 q1]$ hadoop jar xx.jar MyPartitioner /input/aa.txt /output

4.查看数据:

[hadoop@h71 q1]$ hadoop fs -lsr /output
rw-r--r--   2 hadoop supergroup          0 2017-03-18 22:55 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         36 2017-03-18 22:55 /output/part-r-00000
-rw-r--r--   2 hadoop supergroup         23 2017-03-18 22:55 /output/part-r-00001
-rw-r--r--   2 hadoop supergroup         27 2017-03-18 22:55 /output/part-r-00002

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
long    ff      12      23      123
long    ee      99      99      999
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00001
short   dd      1
short   cc      11
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00002
right   bb      2       22
right   aa      1       2
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。