An Introduction to Spark Streaming
【Abstract】Spark Streaming is a high-throughput, fault-tolerant micro-batch processing framework built on top of Spark Core; it processes data in small batches. This article assumes some familiarity with Spark's execution model and with RDD (resilient distributed dataset) programming.
Spark Streaming is a high-throughput, fault-tolerant micro-batch processing framework built on top of Spark Core; it processes data in small batches (Spark Streaming uses a micro-batch architecture for continuous data processing and relies on the exactly-once nature of batch processing to ensure correctness). Its programming interface is provided mainly by StreamingContext and DStream. This article assumes some familiarity with Spark's execution model and with RDD (resilient distributed dataset) programming.
1. Spark Streaming Overall Architecture
Like Spark batch processing, Spark Streaming has the basic concepts of Driver, Executor, cluster manager (not covered here), and worker. The sections below go into the details inside these components.
2. StreamingContext
2.1 Initialization
StreamingContext is the entry point of Spark Streaming. First create a SparkConf and use it to build a StreamingContext:
val conf = new SparkConf()
val ssc = new StreamingContext(conf, Seconds(duration.toString.toInt))
ssc.start()
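For orientation, here is a minimal, runnable sketch of the same entry point (a hedged example: the app name, the socket source on localhost:9999, and the 5-second batch interval are illustrative assumptions, not fixed by this article):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingQuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingQuickStart")
    // The second argument is the batch interval: one RDD is produced every 5 seconds.
    val ssc = new StreamingContext(conf, Seconds(5))

    // Read lines from a socket (e.g. fed by `nc -lk 9999`) and count words per batch.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split("\\s+")).map((_, 1L)).reduceByKey(_ + _)
    counts.print()

    ssc.start()            // start the receivers and the job scheduler
    ssc.awaitTermination() // block until the context is stopped
  }
}

Everything that follows looks at what happens inside start() and at each batch interval.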
Initializing the StreamingContext:
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {

  private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
    new SparkContext(conf)
  }

  private[streaming] val graph: DStreamGraph
  private[streaming] val scheduler = new JobScheduler(this)

  // Start the execution of the streams
  def start(): Unit = synchronized {
A StreamingContext carries the core Spark attributes such as the SparkContext and Checkpoint, and adds the time dimension through Duration.
When the StreamingContext is initialized it creates a SparkContext; job execution is ultimately still handed to the SparkContext, as described in section 2.3.
The StreamingContext also initializes a DStreamGraph, which tracks the dependencies between DStreams, as described in section 2.2.
2.2 Startup
On the Driver, the StreamingContext initializes a JobScheduler, which in turn holds a JobGenerator. The JobGenerator creates a RecurringTimer that periodically generates jobs and sends them to the Executors for execution; on the Executors, Receivers receive the data and the job logic submitted by the Driver is executed.
At startup, StreamingContext.start() launches the scheduler:
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start() // start the ReceiverTracker
jobGenerator.start()    // start the JobGenerator
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
The JobGenerator drives the DStreamGraph, through the StreamingContext, to generate jobs:
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
*/
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
  /** Generate jobs and perform checkpointing for the given `time`. */
  private def generateJobs(time: Time): Unit = {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically.
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      // allocate the blocks received so far to this batch (see section 2.3)
      jobScheduler.receiverTracker.allocateBlocksToBatch(time)
      // generate the jobs for this interval from the allocated blocks (via DStreamGraph)
      graph.generateJobs(time)
    }
In short, the JobGenerator periodically generates jobs, truncates RDD lineage through checkpointing, and allocates the blocks received by the Receivers to batches.
2.3 DStream Generation and Job Submission
The Driver creates Receiver objects and ships them to the Executors. Each Receiver receives the streaming data, collects it into blocks, and reports the block metadata back to the Driver.
/**
 * Here is the approach to schedule executors:
 * First, schedule all the receivers with preferred locations (hosts), evenly among the
 * executors running on those hosts.
 * Then, schedule all other receivers evenly among all the executors such that overall
 * distribution over all the receivers is even.
 * This method is called when we start to launch receivers at the first time.
 * @return a map for receivers and their scheduled locations
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
This method decides which Executors each Receiver runs on; the Receivers are then launched on those Executors and start receiving data.
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 * (The supervisor watches over the receiver and is also responsible for writing the received data.)
 */
private[streaming] abstract class ReceiverSupervisor() {
def start(): Unit = {
onStart()
// triggered as part of receiverTracker.start() in section 2.2
startReceiver()
}
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askSync[Boolean](msg)
}
override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators.add(newBlockGenerator)
newBlockGenerator
}
When a Receiver starts, it registers with the ReceiverTracker on the Driver, reporting its metadata and maintaining a heartbeat; at the same time it creates a BlockGenerator.
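Before looking at the BlockGenerator itself, a hedged sketch of a user-defined receiver helps show where this machinery is entered from user code (it follows the standard custom-receiver pattern; the class name and socket source are illustrative). Every call to store() goes through the ReceiverSupervisor and, for single records, ends up in a BlockGenerator buffer:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative custom receiver: reads text lines from a socket and hands them to Spark.
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns immediately.
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // The receiving thread checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // routed through the ReceiverSupervisor / BlockGenerator
        line = reader.readLine()
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

The BlockGenerator that buffers those records is shown next.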
/**
* Generates batches of objects received by a
* [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
* named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of data as a block,
* the other to push the blocks into the block manager.
*
* Note: Do not create BlockGenerator instances directly inside receivers. Use
* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
*/
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf,
clock: Clock = new SystemClock()
) extends RateLimiter(conf) with Logging {
  /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush() // rate limiting, see section 4.1
      synchronized {
        if (state == Active) {
          // buffer the received record
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    }
  }
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          // wrap the buffered records into a Block
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }
The block is then stored, and its metadata is sent to the driver:
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ): Unit = {
    // store the block via the configured ReceivedBlockHandler
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    // report the block metadata to the Driver's ReceiverTracker
    if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) {
      throw new SparkException("Failed to add block to receiver tracker.")
    }
The BlockGenerator thus receives the data, assembles it into blocks, and calls back into the ReceiverSupervisor to store each block and report its metadata to the driver.
Now back to the Driver side.
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  /** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      // hand all unallocated blocks to this batch
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }
    // Create the RDD using the scheduledLocations to run the receiver in a Spark job
    val receiverRDD: RDD[Receiver[_]] =
      if (scheduledLocations.isEmpty) {
        ssc.sc.makeRDD(Seq(receiver), 1)
      } else {
        val preferredLocations = scheduledLocations.map(_.toString).distinct
        // wrap the receiver, together with its preferred locations, into an RDD
        ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
      }
    // submit the receiver-launching job through the SparkContext
    val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
      receiverRDD, startReceiverFunc, Seq(0), (_, _) => (), ())
  }
Back in the SparkContext, the job is submitted through the DAGScheduler:
class SparkContext(config: SparkConf) extends Logging {
/**
* Submit a job for execution and return a FutureJob holding the result.
* @param rdd target RDD to run tasks on
* @param resultHandler callback to pass each result to
* @param resultFunc function to be executed when the result is ready
*/
  def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] = {
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    // hand the job over to the DAGScheduler
    val waiter = dagScheduler.submitJob(
      rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
      partitions, callSite, resultHandler, localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
  }
Back on the Driver: the ReceiverTracker uses the SparkContext to launch the receivers, each DStream produces an RDD per batch interval, and the resulting jobs are submitted through the SparkContext to the DAGScheduler. In the end everything is converted into RDDs and a DAG, which brings us back to the familiar Spark job flow.
3. DStream Processing
A DStream is essentially a wrapper around RDDs: each RDD in a DStream holds the data from one batch interval (Spark represents a stream as a continuous series of RDDs, and each RDD is processed as a batch).
abstract class DStream[T: ClassTag] (
@transient private[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
/** Time interval after which the DStream generates an RDD */
def slideDuration: Duration
/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
/** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
/**
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* to generate their own jobs.
*/
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
  /**
   * Return a new DStream in which each RDD contains all the elements seen in a
   * sliding window of time over this DStream. The new DStream generates RDDs with
   * the same interval as this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batch interval.
   */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
From the DStream source we can see its key characteristics:
1. A downstream DStream depends on the DStreams it is derived from.
2. A DStream generates one RDD per batch interval (the "mini" RDDs).
The relationship between DStreams and RDDs is that a DStream is a sequence of RDDs produced over time, and an operation on a DStream is applied, interval by interval, to the RDD of that interval. DStreams simply add a time dimension on top of RDDs: Spark Streaming processes data in small batches, and the processing logic for a batch runs only once that batch of data is ready.
Compared with the native RDD operators, DStreams add a few operators such as window, updateStateByKey, mapWithState, and foreachRDD, which are introduced below; the remaining operators behave like their basic RDD counterparts and are not covered again here. Like native RDDs, DStream transformations are lazy and are only triggered when an output (action) operation is registered, as the sketch below illustrates.
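As a small illustration of both points, transform() exposes the RDD of each batch interval directly, and nothing executes until an output operation registers the stream (a hedged sketch; the function and value names are illustrative):

import org.apache.spark.streaming.dstream.DStream

// Assumes the caller provides an input DStream[String] (e.g. from socketTextStream).
def toUpperWords(lines: DStream[String]): DStream[String] =
  // transform() applies ordinary RDD operators to the RDD of every batch interval.
  lines.transform(rdd => rdd.flatMap(_.split("\\s+")).map(_.toUpperCase))

// Lazy evaluation: jobs are generated each interval only once an output operation
// such as print(), foreachRDD or saveAs* registers the DStream, e.g.:
//   toUpperWords(lines).print()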
3.1 window
Windowing is a central concept in stream processing: a window operation processes the data from a fixed-length period of time in the past (windowing is the process of slicing up a data source along temporal boundaries). Calling window on a DStream takes two parameters, a window duration and an optional slide interval, and produces a WindowedDStream. With only the duration configured you get a tumbling window; adding the slide parameter gives a sliding window.
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {

  override def compute(validTime: Time): Option[RDD[T]] = {
    // the interval covered by this window: from (end time - window duration) to the end time
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
    val rddsInWindow = parent.slice(currentWindow)
    Some(ssc.sc.union(rddsInWindow))
  }
}
After the computation, the RDDs falling inside the window are returned as a single (union) RDD; you can then apply functions to that RDD to build new DStreams, for example as in the sketch below.
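A hedged usage sketch: counting words over the last 30 seconds, recomputed every 10 seconds (both durations must be integer multiples of the batch interval; the names are illustrative):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// A 30-second window sliding every 10 seconds; setting the slide equal to the
// window duration would make it a tumbling window instead.
def windowedWordCounts(words: DStream[String]): DStream[(String, Long)] =
  words.map((_, 1L))
       .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(30), Seconds(10))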
3.2 updateStateByKey
Use this operator when the current result must be computed from the new data together with the results of previous batches; it provides the ability to record and update state across batches.
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* In every batch the updateFunc will be called for each state even if there are no new values.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
With updateStateByKey you define the state and a state-update function. In updateFunc, Seq[V] contains the values that arrived for a key in the current batch and Option[S] is the state computed in previous batches; the operator returns a DStream of (key, state) pairs. Note that updateStateByKey scans all existing state and groups by key on every batch, so when the amount of state is large, consider mapWithState instead. A usage sketch follows.
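A hedged sketch of a running word count with updateStateByKey (stateful operators also require a checkpoint directory; the names and the checkpoint path mentioned in the comment are illustrative):

import org.apache.spark.streaming.dstream.DStream

object RunningCount {
  // newValues: the values for a key in the current batch;
  // state: the count carried over from all previous batches.
  val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
    (newValues, state) => Some(newValues.sum + state.getOrElse(0L))

  // Note: call ssc.checkpoint("<some-dir>") before using stateful operators.
  def runningWordCounts(words: DStream[String]): DStream[(String, Long)] =
    words.map((_, 1L)).updateStateByKey(updateFunc)
}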
3.3 foreachRDD
foreachRDD is the most commonly used output operation in Spark Streaming:
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
The foreachFunc passed in is applied to every RDD of the DStream. It usually writes the RDD's data to an external system, and it can also open connections to read from external systems such as a database. Other operators such as print and the saveAs* family also produce output. A sketch of the recommended pattern follows.
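A hedged sketch of the usual output pattern: foreachRDD runs on the driver once per batch, so per-record work and any connections should be created inside foreachPartition on the executors (println stands in for a real external sink here):

import org.apache.spark.streaming.dstream.DStream

def writeToSink(counts: DStream[(String, Long)]): Unit =
  counts.foreachRDD { rdd =>
    // This closure runs on the driver for every batch interval...
    rdd.foreachPartition { partition =>
      // ...while this one runs on the executors: open DB/HTTP connections here,
      // not on the driver, so they are not captured in the serialized closure.
      partition.foreach { case (word, count) => println(s"$word -> $count") }
    }
  }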
4. Rate Control
4.1 Dynamic Control of the Consumption Rate
This relies on Spark's backpressure mechanism, controlled by spark.streaming.backpressure.enabled (default false; set it to true to enable). Backpressure uses a token-bucket style rate limiter, and a RateController adjusts the receiving rate:
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
/**
* A component that estimates the rate at which an `InputDStream` should ingest
* records, based on updates at every batch completion.
 * (It estimates how fast the InputDStream can be consumed, and the receiving rate is adjusted to match that capacity.)
* Please see `org.apache.spark.streaming.scheduler.RateController` for more details.
*/
private[streaming] trait RateEstimator extends Serializable {
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get(BACKPRESSURE_RATE_ESTIMATOR) match {
case "pid" =>
val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL)
val integral = conf.get(BACKPRESSURE_PID_INTEGRAL)
val derived = conf.get(BACKPRESSURE_PID_DERIVED)
val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE)
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
}
/**
* Provides waitToPush() method to limit the rate at which receivers consume data.
 * updateRate() updates the current rate limit.
*/
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
def waitToPush(): Unit = {
rateLimiter.acquire()
}
/**
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
*/
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
Spark estimates how fast the job can process data and adjusts the consumption rate accordingly, so that the amount of data ingested matches the processing capacity. If resources are insufficient, however, messages will pile up and consumption will lag: backpressure does not fix a genuine lack of processing capacity. A configuration sketch follows.
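A hedged configuration sketch for enabling backpressure (the optional keys and values shown are illustrative; the rate estimated by the PID controller will never exceed the configured maximums):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BackpressureDemo")
  // Let the RateController adapt the ingestion rate to the observed processing rate.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional caps (records/sec) applied on top of the estimated rate.
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.streaming.receiver.maxRate", "5000")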
4.2 Limiting at the Message Queue Side
This is static rate limiting. For example, when consuming from Kafka you can cap the maximum consumption rate to prevent the job from pulling too much data when it first starts.
With spark.streaming.kafka.maxRatePerPartition set, the maximum amount of data pulled in one batch is maxRatePerPartition * (number of Kafka partitions) * (batch duration).
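For example, under the illustrative settings below, the cap on a single batch works out as follows:

// spark.streaming.kafka.maxRatePerPartition = 1000 (records per second per partition)
val maxRatePerPartition = 1000L
val kafkaPartitions     = 10L // partitions of the topic being consumed
val batchSeconds        = 5L  // streaming batch interval

// Maximum records fetched in a single batch:
val maxRecordsPerBatch = maxRatePerPartition * kafkaPartitions * batchSeconds // = 50,000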
4.3 Dynamic Resource Allocation
When the data production rate fluctuates, the required compute resources fluctuate too: peaks need more resources, troughs need fewer. A dynamic allocation policy can be enabled with the spark.dynamicAllocation.enabled configuration key (Spark Streaming also has its own allocation manager, checked in the code below), but this feature is still experimental:
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}
if (Utils.isDynamicAllocationEnabled(sc.conf) ||
ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
logWarning("Dynamic Allocation is enabled for this application. " +
"Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " +
"Write Ahead Log is not enabled for non-replayable sources. " +
"See the programming guide for details on how to enable the Write Ahead Log.")
}
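A hedged configuration sketch (the streaming-specific keys below belong to the streaming ExecutorAllocationManager referenced in the check above; the numbers are illustrative, and core dynamic allocation additionally requires the external shuffle service):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingDynamicAllocation")
  // Streaming-specific dynamic allocation (experimental): scales executors based on
  // the ratio of batch processing time to batch interval.
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
  // External shuffle service, required by core dynamic allocation (see the check above).
  .set("spark.shuffle.service.enabled", "true")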
5. Fault Tolerance and Tuning: A Brief Overview
Spark Streaming naturally inherits Spark's fault-tolerance mechanisms: recovery based on RDD lineage, recovery from checkpoints, and Driver/Executor fault tolerance; the Streaming module also adds mechanisms of its own (such as the write-ahead log mentioned above). On the tuning side, besides general Spark tuning, Streaming has its own concerns, such as keeping a job running 24/7. A dedicated article on Spark Streaming fault tolerance and tuning will follow; comments and corrections are welcome.
6. Feature Comparison of Streaming Frameworks
The table below compares the major stream-processing frameworks on throughput, latency, time semantics, and other characteristics.
| Feature | Spark Streaming | Structured Streaming | Flink | Storm |
| --- | --- | --- | --- | --- |
| Latency | High | Low | Very low | Low |
| Throughput | High | High | High | Low |
| Performance | Medium | High | High | Low |
| Stream SQL | Y | Y | Y | N |
| Event time | N | Y | Y | N |
| Dynamic resources | N | N | N | Y |
| Backpressure | Medium | Medium | Good | Medium |
| Watermark | N | Y | Y | N |
| Model | Micro-batch | Native, batch | Native | Native |
| Community | Maintenance mode | Relatively active | Active | Declining |
| Maturity | High | Medium | High | Declining |
7. The Outlook for Spark Streaming
Spark Streaming still has a clear advantage in throughput, but at its core it is a micro-batch engine rather than a pure streaming engine; its lack of event-time semantics is a fatal weakness, and it cannot meet very low latency requirements. Spark 1.3 introduced the DataFrame, which treats data as a bounded table with schema information. Building on DataFrames, Spark 2.0 introduced Structured Streaming (a scalable and fault-tolerant stream processing micro-batch engine built on the Spark SQL engine), which models a stream as an unbounded table to which every newly arrived record is appended as a new row. Spark 2.3 added Continuous Processing (which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees). Living in the SQL module, Structured Streaming benefits from Catalyst query optimization and Tungsten, lowering the barrier for developers, and it supports core stream-processing concepts such as stateful computation and event-time semantics. So what does the future hold: unified batch and stream processing? SQL everywhere?
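For contrast, here is a minimal Structured Streaming sketch of a word count over an unbounded table (the socket source, console sink, and output mode are illustrative assumptions):

import org.apache.spark.sql.SparkSession

object StructuredWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
    import spark.implicits._

    // The stream is modeled as an unbounded table: each arriving line is a new row.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val counts = lines.as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value")
      .count()

    val query = counts.writeStream
      .outputMode("complete") // keep the full running aggregate
      .format("console")
      .start()
    query.awaitTermination()
  }
}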
P.S. The code excerpts in this article are taken from the Spark source code.