SparkStreaming介绍

举报
米兰的小铁匠 发表于 2020/06/16 10:46:38 2020/06/16
【摘要】 spark streaming 是在spark core基础上的一个高吞吐、高容错的无状态的微批处理框架,其是基于批量数据的处理。本文的设定是你对spark运行机制和rdd (the resilient distributed dataset)编程有一定的了解。

     spark streaming 是在spark core基础上的一个高吞吐、高容错的无状态的微批处理框架,其是基于批量数据的处理。(spark streaming uses a microbatch architecture for continuous data processing and relies on the exactly once neture of batch processing to ensure correctness)。其编程接口主要由streamingContext和DStream提供。本文的设定是你对spark运行机制和rdd (the resilient distributed dataset)编程有一定的了解。

 1. SparkStreaming 整体架构

SparkStreaming 和 Spark批处理一样,有Driver、Executor、clustermanager(本文不讲)、worker等基本概念。后面的内容会介绍这些组件里的细节。

2. StreamingContext

2.1 初始化

StreamingContext是Spark Streaming的入口类,首先创建sparkConf,

 val conf = new SparkConf()
 val ssc = new StreamingContext(conf, Seconds(duration.toString.toInt))
 ssc.start

 初始化StreamingContext

class StreamingContext private[streaming] (  _sc: SparkContext,  _cp: Checkpoint,  _batchDur: Duration ) extends Logging {
  private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf) }
  private[streaming] val graph: DStreamGraph
   private[streaming] val scheduler = new JobScheduler(this)
  // 开启作业流
  def start(): Unit = synchronized {

StreamingContenxt 有spark core中的 sparkContext、CheckPoint等spark core本身的属性,同时具备Duration这个时间上的维度。

初始化StreamingContext时,会初始化SparkContext对象,其最终作业处理还是交给sparkContext处理,2.3 节会有介绍。

StreamingContext会初始化DStreamGraph用来处理Dstream的依赖关系, 2.2节会有介绍。

2.2 启动

在Driver中,StreamingContext会初始化JobScheduler,JobScheduler包含JobGenerator,JobGenerator会生成RecurringTimer对象,其负责定时生成Job,发送给Executor执行,Executor中Receiver会接收数据,执行从Driver提交过来的作业逻辑。

启动时,StreamingContext 调用start 启动作业

  executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()       // receiverTrack 启动
    jobGenerator.start()          // jobGenerator启动
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")

JobGenerator 通过sc 调用 DStreamGraph 生成job。

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Generate jobs and perform checkpointing for the given `time`.  */  周期性的生成作业和checkpoint
private def generateJobs(time: Time): Unit = {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are truncated periodically. 记录血缘关系
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
     // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)    将receive接收的block分配给batch    2.3 节有介绍
    // generate jobs using allocated block  此处在DStreamGraph周期性生成job
    graph.generateJobs(time) 
  }

JobGenerator 中会周期性的作业, 记录血缘关系,将receiver接收的block分配给batch

2.3 DStream生成,作业提交

Driver端生成Receiver对象,发送至Executor端,Receiver接收流数据,收集成Block,同步至Driver所收集的block元数据信息,

/**
 * Here is the approach to schedule executors:
   First, schedule all the receivers with preferred locations (hosts), evenly among the
 *       executors running on those host.
 *        Then, schedule all other receivers evenly among all the executors such that overall
 *       distribution over all the receivers is even
 * This method is called when we start to launch receivers at the first time.
 * @return a map for receivers and their scheduled locations
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {


该方法指定receiver运行在那些Executor上,然后在executor上启动receiver,接收数据。

/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 *  It provides all the necessary interfaces for handling the data received by the receiver.
 * Supervisor 是 receiver监视器,同时负责数据的写操作。
 */
private[streaming] abstract class ReceiverSupervisor() {
def start(): Unit = {
    onStart()
    // 2.2 中 receiverTracker.start 调用
    startReceiver()
  }
   override protected def onReceiverStart(): Boolean = {
    val msg = RegisterReceiver(streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
    trackerEndpoint.askSync[Boolean](msg)
  }
 override def createBlockGenerator(
      blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
    stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators.add(newBlockGenerator)
    newBlockGenerator
  }

在receiver启动时,向Driver端ReceiverTracker进行通信,发送元数据信息,保持心跳。同时创建BlockGenerator。

/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
  /**
 * Push a single data item into the buffer.
 */
  // 接收数据
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
}
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        //生成block
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

存储block 并将元信息发送给driver
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ): Unit = {
   // 存储block
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // 元数据发送给Driver
  if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) {
    throw new SparkException("Failed to add block to receiver tracker.")
  }


BlockGenretor 完成数据接收,Block生成,回调ReceiverSupervisor存储block,并将元数据发送给driver,

回到Driver端.

class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
/** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
    // 分配block
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
    / Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          // 周期性生成rdd构成DStream 
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }    
        // 调用sparkcontext提交作业,回到sparkcontext
     val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => (), ())
    }

回到sparkcontext 通过DagScheduler 提交作业

class SparkContext(config: SparkConf) extends Logging {
/**
 * Submit a job for execution and return a FutureJob holding the result.
 * @param rdd target RDD to run tasks on
 * @param resultHandler callback to pass each result to
 * @param resultFunc function to be executed when the result is ready
 */
def submitJob[T, U, R](
    rdd: RDD[T] processPartition: Iterator[T] => U, partitions: Seq[Int],resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] ={
  assertNotStopped()
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  // 回到sparkcontext 通过DagScheduler 提交作业
  val waiter = dagScheduler.submitJob(
    rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,   callSite,  resultHandler, localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}

回到Dirver,ReceiverTracker 调用sc生成周期性时间间隔rdd构成DStream,通过sparkcontext调用DagScheduler提交作业,最终还是被转化成rdd和dag,是不是又回到了熟悉的spark作业流程。


3. DStream处理

    DStream 本质上是对rdd的一个封装,DStream中的每个rdd是包含来自一个时间间隔的数据,(spark represents stream as a continuous series of Rdds, Each rdd is processed as a batch)

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
  /** Time interval after which the DStream generates an RDD */
def slideDuration: Duration

/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]

/** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)

/**
 * Return a new DStream in which each RDD contains all the elements in seen in a   返回窗口时间上对应的rdd
 * sliding window of time over this DStream. The new DStream generates RDDs with
 * the same interval as this DStream.  窗口长度必须是duration的整数倍
 * @param windowDuration width of the window; must be a multiple of this DStream's interval.
 */
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
  val cleanedF = context.sparkContext.clean(foreachFunc, false)
  foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}

