SparkStreaming编程模型
SparkStreaming编程模型
概要
01 依赖管理
02 基本套路
03 输入源
04 转换操作
05 输出操作
06 持久化操作
依赖管理
核心依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
Source相关依赖
部分source相关依赖现在已经单独打包,需要单独引入
基本套路
//1、参数处理
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}
//2、初始化StreamingContext
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
//3、从source获取数据创建DStream
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
//4、对DStream进行各种操作
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
//5、处理计算结果
wordCounts.print()
//6、启动Spark Streaming
ssc.start()
ssc.awaitTermination()
输入源
Dstream输入源---input DStream
Spark内置了两类Source:
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-
topic number of Kafka partitions to consume])
Dstream输入源--- Receiver
input Dstream都会关联一个Receiver(除了FileInputDStream)
Receiver以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD。
Receiver收集到输入数据后会把数据复制到另一个执行器进程来保障容错性(默认行为)
Receiver会消耗额外的cpu资源,所以要注意分配更多的cpu cores
local模式下不要“local” or “local[1]”
分布式运行时,分配的cores> receivers的数量
StreamingContext 会周期性地运行Spark 作业来处理这些数据,把数据与之前时间区间中的RDD 进行整合
内置的input Dstream:Basic Sources
内置input Dstream
https://github.com/apache/spark/tree/v1.6.2/external
套接字
文件流
val logData = ssc.textFileStream(logDirectory)
Spark 支持从任意Hadoop 兼容的文件系统中读取数据, Spark Streaming 也就支持从任意Hadoop 兼容的文件系统目录中的文件创建数据流(InputFormat参数化)
ssc.fileStream[LongWritable, IntWritable,
SequenceFileInputFormat[LongWritable, IntWritable]](inputDirectory).map {
case (x, y) => (x.get(), y.get())
}
文件必须原子化创建(比如把文件移入Spark 监控的目录)
Akka actor流
内置的input Dstream:Advanced Sources
Apache:
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount ")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
Apache Flume
Dstream输入源:multiple input DStream
multiple input streams(same type and same slide duration)
ssc.union(Seq(stream1,stream2,…))
stream.union(otherStream)
Dstream输入源:Custom Receiver
Custom input Dstream
org.apache.spark.streaming.receiver.Receiver
http://spark.apache.org/docs/1.6.2/streaming-custom-receivers.html
转换操作
无状态转换操作
和Spark core的语义⼀一致
无状态转化操作就是把简单的RDD 转化操作应用到每个批次上,也就是转化DStream中的每一个RDD(对Dstream的操作会映射到每个批次的RDD上)
无状态转换操作不会跨多个batch的RDD去执行
有状态转换操作1-updateStateByKey
有时我们需要在DStream 中跨所有批次维护状态(例如跟踪用户访问网站的会话)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的Dstream
使用updateStateByKey需要完成两步工作:
1. 定义状态:可以是任意数据类型
2. 定义状态更新函数-updateFunc
update(events, oldState)
events:是在当前批次中收到的事件的列表(可能为空)。
oldState:是一个可选的状态对象,存放在Option 内;如果一个键没有之前的状态,
这个值可以空缺。
newState:由函数返回,也以Option 形式存在;我们可以返回一个空的Option 来表示想要删除该状态。
注意:有状态转化操作需要在你的StreamingContext 中打开检查点机制来确保容错性ssc.checkpoint("hdfs://...")
有状态转换操作2-window
基于窗口的操作会在一个比StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果
所有基于窗口的操作都需要两个参数,分别为windowDuration以及slideDuration,两者都必须是StreamContext 的批次间隔的整数倍
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
batchDuration
val ssc = new StreamingContext(sparkConf, Seconds(10))
windowDuration
长控制每次计算最近的多少个批次的数据(windowDuration/batchDuration)
slideDuration
默认值与batchDuration相等(默认滑动一个batch)
控制多长时间计算一次
代码片段:
val ssc = new StreamingContext(sparkConf, Seconds(10))
…
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(20))
val windowCounts = accessLogsWindow.count()
窗口时长为3 个批次,滑动步长为2 个批次;每隔2 个批次就对前3 个批次的数据进行一次计算
有状态转换操作2-window操作—普通规约与增量规约
增量规约只考虑新进入窗口的数据和离开窗口的数据,让Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+ 对应的逆函数为-
有状态转换操作2-window操作—理解增量规约:
输出操作
DStream输出
常见输出操作:
每个批次中抓取DStream 的前十个元素打印出来。
foreachRDD
saveAsObjectFiles
saveAsTextFiles
saveAsHadoopFiles
惰性求值
持久化操作
允许用户调用persist 来持久化(将Dstream中的RDD持久化)
默认的持久化: MEMORY_ONLY_SER
对于来自网络的数据源(Kafka, Flume, sockets 等) :MEMORY_AND_DISK_SER_2
对于window 和stateful 操作默认持久化;
- 点赞
- 收藏
- 关注作者
评论(0)