SparkStreaming

举报
bigdata张凯翔 发表于 2021/03/26 01:57:46 2021/03/26
【摘要】 Spark Streming的特性 易用、容错、易整合 Spark Streaming 对比 Storm & SparkStreaming Storm 开发语言 Scala Clojure 编程模型3 DStream Spout/Bolt 实时性 准实时,批处理 实时流处理 Spark Streming编程实战 开发流程: 1、构建sparkCon...

Spark Streming的特性

易用、容错、易整合

Spark Streaming 对比 Storm

& SparkStreaming Storm
开发语言 Scala Clojure
编程模型3 DStream Spout/Bolt
实时性 准实时,批处理 实时流处理

Spark Streming编程实战

开发流程:

  • 1、构建sparkContext对象
  • 2、构建StreamingContext对象
  • 3、创建输入流InputDStream
  • 4、对DStream执行算子操作
  • 5、将执行结果保存或者输出

SparkStreaming整合flume

Spark Streaming对接FlumeNG有两种方式:
(1)、FlumeNG主动将消息Push推给Spark Streaming
Spark程序需要启动一个端口接受数据,所以flume的配置文件中需要配置spark程序所运行的ip和端口
(2)、Spark Streaming主动从flume 中Poll拉取数据。
Flume需要启动一个端口来输出数据,所以flume配置文件中配置的是flume机器的主机名和端口,而在spark程序中需要绑定flume的ip和输出端口,这样spark程序才能主动拉取到数据
两种方式的启动顺序对比:
在push模式中,先启动spark application,进入等待状态,等待flume push数据,此时启动flume进行数据的传递.
pull模式中,spark application会从配置的端口pull数据,此时若flume还未启动,spark application会提示端口连接失败.所以需要先启动flume后启动spark application

SparkStreaming整合Kafka

SparkStreaming整合Kafka有两种方式:
(1)、Receiver方式(KafkaUtils.createStream(调用kafka高级api)
这种方法使用Receiver来接收数据。Receiver是使用Kafka高级消费者API实现的。与所有的接收者一样,通过Receiver从Kafka接收的数据存储在Spark执行程序exector中,然后由Spark Streaming启动的作业处理数据。但是,在默认配置下,这种方法可能会在失败时丢失数据。为了确保零数据丢失,您必须在Spark Streaming(在Spark 1.2中引入)中额外启用wal预写日志(需要设置检查点checkpoint<包含消息和offset>保证了消息不丢失),同时保存所有接收到的Kafka数据写入hdfs的预写日志,以便所有数据都可以在失败时恢复。

每次消费完一条数据都需要在zk上更新offset的值,如果在更新offset过程中zk挂掉了,消费的消息偏移量并没有更新成功。下次程序在消费的时候就会出现重复消费的情况。
(2)、Direct方式(KafkaUtils.createDirectStream(调用kafka低级api)
定期地从kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,Spark通过调用kafka简单的消费者Api(低级api)读取一定范围的数据。
Direct对比Receiver方式的优点:
1、简化并行
一个rdd分区对比topic上一个分区
2、高效
第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是接受kafka中topic的数据,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题
3、恰好一次
offset仅被保存在checkpoint目录里,消除了zk与ssc偏移量不一致问题,解决重复消费问题,缺点是无法使用基于zk的kafka监控工具

Checkpoint【后续会有更详细的介绍】

Spark Streaming的检查点具有容错机制,有足够的信息能够支持故障恢复。支持两种数据类型的检查点:元数据检查点和数据检查点
(1)元数据检查点,在类似HDFS的容错存储上,保存Streaming计算信息。这种检查点用来恢复运行Streaming应用程序失败的Driver进程。
(2)数据检查点,在进行跨越多个批次合并数据的有状态操作时尤其重要。通过周期检查将转换RDD的中间状态进行可靠存储,借以切断无限增加的依赖。使用有状态的转换,如果updateStateByKey或者reduceByKeyAndWindow在应用程序中使用,那么需要提供检查点路径,对RDD进行周期性检查。

当程序因为异常重启时,如果检查点路径存在,context将从检查点数据中重建。如果检查点目录不存在,将会新建context,并设置DStream。




SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么?

一、基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

需要注意的要点

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
优点如下:
简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
一次且仅一次的事务机制。
三、对比:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
在实际生产环境中大都用Direct方式

简述SparkStreaming窗口函数的原理(重点)

窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。
图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/918dbeb36884

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。