SparkStreaming编程模型

举报
Smy1121 发表于 2019/06/22 17:00:48 2019/06/22
【摘要】 SparkStreaming编程模型

SparkStreaming编程模型


概要

01 依赖管理

02 基本套路

03 输入源

04 转换操作

05 输出操作

06 持久化操作


依赖管理


核心依赖

<dependency> 

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.10</artifactId>

<version>1.6.2</version> 

</dependency>


Source相关依赖

部分source相关依赖现在已经单独打包,需要单独引入

image.png


基本套路


//1、参数处理

if (args.length < 2) {

 System.err.println("Usage: NetworkWordCount  ")

 System.exit(1)

}


//2、初始化StreamingContext

val sparkConf = new SparkConf().setAppName("NetworkWordCount")

val ssc = new StreamingContext(sparkConf, Seconds(1))


//3、从source获取数据创建DStream

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)


//4、对DStream进行各种操作

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)


//5、处理计算结果

wordCounts.print()


//6、启动Spark Streaming

ssc.start()

ssc.awaitTermination()



输入源

Dstream输入源---input DStream

Spark内置了两类Source:

image.png


val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-

topic number of Kafka partitions to consume])



Dstream输入源--- Receiver

input Dstream都会关联一个Receiver(除了FileInputDStream)

Receiver以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD。

Receiver收集到输入数据后会把数据复制到另一个执行器进程来保障容错性(默认行为)

Receiver会消耗额外的cpu资源,所以要注意分配更多的cpu cores


local模式下不要“local” or “local[1]”

分布式运行时,分配的cores> receivers的数量

StreamingContext 会周期性地运行Spark 作业来处理这些数据,把数据与之前时间区间中的RDD 进行整合


image.png


内置的input Dstream:Basic Sources


内置input Dstream

https://github.com/apache/spark/tree/v1.6.2/external


套接字

文件流

val logData = ssc.textFileStream(logDirectory)


Spark 支持从任意Hadoop 兼容的文件系统中读取数据, Spark Streaming 也就支持从任意Hadoop 兼容的文件系统目录中的文件创建数据流(InputFormat参数化)

ssc.fileStream[LongWritable, IntWritable,

SequenceFileInputFormat[LongWritable, IntWritable]](inputDirectory).map {

case (x, y) => (x.get(), y.get())

}


文件必须原子化创建(比如把文件移入Spark 监控的目录)


Akka actor流

内置的input Dstream:Advanced Sources


Apache:


def main(args: Array[String]) {

if (args.length < 4) {

System.err.println("Usage: KafkaWordCount    ")

System.exit(1)

}

StreamingExamples.setStreamingLogLevels()


val Array(zkQuorum, group, topics, numThreads) = args

val sparkConf = new SparkConf().setAppName("KafkaWordCount")

val ssc = new StreamingContext(sparkConf, Seconds(2))

ssc.checkpoint("checkpoint")


val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)

wordCounts.print()


ssc.start()

ssc.awaitTermination()

}



Apache Flume

Dstream输入源:multiple input DStream

multiple input streams(same type and same slide duration)

ssc.union(Seq(stream1,stream2,…))

stream.union(otherStream)



Dstream输入源:Custom Receiver

Custom input Dstream

org.apache.spark.streaming.receiver.Receiver

http://spark.apache.org/docs/1.6.2/streaming-custom-receivers.html


   

转换操作

无状态转换操作

和Spark core的语义⼀一致


无状态转化操作就是把简单的RDD 转化操作应用到每个批次上,也就是转化DStream中的每一个RDD(对Dstream的操作会映射到每个批次的RDD上)

无状态转换操作不会跨多个batch的RDD去执行

image.png


有状态转换操作1-updateStateByKey


有时我们需要在DStream 中跨所有批次维护状态(例如跟踪用户访问网站的会话)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的Dstream


使用updateStateByKey需要完成两步工作:

1. 定义状态:可以是任意数据类型

2. 定义状态更新函数-updateFunc


update(events, oldState)

events:是在当前批次中收到的事件的列表(可能为空)。

oldState:是一个可选的状态对象,存放在Option 内;如果一个键没有之前的状态,

这个值可以空缺。

newState:由函数返回,也以Option 形式存在;我们可以返回一个空的Option 来表示想要删除该状态。


注意:有状态转化操作需要在你的StreamingContext 中打开检查点机制来确保容错性ssc.checkpoint("hdfs://...")



有状态转换操作2-window


基于窗口的操作会在一个比StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果

所有基于窗口的操作都需要两个参数,分别为windowDuration以及slideDuration,两者都必须是StreamContext 的批次间隔的整数倍


val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))

val windowCounts = accessLogsWindow.count()

batchDuration

val ssc = new StreamingContext(sparkConf, Seconds(10))

windowDuration


长控制每次计算最近的多少个批次的数据(windowDuration/batchDuration)

slideDuration

默认值与batchDuration相等(默认滑动一个batch)

控制多长时间计算一次


代码片段:

val ssc = new StreamingContext(sparkConf, Seconds(10))

val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(20))

val windowCounts = accessLogsWindow.count()


窗口时长为3 个批次,滑动步长为2 个批次;每隔2 个批次就对前3 个批次的数据进行一次计算


image.png



有状态转换操作2-window操作—普通规约与增量规约


增量规约只考虑新进入窗口的数据和离开窗口的数据,让Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+ 对应的逆函数为-

image.png


有状态转换操作2-window操作—理解增量规约:

image.png


输出操作

DStream输出


常见输出操作:

print

每个批次中抓取DStream 的前十个元素打印出来。

foreachRDD


saveAsObjectFiles

saveAsTextFiles

saveAsHadoopFiles


惰性求值


持久化操作

允许用户调用persist 来持久化(将Dstream中的RDD持久化)

默认的持久化: MEMORY_ONLY_SER

对于来自网络的数据源(Kafka, Flume, sockets 等) :MEMORY_AND_DISK_SER_2

image.png


对于window 和stateful 操作默认持久化;

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。