Introduction to Spark Streaming
Spark Streaming is a high-throughput, fault-tolerant micro-batch framework built on top of Spark Core: each micro-batch is processed as an ordinary batch job (Spark Streaming uses a micro-batch architecture for continuous data processing and relies on the exactly-once nature of batch processing to ensure correctness). Its programming interface is provided mainly by StreamingContext and DStream. This article assumes you are reasonably familiar with Spark's execution model and with RDD (Resilient Distributed Dataset) programming.
1. Overall Architecture of Spark Streaming
Like Spark batch jobs, Spark Streaming has the usual building blocks: Driver, Executor, cluster manager (not covered here), and worker nodes. The following sections look at the details inside these components.
2. StreamingContext
2.1 Initialization
StreamingContext is the entry point of Spark Streaming. First create a SparkConf and use it to construct a StreamingContext with a batch interval:
val conf = new SparkConf()
val ssc = new StreamingContext(conf, Seconds(duration.toString.toInt))
ssc.start()
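Before diving into the internals, here is a minimal self-contained application sketch (the host, port, and 10-second batch interval are placeholder values) showing the usual lifecycle: define an input DStream, declare transformations and an output operation, then call start() and awaitTermination().

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MinimalStreamingApp")
    val ssc  = new StreamingContext(conf, Seconds(10))      // batch interval (placeholder)

    // Input DStream: lines of text read from a socket (placeholder host/port)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformations are lazy; the output operation below triggers execution
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()                                          // output operation

    ssc.start()                                             // start receivers and job generation
    ssc.awaitTermination()                                  // block until stopped
  }
}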
Inside StreamingContext, initialization looks like this (excerpt):
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {

  private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
    new SparkContext(conf)
  }

  private[streaming] val graph: DStreamGraph
  private[streaming] val scheduler = new JobScheduler(this)

  // start the streaming job flow
  def start(): Unit = synchronized {
A StreamingContext holds the familiar Spark Core members such as the SparkContext and the Checkpoint, and adds a time dimension through the batch Duration.
When a StreamingContext is initialized, it also initializes a SparkContext; the actual job execution is ultimately handed to that SparkContext, as described in Section 2.3.
StreamingContext also initializes a DStreamGraph, which manages the dependency relationships between DStreams, as described in Section 2.2.
2.2 Startup
On the Driver, StreamingContext initializes a JobScheduler, which contains a JobGenerator. The JobGenerator creates a RecurringTimer that periodically generates jobs and submits them to the Executors. On each Executor, a Receiver ingests data, and the job logic submitted from the Driver is executed against it.
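To make the timing loop concrete, here is a simplified stand-in for what the RecurringTimer does (an illustrative sketch only, not Spark's actual implementation, which lives in org.apache.spark.streaming.util.RecurringTimer): it fires a callback once per batch interval, and the JobGenerator uses such a callback to trigger job generation for each batch time.

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in: fire `onBatch` every `batchIntervalMs` milliseconds,
// the way JobGenerator's timer triggers job generation for each batch.
class SimpleRecurringTimer(batchIntervalMs: Long)(onBatch: Long => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = onBatch(System.currentTimeMillis())
    }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = executor.shutdown()
}

// Usage sketch: new SimpleRecurringTimer(10000)(time => println(s"generate jobs for batch $time")).start()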
At startup, calling start() on the StreamingContext launches the job pipeline:
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()                        // start the ReceiverTracker
jobGenerator.start()                           // start the JobGenerator
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
The JobGenerator then drives the DStreamGraph (through the StreamingContext) to generate jobs.
/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  /** Generate jobs and perform checkpointing for the given `time`. */
  // called periodically to generate jobs and trigger checkpointing
  private def generateJobs(time: Time): Unit = {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically (keeps the recorded lineage bounded)
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      // allocate the blocks collected by the receivers to this batch (see Section 2.3)
      jobScheduler.receiverTracker.allocateBlocksToBatch(time)
      // generate this batch's jobs via the DStreamGraph
      graph.generateJobs(time)
    }
In short, the JobGenerator periodically generates jobs, checkpoints the RDDs marked for checkpointing to keep their lineage bounded, and assigns the blocks received by the Receivers to the current batch.
2.3 DStream Generation and Job Submission
The Driver creates the Receiver objects and ships them to the Executors; each Receiver ingests the stream, assembles the records into blocks, and synchronizes the block metadata back to the Driver.
/**
 * Here is the approach to schedule executors:
 * First, schedule all the receivers with preferred locations (hosts), evenly among the
 * executors running on those hosts.
 * Then, schedule all other receivers evenly among all the executors such that overall
 * distribution over all the receivers is even.
 * This method is called when we start to launch receivers at the first time.
 * @return a map for receivers and their scheduled locations
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
This method decides which Executors each Receiver runs on; the Receivers are then launched on those Executors and start receiving data.
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 * The supervisor watches over the Receiver and is also responsible for writing the received data.
 */
private[streaming] abstract class ReceiverSupervisor() {

  def start(): Unit = {
    onStart()
    // invoked via receiverTracker.start(), described in Section 2.2
    startReceiver()
  }

  override protected def onReceiverStart(): Boolean = {
    val msg = RegisterReceiver(streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
    trackerEndpoint.askSync[Boolean](msg)
  }

  override def createBlockGenerator(
      blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    val stoppedGenerators = registeredBlockGenerators.asScala.filter { _.isStopped() }
    stoppedGenerators.foreach(registeredBlockGenerators.remove(_))

    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators.add(newBlockGenerator)
    newBlockGenerator
  }
When a Receiver starts, it registers with the ReceiverTracker on the Driver, reports its metadata, and keeps a heartbeat; it also creates a BlockGenerator.
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  /** Push a single data item into the buffer. */
  // receive a single record
  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
        }

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)   // a new block is generated
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

// store the block and report its metadata to the Driver
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ): Unit = {
  // store the block
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // send the block metadata to the Driver
  if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) {
    throw new SparkException("Failed to add block to receiver tracker.")
  }
The BlockGenerator receives the data and assembles it into blocks, then calls back into the ReceiverSupervisor to store each block and report its metadata to the Driver.
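To see where user code hooks into this pipeline, here is a hedged sketch of a custom receiver (the host and port are placeholders): every record passed to store() is handed to the ReceiverSupervisor and flows through a BlockGenerator exactly as described above.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal custom receiver sketch: reads lines from a socket and hands them to Spark via store()
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive data on a separate thread so onStart() returns quickly
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the receiving thread exits once isStopped() is true */ }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)                      // each record ends up in the current BlockGenerator buffer
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to reconnect")     // ask the supervisor to restart the receiver
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    }
  }
}

// Usage sketch (ssc is an existing StreamingContext):
// val lines = ssc.receiverStream(new SocketLineReceiver("localhost", 9999))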
Back on the Driver side:
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  /** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      // hand the received blocks to this batch
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      // wrap the receiver (with its preferred locations) in a one-element RDD
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }

  // submit the job through SparkContext -- back to the familiar SparkContext path
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => (), ())
}
Back in SparkContext, the job is submitted through the DAGScheduler:
class SparkContext(config: SparkConf) extends Logging {

  /**
   * Submit a job for execution and return a FutureJob holding the result.
   * @param rdd target RDD to run tasks on
   * @param resultHandler callback to pass each result to
   * @param resultFunc function to be executed when the result is ready
   */
  def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] = {
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    // hand the job off to the DAGScheduler
    val waiter = dagScheduler.submitJob(
      rdd,
      (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
      partitions,
      callSite,
      resultHandler,
      localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
  }
Back on the Driver: the ReceiverTracker uses the SparkContext to build the RDDs generated at each batch interval (which together form the DStream) and submits the work to the DAGScheduler through SparkContext. In the end everything is turned into RDDs and a DAG, and we are back in the familiar Spark job flow.
3. DStream Processing
A DStream is essentially a wrapper around RDDs: each RDD in a DStream holds the data of one batch interval (Spark represents a stream as a continuous series of RDDs, and each RDD is processed as a batch).
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }

  /**
   * Return a new DStream in which each RDD contains all the elements seen in a
   * sliding window of time over this DStream (i.e. the RDDs covered by the window).
   * The new DStream generates RDDs with the same interval as this DStream.
   * The window length must be a multiple of the batch interval.
   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
   */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
    val cleanedF = context.sparkContext.clean(foreachFunc, false)
    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
  }
From the DStream source code we can see its key characteristics:
1. A downstream DStream depends on the DStreams before it (its parents).
2. A DStream generates one RDD per batch interval -- the so-called mini-batch RDD.
The relationship between a DStream and RDDs is simply RDDs produced continuously over time; an operation on a DStream is an operation applied to the RDD of each fixed batch interval. In other words, DStreams add a time dimension on top of RDDs: Spark Streaming processes data one small batch at a time, and the processing logic for a batch only runs once that batch's data has been collected.
Compared with the native RDD operators, DStreams add a handful of operators such as window, updateStateByKey, mapWithState, and foreachRDD, which are introduced below; the remaining operators behave like their RDD counterparts and are not repeated here. As with RDDs, DStream transformations are lazy and are only triggered by an output (action) operation.
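As a small illustration of the per-batch RDD view (assuming the ssc and lines DStream from the sketch in Section 2.1), transform exposes the RDD of each batch interval directly, and nothing runs until an output operation is registered:

// `lines` is assumed to be a DStream[String], e.g. ssc.socketTextStream("localhost", 9999)
val wordCountsPerBatch = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .transform { rdd =>                  // invoked once per batch with that interval's RDD
    rdd.reduceByKey(_ + _)
       .sortBy(_._2, ascending = false)
  }

// Lazy until an output operation registers the DStream for execution
wordCountsPerBatch.print()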
3.1 window
Windowing is a very important concept in stream processing: a window operation processes the data from a fixed-length period of time in the past (windowing is the process of slicing up a data source along temporal boundaries). Calling window on a DStream takes two parameters, the window length (duration) and the slide interval (optional), and produces a WindowedDStream. With only the window length configured you get a tumbling window; adding the slide parameter gives a sliding window.
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {

  override def compute(validTime: Time): Option[RDD[T]] = {
    // the window spans from (end time - window duration) to the end time
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
    val rddsInWindow = parent.slice(currentWindow)
    Some(ssc.sc.union(rddsInWindow))
  }
}
As you can see, compute returns the union of the RDDs that fall inside the window; you can then apply further transformations to that RDD to build new DStreams.
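As a usage sketch (assuming pairs is a DStream[(String, Int)] on a 10-second batch interval), a 60-second window sliding every 20 seconds can be written with window plus an ordinary reduce, or with the fused reduceByKeyAndWindow operator:

import org.apache.spark.streaming.Seconds

// Counts over the last 60 seconds, recomputed every 20 seconds.
// Both durations must be multiples of the batch interval (10 seconds here).
val windowedCounts = pairs
  .window(Seconds(60), Seconds(20))   // WindowedDStream: union of the RDDs inside the window
  .reduceByKey(_ + _)

// Equivalent fused form on pair DStreams
val windowedCounts2 = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))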
3.2 updateStateByKey
Use this operator when the current result must be computed from both the previous result and the new data of this batch; it provides the ability to record and update state across batches.
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * In every batch the updateFunc will be called for each state even if there are no new values.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}
With updateStateByKey you define the state and a state-update function: Seq[V] holds the values that arrived for a key in the current batch, Option[S] is the state from the previous computation, and the result is a new DStream of (key, state) pairs. Because updateStateByKey scans the state of every key in every batch (grouping all data by key), it can become expensive with a large amount of data; consider mapWithState in that case.
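A typical usage sketch is a running word count (pairs is assumed to be a DStream[(String, Int)], and the checkpoint directory is a placeholder; checkpointing must be enabled for stateful operators):

// Stateful operators require a checkpoint directory (placeholder path)
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

// newValues: the values seen for this key in the current batch; runningCount: the previous state
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(newValues.sum + runningCount.getOrElse(0))

val runningCounts = pairs.updateStateByKey[Int](updateCount _)
runningCounts.print()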
3.3 foreachRDD
foreachRDD is a commonly used output operation in Spark Streaming:
/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
  val cleanedF = context.sparkContext.clean(foreachFunc, false)
  foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
The foreachFunc passed in is applied to each RDD of the DStream. It is typically used to push the RDD's data to an external system, and you can also open connections inside it to read from external systems such as a database. Other operators such as print and the saveAs* family can also produce output.
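A common output pattern is sketched below (dstream is assumed to be a DStream[String], and SinkConnection / SinkConnectionPool are hypothetical stand-ins for a real client): open one connection per partition inside foreachPartition rather than one per record, because the closure passed to foreachRDD runs on the driver while the per-partition code runs on the executors.

// Hypothetical sink API, for illustration only
trait SinkConnection { def send(record: String): Unit; def close(): Unit }
object SinkConnectionPool { def get(): SinkConnection = ??? }

dstream.foreachRDD { rdd =>
  // This block runs on the driver; the code inside foreachPartition runs on the executors,
  // so connections must be created there (they are generally not serializable)
  rdd.foreachPartition { partition =>
    val conn = SinkConnectionPool.get()          // one connection per partition
    partition.foreach(record => conn.send(record))
    conn.close()
  }
}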
4. Flow Control
4.1 Dynamic Rate Control (Backpressure)
This relies on Spark's backpressure mechanism, configured by spark.streaming.backpressure.enabled (default false; set it to true to enable it). Backpressure uses a token-bucket style rate limiter, with a RateController adjusting the receiving rate:
// in the receiver supervisor: apply a new rate limit pushed from the Driver
case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.asScala.foreach { bg => bg.updateRate(eps) }

/**
 * A component that estimates the rate at which an `InputDStream` should ingest
 * records, based on updates at every batch completion.
 * It estimates how fast the input stream can be processed and adjusts the receiving rate accordingly.
 * Please see `org.apache.spark.streaming.scheduler.RateController` for more details.
 */
private[streaming] trait RateEstimator extends Serializable {

  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get(BACKPRESSURE_RATE_ESTIMATOR) match {
      case "pid" =>
        val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL)
        val integral = conf.get(BACKPRESSURE_PID_INTEGRAL)
        val derived = conf.get(BACKPRESSURE_PID_DERIVED)
        val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
    }

/**
 * Provides waitToPush() method to limit the rate at which receivers consume data.
 * updateRate adjusts the current rate limit.
 */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

  def waitToPush(): Unit = {
    rateLimiter.acquire()
  }

  /**
   * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
   * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
   * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
   */
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }
Spark estimates the job's actual processing capacity and adjusts the ingestion rate so that the amount of data consumed matches what can be processed. Note that when resources are genuinely insufficient, backpressure only leads to message backlog and growing latency; it does not solve the underlying lack of processing capacity.
4.2 Static Rate Limits at the Source
This is static rate limiting. For example, when consuming from Kafka you can cap the maximum consumption rate so that the job does not pull too much data when it first starts:
spark.streaming.kafka.maxRatePerPartition: the maximum amount of data pulled in one batch is maxRatePerPartition * number of Kafka partitions * batch duration.
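A configuration sketch combining both mechanisms (all numbers are placeholders). For example, with a 10-second batch, 10 Kafka partitions, and maxRatePerPartition = 1000, a single batch pulls at most about 1000 * 10 * 10 = 100,000 records.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")        // dynamic rate control (Section 4.1)
  .set("spark.streaming.backpressure.initialRate", "10000")   // cap for the first batches, before feedback kicks in
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")   // static cap, records/sec/partition (direct Kafka)
  .set("spark.streaming.receiver.maxRate", "10000")           // static cap for receiver-based sources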
4.3 Dynamic Resource Allocation
When the data production rate fluctuates, the required compute resources fluctuate as well: peaks need more resources, troughs need fewer. A dynamic allocation policy can be enabled with the spark.dynamicAllocation.enabled setting, but this feature is still experimental.
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
  if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
    logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
  } else if (!testing) {
    throw new SparkException("Dynamic allocation of executors requires the external " +
      "shuffle service. You may enable this through spark.shuffle.service.enabled.")
  }
}

if (Utils.isDynamicAllocationEnabled(sc.conf) ||
    ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
  logWarning("Dynamic Allocation is enabled for this application. " +
    "Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
    "Write Ahead Log is not enabled for non-replayable sources. " +
    "See the programming guide for details on how to enable the Write Ahead Log.")
}
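A configuration sketch (values are placeholders) matching the checks above: the external shuffle service must be enabled, and a write-ahead log is advisable for non-replayable sources. Spark Streaming also has its own allocation switch, spark.streaming.dynamicAllocation.enabled, which scales executors based on batch processing times.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")                   // required by the check above
  .set("spark.dynamicAllocation.enabled", "true")                 // core dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "2")               // placeholder bounds
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // avoid the data-loss risk warned about above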
5. Fault Tolerance and Tuning Overview
Spark Streaming naturally inherits Spark's fault-tolerance mechanisms: recovery based on RDD lineage, checkpoint-based recovery, and Driver/Executor fault tolerance; the Streaming module also adds mechanisms of its own. On the tuning side, besides general Spark tuning, Streaming jobs need additional measures to keep running 24/7. A dedicated follow-up article on Spark Streaming fault tolerance and tuning is planned; feedback and corrections are welcome.
6. Comparison of Stream Processing Frameworks
The table below compares the mainstream stream processing frameworks on throughput, latency, time semantics, and other characteristics.
| Feature / Framework | Spark Streaming | Structured Streaming | Flink | Storm |
| --- | --- | --- | --- | --- |
| Latency | High | Low | Very low | Low |
| Throughput | High | High | High | Low |
| Performance | Medium | High | High | Low |
| Stream SQL | Y | Y | Y | N |
| Event time | N | Y | Y | N |
| Dynamic resources | N | N | N | Y |
| Backpressure | Medium | Medium | Good | Medium |
| Watermark | N | Y | Y | N |
| Model | Micro-batch | Native / batch | Native | Native |
| Community | Maintained | Fairly active | Active | Declining |
| Maturity | High | Medium | High | Declining |
7. Outlook for Stream Processing on Spark
Spark Streaming still has a clear throughput advantage, but it is fundamentally a micro-batch engine rather than a pure streaming engine; its inability to support event-time semantics is a fatal weakness, and it cannot meet extremely low latency requirements. Spark 1.3 introduced the DataFrame concept, treating data as a bounded table with schema information. Spark 2.0 built Structured Streaming on top of DataFrames (Structured Streaming is a scalable and fault-tolerant stream processing micro-batch engine built on the Spark SQL engine), which models the stream as an unbounded table: every newly arrived record is appended as a new row at the end of the table. Spark 2.3 added Continuous Processing (which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees). Because Structured Streaming lives in the SQL module, it benefits from Catalyst query optimization and Tungsten execution, lowers the barrier for developers, and supports core stream processing concepts such as stateful computation and event-time semantics. So what does the future hold -- unified batch and streaming? SQL everywhere?
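For comparison, here is a minimal Structured Streaming sketch of the same word count (host and port are placeholders), illustrating the unbounded-table model described above; for map-only queries, a Trigger.Continuous trigger on writeStream enables the millisecond-latency continuous mode (aggregations like this one still run as micro-batches).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// The stream is modelled as an unbounded table: every new line is appended as a new row
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val counts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

val query = counts.writeStream
  .outputMode("complete")    // emit the full updated result table on each trigger
  .format("console")
  .start()

query.awaitTermination()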
P.S. The code in this article comes from the Spark source code.