[SparkAPI] countApprox, countApproxDistinct, countApproxDistinctByKey

Posted by Copy工程师 on 2022/01/24 15:38:32
[Abstract] Explanation of JavaPairRDD's countApprox method. Official documentation: /** * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. * * The confidence is...

Explanation of JavaPairRDD's countApprox method

Official documentation
/**
   * Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
   *
   * The confidence is the probability that the error bounds of the result will
   * contain the true value. That is, if countApprox were called repeatedly
   * with confidence 0.9, we would expect 90% of the results to contain the
   * true count. The confidence must be in the range [0,1] or an exception will
   * be thrown.
   *
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param confidence the desired statistical confidence in the result
   * @return a potentially incomplete result, with error bounds
   */
Explanation

An approximate version of count() that returns a potentially incomplete result within the timeout, even if not all tasks have finished.
The confidence is the probability that the error bounds of the result contain the true value. That is, if countApprox were called repeatedly with confidence 0.9, we would expect 90% of the results to contain the true count. The confidence must be in the range [0, 1], or an exception will be thrown.

@param timeout: maximum time to wait for the job, in milliseconds
@param confidence: the desired statistical confidence in the result
@return: a potentially incomplete result, with error bounds

Function prototypes
// java
public static PartialResult<BoundedDouble> countApprox(long timeout,
                                                       double confidence)
public static PartialResult<BoundedDouble> countApprox(long timeout)
// scala
def countApprox(timeout: Long): PartialResult[BoundedDouble]
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble]
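The original post has no runnable example for countApprox, so here is a minimal sketch (the class name CountApproxDemo, the sample data, and the timeout/confidence values are made up for illustration). getFinalValue() blocks until the full job completes; once all tasks finish, the bounds collapse onto the exact count, so a tiny local job like this one always ends with low == high == the true count.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;

import scala.Tuple2;

public class CountApproxDemo {
    // Builds a small pair RDD and returns the (eventually exact) approximate count.
    public static BoundedDouble approxCount(JavaSparkContext sc) {
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "33"), new Tuple2<String, String>("pig", "44")), 2);

        // Ask for the count, waiting at most 5000 ms, with 95% confidence.
        PartialResult<BoundedDouble> partial = pairs.countApprox(5000L, 0.95);

        // Blocks until the job is done; after completion low == high == exact count.
        return partial.getFinalValue();
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        BoundedDouble value = approxCount(sc);
        System.out.println(value.mean() + " [" + value.low() + ", " + value.high() + "]");
        sc.stop();
    }
}
```

On a real cluster the interesting case is the opposite one: the timeout expires first, and the returned bounds are a genuine interval around the still-unknown count.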

Explanation of JavaPairRDD's countApproxDistinct method

Official documentation
/**
   * Return approximate number of distinct elements in the RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
Explanation

Returns the approximate number of distinct elements in the RDD.
The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm".

@param relativeSD: relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017.
Function prototype
// java
public static long countApproxDistinct(double relativeSD)
// scala
def countApproxDistinct(relativeSD: Double): Long
Example
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class CountApproxDistinct {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Example: estimate the number of distinct (key, value) pairs at two accuracies
        JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "33"), new Tuple2<String, String>("pig", "44"),
                new Tuple2<String, String>("duck", "55"), new Tuple2<String, String>("cat", "66")), 3);

        System.out.println(javaPairRDD1.countApproxDistinct(0.1));
        System.out.println(javaPairRDD1.countApproxDistinct(0.001));

        sc.stop();
    }
}
Result

5   (relativeSD = 0.1: an estimate one below the true value)
6   (relativeSD = 0.001: matches the exact count of 6 distinct pairs)
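To see how good the estimate is, you can put it side by side with the exact answer from distinct().count(). The sketch below (class name DistinctComparison is made up for illustration) uses the same six-pair data; the HyperLogLog estimate needs no shuffle, while the exact count does.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class DistinctComparison {
    // Returns {approximate distinct count, exact distinct count} for the sample RDD.
    public static long[] counts(JavaSparkContext sc, double relativeSD) {
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "33"), new Tuple2<String, String>("pig", "44"),
                new Tuple2<String, String>("duck", "55"), new Tuple2<String, String>("cat", "66")), 3);
        long approx = pairs.countApproxDistinct(relativeSD);   // HyperLogLog estimate, no shuffle
        long exact = pairs.distinct().count();                 // exact answer, full shuffle
        return new long[]{approx, exact};
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        long[] c = counts(sc, 0.001);
        System.out.println("approx=" + c[0] + " exact=" + c[1]);
        sc.stop();
    }
}
```

On six elements the difference is invisible; the trade-off only pays off when the RDD is large enough that a distinct().count() shuffle is expensive.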

Explanation of JavaPairRDD's countApproxDistinctByKey method

Official documentation
/**
   * Return approximate number of distinct values for each key in this RDD.
   */
Explanation

Returns the approximate number of distinct values for each key in this RDD.

It applies to key-value (tuple) RDDs and is similar to countApproxDistinct, but the return type differs: instead of a single number, it returns a pair RDD whose value for each key is the approximate number of distinct values seen under that key (not the number of times the key occurs).
The parameter relativeSD controls the accuracy of the estimate: smaller values mean higher accuracy, at the cost of counters that take more space.

Function prototypes
// java
public JavaPairRDD<K,Long> countApproxDistinctByKey(double relativeSD,
                                                    Partitioner partitioner)
public JavaPairRDD<K,Long> countApproxDistinctByKey(double relativeSD,
                                                    int numPartitions)
public JavaPairRDD<K,Long> countApproxDistinctByKey(double relativeSD)

// scala
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long]
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long]
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long]
Example
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class CountApproxDistinctByKey {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "33"), new Tuple2<String, String>("pig", "44"),
                new Tuple2<String, String>("duck", "55"), new Tuple2<String, String>("cat", "66")), 3);

        JavaPairRDD<String, Long> javaPairRDD = javaPairRDD1.countApproxDistinctByKey(0.01);
        javaPairRDD.foreach(new VoidFunction<Tuple2<String, Long>>() {
            public void call(Tuple2<String, Long> stringLongTuple2) throws Exception {
                System.out.println(stringLongTuple2);
            }
        });

        sc.stop();
    }
}
Result

(duck,1)
(cat,3)
(dog,1)
(pig,1)
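In this sample all three of cat's values (11, 33, 66) happen to be distinct, so the per-key distinct count coincides with the key's occurrence count. To check the estimates in general, you can compute the exact number of distinct values per key with aggregateByKey, as in this sketch (the class name ExactDistinctByKey is made up for illustration):

```java
import java.util.Arrays;
import java.util.HashSet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ExactDistinctByKey {
    // Collects each key's values into a set, then replaces the set by its size:
    // the exact number of distinct values per key.
    public static JavaPairRDD<String, Integer> exactDistinctByKey(JavaPairRDD<String, String> pairs) {
        return pairs
                .aggregateByKey(new HashSet<String>(),
                        (set, v) -> { set.add(v); return set; },      // add a value within a partition
                        (a, b) -> { a.addAll(b); return a; })         // merge sets across partitions
                .mapValues(set -> set.size());
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),
                new Tuple2<String, String>("cat", "33"), new Tuple2<String, String>("pig", "44"),
                new Tuple2<String, String>("duck", "55"), new Tuple2<String, String>("cat", "66")), 3);
        for (Tuple2<String, Integer> t : exactDistinctByKey(pairs).collect()) {
            System.out.println(t);
        }
        sc.stop();
    }
}
```

Unlike countApproxDistinctByKey, this holds every distinct value in memory per key, so it is only a verification tool; for keys with many distinct values the HyperLogLog counters are far cheaper.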

[Copyright notice] This article is original content by a Huawei Cloud community user. When reposting, you must credit the source (Huawei Cloud community), the article link, and the author; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in the community, report it by email with supporting evidence; once verified, the community will immediately remove the infringing content. Report email: cloudbbs@huaweicloud.com