【SparkAPI】flatMap, flatMapToDouble, flatMapToPair, flatMapValues

Copy工程师, posted 2022/01/24 15:36:27

The flatMap method of JavaPairRDD

Official documentation
/**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
Explanation

First applies a function to every element of this RDD, producing a new RDD,
then flattens the results.

Function signature
// java
public static <U> JavaRDD<U> flatMap(FlatMapFunction<T,U> f)
// scala
def flatMap[U](f: FlatMapFunction[(K, V), U]): JavaRDD[U]
Example
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class FlatMap {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "13"), new Tuple2<String, String>("pig", "44")), 2);

        // flatten the data: each pair emits its key and its value
        JavaRDD<String> javaRDD = javaPairRDD1.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                List<String> list = Lists.newArrayList();
                list.add(stringStringTuple2._1);
                list.add(stringStringTuple2._2);
                return list.iterator();
            }
        });

        // print the data
        javaRDD.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sc.stop();
    }
}

Result
19/03/21 20:49:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
cat
11
dog
22
19/03/21 20:49:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
19/03/21 20:49:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4900 bytes)
19/03/21 20:49:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/21 20:49:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 158 ms on localhost (executor driver) (1/2)
cat
13
pig
44
19/03/21 20:49:54 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver

The effect of flattening: each input element yields an iterator, and the elements of all those iterators are strung together into one flat sequence.
For example:
1,3
5,4
6,9
slap, press it flat, and it becomes:
1
3
5
4
6
9
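The same flattening contract can be sketched with plain `java.util.stream` (no Spark needed): `Stream.flatMap` also takes one input element in and concatenates the stream of outputs. The class name is illustrative, and `Map.Entry` stands in for `Tuple2` from the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapSketch {
    // Mirrors the FlatMapFunction above: each (key, value) pair
    // yields two elements, and flatMap concatenates them all.
    static List<String> flatten(List<Map.Entry<String, String>> pairs) {
        return pairs.stream()
                .flatMap(e -> Stream.of(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = Arrays.asList(
                Map.entry("cat", "11"), Map.entry("dog", "22"),
                Map.entry("cat", "13"), Map.entry("pig", "44"));
        System.out.println(flatten(pairs));
        // [cat, 11, dog, 22, cat, 13, pig, 44]
    }
}
```

Unlike the Spark run above, the output order here is deterministic because there are no partitions running in parallel.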

The flatMapToDouble method of JavaPairRDD

Official documentation
 /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
Explanation

First applies a function to every element of this RDD, producing a new RDD, then flattens the results; here every output is a double, collected into a JavaDoubleRDD.

Function signature
// java
public static JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f)
// scala
def flatMapToDouble(f: DoubleFlatMapFunction[(K, V)]): JavaDoubleRDD
Example
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class FlatMapToDouble {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "13"), new Tuple2<String, String>("pig", "44")), 2);

        // parse each pair's value into a double
        JavaDoubleRDD javaDoubleRDD = javaPairRDD1.flatMapToDouble(new DoubleFlatMapFunction<Tuple2<String, String>>() {
            public Iterator<Double> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Arrays.asList(Double.parseDouble(stringStringTuple2._2)).iterator();
            }
        });

        // print
        javaDoubleRDD.foreach(new VoidFunction<Double>() {
            public void call(Double aDouble) throws Exception {
                System.out.println(aDouble);
            }
        });

        sc.stop();
    }
}
Result
19/03/22 11:17:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
11.0
22.0
19/03/22 11:17:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
19/03/22 11:17:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4900 bytes)
19/03/22 11:17:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/22 11:17:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 265 ms on localhost (executor driver) (1/2)
13.0
44.0
19/03/22 11:17:01 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 708 bytes result sent to driver
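As a plain-Java aside (no Spark required), the parse-and-flatten step above can be sketched with streams; once the values are numeric, `DoubleStream` gives sum-style aggregations much as JavaDoubleRDD does with its `sum()` and `mean()` methods. The class and method names here are illustrative.

```java
import java.util.Arrays;
import java.util.stream.DoubleStream;

public class FlatMapToDoubleSketch {
    // Parse each value to a double, as the DoubleFlatMapFunction
    // above does; DoubleStream plays the role of JavaDoubleRDD here.
    static double[] parseValues(String... values) {
        return Arrays.stream(values)
                .mapToDouble(Double::parseDouble)
                .toArray();
    }

    public static void main(String[] args) {
        double[] parsed = parseValues("11", "22", "13", "44");
        System.out.println(Arrays.toString(parsed)); // [11.0, 22.0, 13.0, 44.0]
        // Aggregating the numeric RDD is then a one-liner.
        System.out.println(DoubleStream.of(parsed).sum()); // 90.0
    }
}
```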

The flatMapToPair method of JavaPairRDD

Official documentation
 /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
Explanation

First applies a function to every element of this RDD, producing a new RDD, then flattens the results; each output is a key-value pair, so the result is a JavaPairRDD.

Function signature
// java
public static <K2,V2> JavaPairRDD<K2,V2> flatMapToPair(PairFlatMapFunction<T,K2,V2> f)
// scala
def flatMapToPair[K2, V2](f: PairFlatMapFunction[(K, V), K2, V2]): JavaPairRDD[K2, V2]
Example
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class flatMapToPair {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("xiaoming,12345,1", "wangxi,546546,1", "liming,789789897,2"), 2);

        // JavaRDD -> JavaPairRDD: every field of a line becomes one (field, "1") pair
        System.out.println("flatMapToPair step:");
        JavaPairRDD<String, String> javaPairRDD = javaRDD.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
            public Iterator<Tuple2<String, String>> call(String s) throws Exception {
                String[] strings = s.split(",", -1);
                List<Tuple2<String, String>> list = Lists.newArrayList();
                for (int i = 0; i < strings.length; i++) {
                    list.add(new Tuple2<String, String>(strings[i], "1"));
                }
                return list.iterator();
            }
        });
        System.out.println("Output:");
        javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println(stringStringTuple2);
            }
        });

        // mapToPair for comparison: exactly one output pair per input line (not used below)
        JavaPairRDD<String, String> javaPairRDD1 = javaRDD.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) throws Exception {
                String[] strings = s.split(",", -1);
                return new Tuple2<String, String>(strings[0], s);
            }
        });

        // a small demo: count occurrences of adjacent character pairs in a string
        // seed data
        String ss = "A;B;C;D;B;D;C;B;D;A;E;D;C;A;B";
        JavaRDD<String> javaRDDs = sc.parallelize(Arrays.asList(ss));
        // split each line into its tokens
        JavaRDD<String[]> javaRDD1 = javaRDDs.map(new Function<String, String[]>() {
            public String[] call(String s) throws Exception {
                return s.split(";", -1);
            }
        });
        /**
         *   A; B; C; D; B; D; C; B; D; A; E; D; C; A; B
         */
        // flatMapToPair step: emit (bigram, 1) for every adjacent pair
        System.out.println("flatMapToPair step:");
        JavaPairRDD<String, Integer> javaPairRDD2 = javaRDD1.flatMapToPair(new PairFlatMapFunction<String[], String, Integer>() {
            public Iterator<Tuple2<String, Integer>> call(String[] strings) throws Exception {
                List<Tuple2<String, Integer>> list = Lists.newArrayList();
                for (int i = 0; i < strings.length - 1; i++) {
                    String ss = strings[i] + strings[i + 1];
                    list.add(new Tuple2<String, Integer>(ss, 1));
                }
                return list.iterator();
            }
        });
        // print intermediate results
        System.out.println("Intermediate output:");
        javaPairRDD2.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });
        // merge by key and sum the counts
        JavaPairRDD<String, Integer> javaPairRDD3 = javaPairRDD2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // print final results
        System.out.println("Final output:");
        javaPairRDD3.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });

        sc.stop();
    }
}
Result
19/03/22 15:40:45 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.124.209.6, 65099, None)
flatMapToPair step:
Output:
19/03/22 15:40:46 INFO SparkContext: Starting job: foreach at flatMapToPair.java:36
19/03/22 15:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4846 bytes)
19/03/22 15:40:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
(xiaoming,1)
(12345,1)
(1,1)
(wangxi,1)
(546546,1)
(1,1)
(liming,1)
(789789897,1)
(2,1)
19/03/22 15:40:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver
19/03/22 15:40:47 INFO DAGScheduler: Job 0 finished: foreach at flatMapToPair.java:36, took 0.886961 s

// bigram demo starts
flatMapToPair step:
Intermediate output:
19/03/22 15:40:47 INFO SparkContext: Starting job: foreach at flatMapToPair.java:76
19/03/22 15:40:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
(AB,1)
(BC,1)
(CD,1)
(DB,1)
(BD,1)
(DC,1)
(CB,1)
(BD,1)
(DA,1)
(AE,1)
(ED,1)
(DC,1)
(CA,1)
(AB,1)
19/03/22 15:40:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 665 bytes result sent to driver
19/03/22 15:40:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 18 ms on localhost (executor driver) (1/1)
19/03/22 15:40:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
19/03/22 15:40:47 INFO DAGScheduler: ResultStage 1 (foreach at flatMapToPair.java:76) finished in 0.019 s
19/03/22 15:40:47 INFO DAGScheduler: Job 1 finished: foreach at flatMapToPair.java:76, took 0.065599 s
Final output:
19/03/22 15:40:47 INFO SparkContext: Starting job: foreach at flatMapToPair.java:89
19/03/22 15:40:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
(BC,1)
(CA,1)
(CB,1)
(DA,1)
(ED,1)
(AB,2)
(AE,1)
(CD,1)
(DB,1)
(DC,2)
(BD,2)
19/03/22 15:40:47 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 1095 bytes result sent to driver

flatMapToPair can also be seen as flatMap and mapToPair combined into a single step.
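That shape can be sketched with plain Java streams: emit all the pairs in one flatten step, then group identical keys and count, which mirrors flatMapToPair followed by reduceByKey in the bigram demo above. The class and method names here are illustrative.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BigramCountSketch {
    // One pass: emit each adjacent pair (the flatMapToPair step),
    // then group identical keys and count (the reduceByKey step).
    static Map<String, Long> countBigrams(String s) {
        String[] t = s.split(";", -1);
        return IntStream.range(0, t.length - 1)
                .mapToObj(i -> t[i] + t[i + 1])
                .collect(Collectors.groupingBy(Function.identity(),
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                countBigrams("A;B;C;D;B;D;C;B;D;A;E;D;C;A;B");
        // Matches the Spark run above: AB, BD and DC each occur twice.
        System.out.println(counts.get("AB")); // 2
        System.out.println(counts.get("BD")); // 2
    }
}
```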

The flatMapValues method of JavaPairRDD

Official documentation
/**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
Explanation

Passes each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.

This method flattens only the values. If a value holds several items joined by commas, the function can split it and return those items as a list; each item then becomes its own (key, item) pair.
Unlike mapValues, which produces exactly one output value per input value, flatMapValues can produce zero, one, or many output pairs per input value.
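The difference can be sketched with plain Java (no Spark needed): the helper below does what flatMapValues does to a single pair, splitting a comma-joined value and repeating the key for every piece. The class name is illustrative, and `Map.Entry` stands in for `Tuple2`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapValuesSketch {
    // Split a comma-joined value and pair each piece with the
    // original key, as flatMapValues does for one input pair.
    static List<Map.Entry<String, String>> explode(String key, String value) {
        return Arrays.stream(value.split(",", -1))
                .map(v -> Map.entry(key, v))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // mapValues would keep ("cat", "11,23,44") as one pair;
        // flatMapValues expands it into three:
        System.out.println(explode("cat", "11,23,44"));
        // [cat=11, cat=23, cat=44]
    }
}
```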

Function signature
// java
public <U> JavaPairRDD<K,U> flatMapValues(Function<V,Iterable<U>> f)
// scala
def flatMapValues[U](f: Function[V, Iterable[U]]): JavaPairRDD[K, U]
Example
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class FlatMapValues {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<String, String>("cat", "11,23,44"), new Tuple2<String, String>("dog", "22,11"),
                new Tuple2<String, String>("cat", "13,33,66,77"), new Tuple2<String, String>("pig", "44")), 2);
        // print the source data
        javaPairRDD1.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println(stringStringTuple2);
            }
        });
        // input is a single value; output is the list of its comma-separated parts
        JavaPairRDD<String, String> javaPairRDD = javaPairRDD1.flatMapValues(new Function<String, Iterable<String>>() {
            public Iterable<String> call(String s) throws Exception {
                String[] strings = s.split(",", -1);
                List<String> list = Lists.newArrayList();
                for (String string : strings) {
                    list.add(string);
                }
                return list;
            }
        });
        // print the result
        javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println(stringStringTuple2);
            }
        });

        sc.stop();
    }
}
Result
// source data
(cat,11,23,44)
(dog,22,11)
(cat,13,33,66,77)
(pig,44)
19/03/22 15:57:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
// result data
(cat,11)
(cat,23)
(cat,44)
(dog,22)
(dog,11)
19/03/22 15:57:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
19/03/22 15:57:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4909 bytes)
19/03/22 15:57:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/22 15:57:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 142 ms on localhost (executor driver) (1/2)
(cat,13)
(cat,33)
(cat,66)
(cat,77)
(pig,44)
19/03/22 15:57:54 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver

[Copyright notice] This article is original content by a Huawei Cloud community user. Reproduction must credit the source (Huawei Cloud community) and include the article link and author information.