从DStream源码中我们可以看到它的关键特性

1. 后面的DStream依赖前面的DStream

2. DStream每个时间间隔生成一个rdd,也就是所说的miniRdd

DStream和rdd的关系就是随时间不断产生的rdd,对DStream的操作就是固定时间操作时间间隔上的rdd。就是在rdd的基础上增加了时间维度,spark streaming是一小批一小批的数据进行处理,且处理的逻辑在一批数据准备好之后才会进行计算。

对比spark原生rdd基础算子,DStream多出window、updateStateByKey、mapstateByKey、 foreachRdd等几个算子,下面介绍下这几个算子,其他rdd算子操作参照spark rdd基础操作,在这里不再介绍。和原生rdd一样,DStream的转换操作也是lazy机制,要有action操作才被触发。

3.1 window

     window是流计算中一个非常重要的概念,window操作是对过去固定长度的时间段的数据进行处理(windowing is the process of slicing up

 a data source along temporal boundaries),DStream调用window方法时会有两个参数duration(窗口长度)、slide(滑动步长,非必须) 生成WindowStream,只配置duration参数时是滚动窗口,加上slide参数是滑动窗口.

 class WindowedDStream[T: ClassTag](
    parent: DStream[T],   _windowDuration: Duration,   _slideDuration: Duration)
  extends DStream[T](parent.ssc) {
  override def compute(validTime: Time): Option[RDD[T]] = {
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) // 该窗口的终止时间 - 起始时间
    val rddsInWindow = parent.slice(currentWindow)
    Some(ssc.sc.union(rddsInWindow))
  }
}


 可以看到经过运算后是返回对应该窗口时间的rdd,然后你可以在基于该rdd进行函数运算生成新的DStream。

3.2 UpdateStateByKey

当你需要根据之前的计算结果和当前数据一起计算来求算结果时,可使用此算子,该算子为我们提供了记录及更新状态的功能。

/**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * In every batch the updateFunc will be called for each state even if there are no new values.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
  }

在updateStateByKey中,你需要定义状态和状态更新函数,使用updateFunc来更新状态,Seq[V]为本次传入的数据,Option[S]为前次计算的结果,最后返回DStream,使用updateBykey时需要对所有数据进行扫描,再用key分组。如果数据量过大,可以考虑使用mapwithstate。

3.3 foreachRdd

   是sparkstreaming 中的常用的输出操作,

 /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
    val cleanedF = context.sparkContext.clean(foreachFunc, false)
    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
  }

  这里传入的foreachFunc会作用在DStream中的每个rdd上,通常foreachFunc会将rdd的数据输出到外部系统,同时也可以建立连接从外部系统如db读取数据,同时其他算子如print、saveAsxxx也可完成输出操作。

4. 流速控制

4.1 消费速率动态控制

      利用spark反压机制(backPressure),相应的配置项是spark.streaming.backpressure.enabled,默认值是false,如果需要开启的话设置为true,spark反压采用的是令牌桶机制,用RateController控制接收速率,

case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
/**
 * A component that estimates the rate at which an `InputDStream` should ingest
 * records, based on updates at every batch completion.
 * 用来评估inputStream的消费能力,根据消费能力调整接收速率
 * Please see `org.apache.spark.streaming.scheduler.RateController` for more details.
 */
