流处理器——Spark Streaming
theme: condensed-night-purple
流处理器
携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情
Spark Streaming
Spark 是一种快速、通用、可扩展的大数据分析引擎,已经发展成为一个包含多个子项目的集合。 Spark Streaming 是 Spark 的流处理部分。
Spark 的流处理是基于所谓微批处理的思想,把流处理看作是批处理的一种特殊形式,每次接收到一个时间间隔的数据才会去处理,所以天生很难在实时性上有所提升。
虽然在 Spark2.3 中提出了连续处理模型( Continuous Processing Model),但是现在只支持很有限的功能,并不能在大的项目中使用。 Spark还需要做出很大的努力才能改进现有的流处理模型想要在流处理的实时性上提升,就不能継续用微批处理的模式,而要想办法实现真正的流处理即每当有一条数据输入就立刻处理,不做等待。
数据类型
在内部,每个数据块就是一个 RDD,所以 spark streaming 有 RDD 所有优点,处理速度快,容错性好,支持高度并行计算。
操作流程
第一,我们将Spark Streaming类名和StreamingContext的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)中。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程的本地StreamingContext,批处理间隔为1秒。
public static void main(String[] args) throws InterruptedException {
// 工作环境
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); // 定义双线程 / APP 名称
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); // 定义批处理时间间隔 1s
// 流创建(从源导入)
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// 流处理(数据分离、统计并打印)
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
// 启动流运算
jssc.start();
jssc.awaitTermination();
}Copy to clipboardErrorCopied
DStream 对象
Spark Streaming 提供一个对于流数据的抽象 DStream。DStream 可以由来自 Apache Kafka、Flume 或者 HDFS 中的流数据生成,也可以由别的 DStream 经过各种转换操作得来。
底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD。然后,Spark 核心引擎将对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作,将 RDD 经过操作变成中间结果保存在内存中。
由于 Spark Streaming 将底层的细节封装起来了,所以对于开发者来说,只需要操作 DStream 就行。接下来,让我们一起学习 DStream 的结构以及它支持的转换操作。
StreamingContext 对象
任何 Spark Streaming 的程序都要首先创建一个 StreamingContext 的对象,它是所有 Streaming 操作的入口。StreamingContext 中最重要的参数是批处理的时间间隔,即把流数据细分成数据块的粒度。
用 streamingContext.start()
来开始接收数据并处理它 用 streamingContext.awaitTermination()
等待处理停止(手动停止或由于任何错误) 用 streamingContext.stop()
可以手动停止
一旦启动上下文,就无法设置新的流计算或将其添加到该流计算中 上下文一旦停止,就无法重新启动 一个JVM中只能同时激活一个StreamingContext StreamingContext中的stop()也会停止SparkContext。但如果要仅停止StreamingContext的话,设置stop(false) 只要在创建下一个StreamingContext之前停止了上一个StreamingContext(不停止SparkContext),就可以将SparkContext重用于创建多个StreamingContext
- 点赞
- 收藏
- 关注作者
评论(0)