大数据物流项目:Structured Streaming(以结构化方式处理流式数据,底层分析引擎SparkSQL)(十四点五)
【摘要】 ## 09-[掌握]-事件时间窗口分析之原理剖析> 在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:```1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操...
## 09-[掌握]-事件时间窗口分析之原理剖析
> 在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:
```
1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中
2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;
3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。
```

> [实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。]()

```ini
基于事件时间窗口分析:
第一点、按照窗口大小和滑动大小对流式数据进行分组,划分为一个个组(窗口)
第二点、按照业务,对每个组(窗口)中数据进行聚合统计分析
StructuredStreaming中,窗口代码如何编写呢??
dataframe.groupBy(
window("envetTime: orderTime", "1 hour", "1 hour")// 划分窗口,分组
)
```
## 10-[掌握]-事件时间窗口分析之案例演示
> 修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:

> 基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。

为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数
据,测试数据集:
```
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
```
官方案例演示代码如下:
```scala
package cn.itcast.spark.window
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
* TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
*
* EventTime即事件真正生成的时间:
* 例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
* 这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
*
* 测试数据:
* 2019-10-12 09:00:02,cat dog
* 2019-10-12 09:00:03,dog dog
* 2019-10-12 09:00:07,owl cat
* 2019-10-12 09:00:11,dog
* 2019-10-12 09:00:13,owl
**/
object _06StructuredWindow {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 3. 针对获取流式DStream进行词频统计
val etlStreamDF: DataFrame = inputStreamDF
// 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
.as[String]
.filter(line => null != line && line.trim.split(",").length == 2)
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
// 使用flatMap函数以后 -> (2019-10-12 09:00:02, cat) , (2019-10-12 09:00:02, dog)
.flatMap{line =>
val arr = line.trim.split(",")
arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
}
// 设置列的名称
.toDF("insert_timestamp", "word")
val resultStreamDF = etlStreamDF
// TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
/*
1. 先按照窗口分组、2. 再对窗口中按照单词分组、 3. 最后使用聚合函数聚合
*/
.groupBy(
// 先按照窗口分组数据
window($"insert_timestamp", "10 seconds", "5 seconds"),
// 在每个窗口内,再按照单词word分组
$"word"
).count()
.orderBy($"window") // 按照窗口字段降序排序
/*
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- word: string (nullable = true)
|-- count: long (nullable = false)
*/
resultStreamDF.printSchema()
// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
query.stop()
}
}
```
## 11-[了解]-事件时间窗口分析之event-time 窗口生成
> Structured Streaming中如何依据EventTime事件时间生成窗口的呢?
```
基于事件时间窗口分析,第一个窗口时间依据第一条流式数据的事件时间EventTime计算得到的。
```

> 直接查看源码:`org.apache.spark.sql.catalyst.analysis.TimeWindowing`

## 12-[掌握]-水位Watermark引入及延迟数据
> 基于事件时间窗口分析,数据延迟到达,先产生的数据,后到达流式应用系统。
>
> [重新运行上面的流式计算程序,当数据延迟达到以后,发现数据会被继续处理。]()

```
此时发现应用程序逻辑处理,不合理,存在如下2个问题:
- 问题一:
延迟的数据,真的有必要在处理吗????
很多应用场景,都是没有必要处理,延迟性太高,没有实时性
- 问题二:
实时窗口统计,内存中一直保存所有窗口统计数据,真的有必要吗??
不需要的,窗口分析:统计的最近数据的状态,以前的状态几乎没有任何作用
如果流式应用程序运行很久,此时内存被严重消费,性能低下
```
> [StructuredStreaming中为了解决上述问题,提供一种机制:`Watermark水位线机制`]()

## 13-[掌握]-水位Watermark计算及案例演示
> 如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:

看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。
```scala
package cn.itcast.spark.window
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
* TODO:每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
dog,2019-10-10 12:00:07
owl,2019-10-10 12:00:08
dog,2019-10-10 12:00:14
cat,2019-10-10 12:00:09
cat,2019-10-10 12:00:15
dog,2019-10-10 12:00:08
owl,2019-10-10 12:00:13
owl,2019-10-10 12:00:21
owl,2019-10-10 12:00:17
*/
object _07StructuredWatermarkUpdate {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
// b. 导入隐式转换及函数库
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 3. 针对获取流式DataFrame设置EventTime窗口及Watermark水位限制
val etlStreamDF: DataFrame = inputStreamDF
// 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
.as[String]
// 过滤无效数据
.filter(line => null != line && line.trim.length > 0)
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
.map{line =>
val arr = line.trim.split(",")
(arr(0), Timestamp.valueOf(arr(1)))
}
// 设置列的名称
.toDF("word", "time")
val resultStreamDF = etlStreamDF
// TODO:设置水位Watermark
.withWatermark("time", "10 seconds")
// TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
.groupBy(
window($"time", "10 seconds", "5 seconds"),
$"word"
).count()
// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start() // 流式DataFrame,需要启动
// 查询器一直等待流式应用结束
query.awaitTermination()
query.stop()
}
}
```
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)