流数据处理框架1
Spark Streaming
Spark 是一种快速、通用、可扩展的大数据分析引擎,已经发展成为一个包含多个子项目的集合。 Spark Streaming 是 Spark 的流处理部分。
Spark 的流处理是基于所谓微批处理的思想,把流处理看作是批处理的一种特殊形式,每次接收到一个时间间隔的数据才会去处理,所以天生很难在实时性上有所提升。
虽然在 Spark2.3 中提出了连续处理模型( Continuous Processing Model),但是现在只支持很有限的功能,并不能在大的项目中使用。 Spark还需要做出很大的努力才能改进现有的流处理模型想要在流处理的实时性上提升,就不能継续用微批处理的模式,而要想办法实现真正的流处理即每当有一条数据输入就立刻处理,不做等待。
数据类型
在内部,每个数据块就是一个 RDD,所以 spark streaming 有 RDD 所有优点,处理速度快,容错性好,支持高度并行计算。
操作流程
第一,我们将Spark Streaming类名和StreamingContext的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)中。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程的本地StreamingContext,批处理间隔为1秒。
public static void main(String[] args) throws InterruptedException {
// 工作环境
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); // 定义双线程 / APP 名称
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); // 定义批处理时间间隔 1s
// 流创建(从源导入)
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// 流处理(数据分离、统计并打印)
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
// 启动流运算
jssc.start();
jssc.awaitTermination();
}Copy to clipboardErrorCopied
DStream 对象
Spark Streaming 提供一个对于流数据的抽象 DStream。DStream 可以由来自 Apache Kafka、Flume 或者 HDFS 中的流数据生成,也可以由别的 DStream 经过各种转换操作得来。
底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD。然后,Spark 核心引擎将对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作,将 RDD 经过操作变成中间结果保存在内存中。
由于 Spark Streaming 将底层的细节封装起来了,所以对于开发者来说,只需要操作 DStream 就行。接下来,让我们一起学习 DStream 的结构以及它支持的转换操作。
StreamingContext 对象
任何 Spark Streaming 的程序都要首先创建一个 StreamingContext 的对象,它是所有 Streaming 操作的入口。StreamingContext 中最重要的参数是批处理的时间间隔,即把流数据细分成数据块的粒度。
用 streamingContext.start()
来开始接收数据并处理它 用 streamingContext.awaitTermination()
等待处理停止(手动停止或由于任何错误) 用 streamingContext.stop()
可以手动停止
一旦启动上下文,就无法设置新的流计算或将其添加到该流计算中 上下文一旦停止,就无法重新启动 一个JVM中只能同时激活一个StreamingContext StreamingContext中的stop()也会停止SparkContext。但如果要仅停止StreamingContext的话,设置stop(false) 只要在创建下一个StreamingContext之前停止了上一个StreamingContext(不停止SparkContext),就可以将SparkContext重用于创建多个StreamingContext
Spark 操作 kafka
Spark Streaming提供了两类内置的streaming源:
Basic sources :直接在StreamingContext API中可用的源。例如,文件系统和socket连接 Advanced sources :像Kafka,Flume,Kinesis等这样的源,可通过额外的程序类获得
消费
- 先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。
0.10以后只保留了direct模式(Reveiver模式不适合生产环境),并且0.10版本API有变化(更加强大)
生产
与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。 最直接的做法我们可以想到如下这种方式:
input.foreachRDD(rdd =>
// 不能在这里创建KafkaProducer
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
)
)Copy to clipboardErrorCopied
但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?
首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
Flink
目前唯一同时支持高吞吐、低延迟、高性能的分布式流式数据处理框架。像Apache Spark也只能兼顾高吞吐和高性能特性,主要因为在Spark Streaming流式计算中无法做到低延迟保障。
优势
-
支持事件事件概念。
-
支持有状态计算,保持了事件原本产生的时序性,避免网络传输带来的影响。
-
支持高度灵活的窗口操作,Flink将窗口分为Time、Count、Session以及Data-driven等类型的窗口操作,可以灵活的处罚条件定制化来达到对复杂的流传输模式的支持。
-
基于轻量级分布式快照实现容错,将大型计算任务的流程拆解成小的计算过程,分布到并行节点上处理。并通过 Checkpoints 将执行过程中的状态信息进行持久化存储,可以自动恢复出现异常的任务。
-
基于 JVM 实现独立的内存管理。
运行环境
- JDK 版本必须在 1.8 及以上
- Maven 版本必须在 3.0.4 及以上
- Hadoop 环境支持 hadoop 2.4、2.6、2.7、2.8 等主要版本
Flink 支持使用 Java/Scala 开发,以下示例代码全部使用 Java .
基本组件
-
Flink 架构体系基本上分三层(自顶向下):
-
API & Libraries 层: 提供支撑流计算和批计算的接口,,同时在此基础上抽象出不同的应用类型的组件库。
-
Runtime 核心层:Flink分布式计算框架的核心实现层,负责分布式作业的执行、映射转换、任务调度等。将 DataStream 和 DataSet 转成同意的可执行的 Task Operator 。
-
物理部署层:目前Flink支持本地、集群、云、容器部署,Flink通过盖层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。
-
-
Flink 基本架构
-
Client 客户端:负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态。
-
JobManager:负责整个Flink集群任务的调度以及资源的管理
-
TaskManager:相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请与管理。
-
编程模型
- BasicTypeInfo 数据类型:支持任意 Java 原生基本类型或 String 类型。
// 直接获取
DataSource<String> inputStream= environment.fromElements("1", "2", "3", "4", "5", "6");
// 从集合获取
ArrayList<String> list = new ArrayList<>(list2);
DataSource<String> inputStream= environment.fromCollection(list);Copy to clipboardErrorCopied
- TupleTypeInfo 数据类型:标识 Tuple 类型数据。
DataSource<Tuple2> inputStreamTuple = environment.fromElements(new Tuple2("fangpc", 1), new Tuple2("fangpengcheng", 2));Copy to clipboardErrorCopied
- PojoTypeInfo 数据类型:描述任意的 POJOs ,字段类型必须是上述基础类型,拥有默认构造方法和 getter/setter 方法
var personStream = environment.fromElements(new Person("fangpc", 24), new Person("fangpengcheng", 25));Copy to clipboardErrorCopied
-
Value 数据类型:实现了org.apache.flink.types.Value,其中包括 read() 和 write() 两个方法完成序列化和反序列化操作,有着比较高效的性能。Flink 提供的内建 Value 类型有 IntValue、DoubleValue、StringValue 等。
-
特殊数据类型:
- Scala中的List、Map、Either、Option、Try数据类型
- Java中Either
- Hadoop的Writable数据类型
操作流程
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/******** 配置流过程 *********/
addSource(); // 流创建
// 流转化
/******** 配置流过程 *********/
// 执行
env.execute("Flink Streaming Java API Skeleton");
}
}Copy to clipboardErrorCopied
DataStreamSource 对象是 DataStream 类的子类,代表着数据流的起始点。必须通过 addSource 方法生成, fromCollection/readTextFile 方法底层也会调用 addSource 方法。
DataStream 对象代表着相同类型元素的流,可以通过转换(transformation)来实现转换为另一个 DataStream 对象。DataStrem 对象内部持有当前的 StreamExecutionEnvironment 对象和 DataTransformation 对象。
StreamExecutionEnvironment 对象代表着当前流计算执行环境以及相关配置。每个 DataStream 类在做转换的时候,会首先创建转换对应的 DataTransformation 对象,最终形成一个 DataTransformation 链表被 StreamExecutionEnvironment 对象维护。
Flink 在执行时,会把流拓扑(Source、Transformation、Sink)都转换为 DataFlow:由 Stream 和 Operator 组成,让 Stream在 Operator 中流动。
一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。
在流处理中,一致性分为 3 个级别。
-
at-most-once:故障发生之后,计数结果可能丢失。
-
at-least-once:这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
-
exactly-once:这指的是系统保证在发生故障后得到的计数结果与正确值一致。
第一代流处理器(如 Storm 和 Samza)刚问世时只保证 at-least-once。最先保证 exactly-once 的系统(Storm Trident 和 Spark Streaming)在性能和表现力这两个方面
- 点赞
- 收藏
- 关注作者
评论(0)