private[streaming] trait RateEstimator extends Serializable {
 def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get(BACKPRESSURE_RATE_ESTIMATOR) match {
      case "pid" =>
        val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL)
        val integral = conf.get(BACKPRESSURE_PID_INTEGRAL)
        val derived = conf.get(BACKPRESSURE_PID_DERIVED)
        val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
        }
/**
 * Provides waitToPush() method to limit the rate at which receivers consume data.
 * updateRate 更新速率
 */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
 def waitToPush(): Unit = {
    rateLimiter.acquire()
  }
 /**
   * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
   * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
     @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
   */
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

spark 会评估作业的消费能力,进而调整消费速率,使消费的数据量适应上消费能力,但你资源不足时,会造成消息堆积,消费延迟,并不能解决消费能力不足的问题.

4.2 消息队列端限制

     即静态限速,比如消费kafka时,可设置消费kafka的最大速率,防止作业初次启动时拉取过多数据,

spark.streaming.kafka.maxRatePerPartition 一次数据拉取的最大量就是 该maxRate * kafka partition数量  *  spark duration

4.3 动态资源分配

     当数据的生产速度存在波动时,所需要的计算资源也不一样,比如高峰时需要更多资源,低谷时又不需要那么多资源,我们可以使用spark.dynamicAllocation.enabled 配置项设置动态分配策略,但此特性还处于试验阶段。

if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
        logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
      } else if (!testing) {
        throw new SparkException("Dynamic allocation of executors requires the external " +
          "shuffle service. You may enable this through spark.shuffle.service.enabled.")
      }
    }
if (Utils.isDynamicAllocationEnabled(sc.conf) ||
      ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
      logWarning("Dynamic Allocation is enabled for this application. " +
        "Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
        "Write Ahead Log is not enabled for non-replayable sources. " +
        "See the programming guide for details on how to enable the Write Ahead Log.")
 }

5. 容错与调优简介

     Spark Streaming天然具有spark的容错的机制,有基于rdd lineage的容错机制,有使用checkpoint容错,有Driver和Executor的容错,Streaming模块也有自己的容错机制。调优方面spark本身的调优,streaming还有有些如保障作业7 * 24 调优机制,后续会专门写一篇SparkStreaming容错和调优,敬请期待和拍砖。

6. 各流框架特性对比

   针对市面上各大流计算框架,对比其吞吐、时延、时间语义等特性

     框架  特性

spark-streaming

structured-streaming

Flink

Storm

时延

High

Low

Very low

Low

吞吐量

High

High

High

Low

性能

Medium

High

High

Low

Stream-sql

Y

Y

Y

N

EventTime

N

Y

Y

N

动态资源

N

N

N

Y

Backpressure

Medium

Medium

Good

Medium

watemark

N

Y

Y

N

Model

microbatch

Native、batch

Native

Native

社区

维护

相对活跃

活跃

Decline

成熟度

High

Medium

High

Decline

7. Spark流展望

       Spark Streaming吞吐上还是有很大优势,但其本质上还是微批处理机制,并不是纯流处理,同时又无法支持EventTime时间语义这个致命弱点,同时满足不了极低的时延需求。Spark 1.3 后推出DataFrame的概念,将数据看做一张有界数据表,具备schema信息,Spark2.0后基于DataFrame推出Structured Streaming(结构化流处理)(Structured Streaming is a scalable and fault-tolerant stream processing micro-batch engine built on the Spark SQL engine),将流式的数据看做一张无界表 (unbounded table),新来的一条数据都作为新的一行加到表的后面。 Spark 2.3后提出Continuous Processing(持续查询)(which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees)的概念,同属于sql模块,结构化流处理支持catalyst查询编译优化和Tungsten优化,降低了开发者开发门槛,同时支持有状态计算、eventTime时间语义等Stream Processing基本概念。所以未来是什么,未来是流批一体?是sql一统天下?


ps 本文代码来自于spark源码


【版权声明】本文为华为云社区用户翻译文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容, 举报邮箱:cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。