《Spark Streaming实时流式大数据处理实战》 ——3.7 共 享 变 量

举报
华章计算机 发表于 2020/02/22 18:33:22 2020/02/22
【摘要】 本节书摘来自华章计算机《Spark Streaming实时流式大数据处理实战》 —— 书中第3章,第3.7.1节,作者是肖力涛 。

3.7  共 享 变 量

  通过前面的介绍,我们知道Spark是多机器集群部署的,分为Driver、Master和Worker。Master负责资源调度,Worker是不同的运算节点,由Master统一调度,而Driver是我们提交Spark程序的节点,并且所有的reduce类型的操作都会汇总到Driver节点进行整合。

  节点之间会给每个节点传递一个map、reduce等操作函数的独立副本,这些变量也会被复制到每台机器上,而节点之间的运算是相互独立的。当我们利用RDD操作(如map、reduce)在远程节点执行一个功能函数时,其会在该节点开辟一块单独的变量空间供函数使用。

  这些变量会被复制到每一台机器上,并且当变量发生改变时,并不会同步传播回Driver程序。如果进行通用支持,任务间的读写共享变量需要大量的同步操作,这会导致低效。所以,Spark提供了两种受限类型的共享变量用于两种常见的使用模式:广播变量和累加器。

3.7.1  累加器(Accumulator)

  顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效地应用于并行操作中。累加器能够用来实现对数据的统计和求和操作。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。

  如果创建了一个具体名称的累加器,它可以在Spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用,如图3.8所示。

 image.png

图3.8  累加器展示图

  Spark内置了数值型累加器,一个数值累加器可以由SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()函数来创建,分别累加Long型或Double型数据。

  之后节点上的任务可以利用add方法进行累加操作,但是它们并不能读取累加器的值。只有Driver程序能够通过value方法读取累加器的值,其具体使用方式如下:

  

  scala> val accum = sc.longAccumulator("My Accumulator")

  accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0,

    name: Some(My Accumulator), value: 0)

  

  scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

  ...

  10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

  

  scala> accum.value

  res2: Long = 10

  

  上面所述的代码是使用了累加器的内建支持类型Long,当然也可以通过集成AccumulatorV2的方式来创建支持我们自定义类型的累加器。

  AccumulatorV2是一个包含一些方法的抽象类,其中一些方法必须被覆写:reset方法使得累加器能够被重置为0,add方法即添加另一个值到累加器中,merge方法能够将另一个同类型累加器整合到当前累加器中。

  另外,其他必须覆写的方法可以参考API文档。这里我们参考官网的一个例子。假设有一个MyVector类型,表示数学中的向量,可以用以下方式来声明MyVector累加器:

  

  class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

    // 创建全0向量

    private val myVector: MyVector = MyVector.createZeroVector

    // 重置操作

    def reset(): Unit = {

      myVector.reset()

    }

    // 向量相加

    def add(v: MyVector): Unit = {

      myVector.add(v)

    }

    ...

  }

  // 创建向量类型的累加器

  val myVectorAcc = new VectorAccumulatorV2

  // 将累加器注册到Spark上下文中

  sc.register(myVectorAcc, "MyVectorAcc1")

  

  值得一提的是,对于自定义类型的累加器,我们可以设置不同于相加元素的输出元素。

  累加器只有在Action操作中才会被更新,Spark保证每个任务对于累加器的更新只会执行一次,如重新启动任务并不会更新累加器的值。在Transformation操作中,如果Task、Job或Stages被重新执行(根据计算图重新计算结果),那么累加器的更新有可能被执行多次。

  我们知道,Transformation会建立计算图,只有Action操作才会触发真正的计算,累加器也同样遵循这个懒惰(lazy)原则,即如果只在Transformation操作中调用累加器,其结果并不会改变,示例如下:

  // 创建long型累加器

  val accum = sc.longAccumulator

  // 在map操作内累加器进行累加

  data.map { x => accum.add(x); x }

  // 由于没有任何Action操作,所以map操作并没有被执行,accum值还是0

?特别注意:上文中关于累加器的使用只适合于Spark 2.0.0之后的版本,在此之前的版本中,累加器的声明方式如下:

  scala> val accum = sc.accumulator(0, "My Accumulator")

  accum: spark.Accumulator[Int] = 0

  scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

  ...

  10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

  scala> accum.value

  res2: Int = 10

  

  这点在使用不同版本的Spark时要特别注意,因为在Spark 2.0.0之后的版本API接口有了很大变化。


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。