大数据物流项目:Structured Streaming(以结构化方式处理流式数据,底层分析引擎SparkSQL)(十四点五)
## 09-[掌握]-事件时间窗口分析之原理剖析
> 在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:
```
1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中
2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;
3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。
```
![image-20210508171538191](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e97e75f85c4b49fdaf88a9b2b6ca21c6~tplv-k3u1fbpfcp-zoom-1.image)
> [实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。]()
![image-20210508172209942](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1a3befc52e194bd69c50fec20c909e90~tplv-k3u1fbpfcp-zoom-1.image)
```ini
基于事件时间窗口分析:
第一点、按照窗口大小和滑动大小对流式数据进行分组,划分为一个个组(窗口)
第二点、按照业务,对每个组(窗口)中数据进行聚合统计分析
StructuredStreaming中,窗口代码如何编写呢??
dataframe.groupBy(
window("envetTime: orderTime", "1 hour", "1 hour")// 划分窗口,分组
)
```
## 10-[掌握]-事件时间窗口分析之案例演示
> 修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:
![image-20210508180157471](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d7865973e7b340848f3dd6edeebb7f82~tplv-k3u1fbpfcp-zoom-1.image)
> 基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。
![image-20210508180206911](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/260ec7b9a7ef4be7a9fa750be23eb10c~tplv-k3u1fbpfcp-zoom-1.image)
为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数
据,测试数据集:
```
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
```
官方案例演示代码如下:
```scala
package cn.itcast.spark.window
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
* TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
*
* EventTime即事件真正生成的时间:
* 例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
* 这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
*
* 测试数据:
* 2019-10-12 09:00:02,cat dog
* 2019-10-12 09:00:03,dog dog
* 2019-10-12 09:00:07,owl cat
* 2019-10-12 09:00:11,dog
* 2019-10-12 09:00:13,owl
**/
object _06StructuredWindow {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 3. 针对获取流式DStream进行词频统计
val etlStreamDF: DataFrame = inputStreamDF
// 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
.as[String]
.filter(line => null != line && line.trim.split(",").length == 2)
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
// 使用flatMap函数以后 -> (2019-10-12 09:00:02, cat) , (2019-10-12 09:00:02, dog)
.flatMap{line =>
val arr = line.trim.split(",")
arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
}
// 设置列的名称
.toDF("insert_timestamp", "word")
val resultStreamDF = etlStreamDF
// TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
/*
1. 先按照窗口分组、2. 再对窗口中按照单词分组、 3. 最后使用聚合函数聚合
*/
.groupBy(
// 先按照窗口分组数据
window($"insert_timestamp", "10 seconds", "5 seconds"),
// 在每个窗口内,再按照单词word分组
$"word"
).count()
.orderBy($"window") // 按照窗口字段降序排序
/*
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- word: string (nullable = true)
|-- count: long (nullable = false)
*/
resultStreamDF.printSchema()
// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
query.stop()
}
}
```
## 11-[了解]-事件时间窗口分析之event-time 窗口生成
> Structured Streaming中如何依据EventTime事件时间生成窗口的呢?
```
基于事件时间窗口分析,第一个窗口时间依据第一条流式数据的事件时间EventTime计算得到的。
```
![image-20210508180425283](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/274483a30c5a4fcf92672b2eeb6d56ef~tplv-k3u1fbpfcp-zoom-1.image)
> 直接查看源码:`org.apache.spark.sql.catalyst.analysis.TimeWindowing`
![image-20210508181426553](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/afce13edc59a4a66a7e54300de6ab008~tplv-k3u1fbpfcp-zoom-1.image)
## 12-[掌握]-水位Watermark引入及延迟数据
> 基于事件时间窗口分析,数据延迟到达,先产生的数据,后到达流式应用系统。
>
> [重新运行上面的流式计算程序,当数据延迟达到以后,发现数据会被继续处理。]()
![image-20210508181813082](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/eb34e3e3aba54a318d6b88af833fb0d5~tplv-k3u1fbpfcp-zoom-1.image)
```
此时发现应用程序逻辑处理,不合理,存在如下2个问题:
- 问题一:
延迟的数据,真的有必要在处理吗????
很多应用场景,都是没有必要处理,延迟性太高,没有实时性
- 问题二:
实时窗口统计,内存中一直保存所有窗口统计数据,真的有必要吗??
不需要的,窗口分析:统计的最近数据的状态,以前的状态几乎没有任何作用
如果流式应用程序运行很久,此时内存被严重消费,性能低下
```
> [StructuredStreaming中为了解决上述问题,提供一种机制:`Watermark水位线机制`]()
![image-20210508182630077](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/003bc24ff5644f128119cb42eaeedd13~tplv-k3u1fbpfcp-zoom-1.image)
## 13-[掌握]-水位Watermark计算及案例演示
> 如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:
![image-20210508182800640](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2dbf4aeaa57c4643a98ada5b96ba3eff~tplv-k3u1fbpfcp-zoom-1.image)
看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。
```scala
package cn.itcast.spark.window
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
* TODO:每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
dog,2019-10-10 12:00:07
owl,2019-10-10 12:00:08
dog,2019-10-10 12:00:14
cat,2019-10-10 12:00:09
cat,2019-10-10 12:00:15
dog,2019-10-10 12:00:08
owl,2019-10-10 12:00:13
owl,2019-10-10 12:00:21
owl,2019-10-10 12:00:17
*/
object _07StructuredWatermarkUpdate {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
// b. 导入隐式转换及函数库
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 3. 针对获取流式DataFrame设置EventTime窗口及Watermark水位限制
val etlStreamDF: DataFrame = inputStreamDF
// 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
.as[String]
// 过滤无效数据
.filter(line => null != line && line.trim.length > 0)
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
.map{line =>
val arr = line.trim.split(",")
(arr(0), Timestamp.valueOf(arr(1)))
}
// 设置列的名称
.toDF("word", "time")
val resultStreamDF = etlStreamDF
// TODO:设置水位Watermark
.withWatermark("time", "10 seconds")
// TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
.groupBy(
window($"time", "10 seconds", "5 seconds"),
$"word"
).count()
// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start() // 流式DataFrame,需要启动
// 查询器一直等待流式应用结束
query.awaitTermination()
query.stop()
}
}
```
- 点赞
- 收藏
- 关注作者
评论(0)