大数据物流项目:Structured Streaming(以结构化方式处理流式数据,底层分析引擎SparkSQL)(十四点五)

举报
Maynor学长 发表于 2022/06/29 20:48:06 2022/06/29
【摘要】 ## 09-[掌握]-事件时间窗口分析之原理剖析> 在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:```1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操...


## 09-[掌握]-事件时间窗口分析之原理剖析

> 在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:

```
1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中
2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;
3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。
```

![image-20210508171538191](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e97e75f85c4b49fdaf88a9b2b6ca21c6~tplv-k3u1fbpfcp-zoom-1.image)


> [实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。]()

![image-20210508172209942](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1a3befc52e194bd69c50fec20c909e90~tplv-k3u1fbpfcp-zoom-1.image)

```ini
基于事件时间窗口分析:
    第一点、按照窗口大小和滑动大小对流式数据进行分组,划分为一个个组(窗口)
    第二点、按照业务,对每个组(窗口)中数据进行聚合统计分析

StructuredStreaming中,窗口代码如何编写呢??
    dataframe.groupBy(
        window("envetTime: orderTime", "1 hour", "1 hour")// 划分窗口,分组
    )
```


## 10-[掌握]-事件时间窗口分析之案例演示

> ​        修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:

![image-20210508180157471](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d7865973e7b340848f3dd6edeebb7f82~tplv-k3u1fbpfcp-zoom-1.image)

> 基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。

![image-20210508180206911](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/260ec7b9a7ef4be7a9fa750be23eb10c~tplv-k3u1fbpfcp-zoom-1.image)

​        为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数
据,测试数据集:

```
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
```

官方案例演示代码如下:

```scala
package cn.itcast.spark.window

import java.sql.Timestamp

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
 *  TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
 *
 * EventTime即事件真正生成的时间:
 * 例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
 *  这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
 *
 * 测试数据:
     * 2019-10-12 09:00:02,cat dog
     * 2019-10-12 09:00:03,dog dog
     * 2019-10-12 09:00:07,owl cat
     * 2019-10-12 09:00:11,dog
     * 2019-10-12 09:00:13,owl
 **/
object _06StructuredWindow {
    
    def main(args: Array[String]): Unit = {
        // 1. 构建SparkSession实例对象,传递sparkConf参数
        val spark: SparkSession = SparkSession.builder()
            .appName(this.getClass.getSimpleName.stripSuffix("$"))
            .master("local[2]")
            .config("spark.sql.shuffle.partitions", "2")
            .getOrCreate()
        import org.apache.spark.sql.functions._
        import spark.implicits._
        
        // 2. 使用SparkSession从TCP Socket读取流式数据
        val inputStreamDF: DataFrame = spark.readStream
            .format("socket")
            .option("host", "node1.itcast.cn")
            .option("port", 9999)
            .load()
        
        // 3. 针对获取流式DStream进行词频统计
        val etlStreamDF: DataFrame = inputStreamDF
            // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
            .as[String]
            .filter(line => null != line && line.trim.split(",").length == 2)
            // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
            // 使用flatMap函数以后 -> (2019-10-12 09:00:02, cat)  ,  (2019-10-12 09:00:02, dog)
            .flatMap{line =>
                val arr = line.trim.split(",")
                arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
            }
            // 设置列的名称
            .toDF("insert_timestamp", "word")
        
        val resultStreamDF = etlStreamDF
            // TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
            /*
                1. 先按照窗口分组、2. 再对窗口中按照单词分组、 3. 最后使用聚合函数聚合
            */
            .groupBy(
                // 先按照窗口分组数据
                window($"insert_timestamp", "10 seconds", "5 seconds"),
                // 在每个窗口内,再按照单词word分组
                $"word"
            ).count()
            .orderBy($"window") // 按照窗口字段降序排序
        
        /*
        root
            |-- window: struct (nullable = true)
            | |-- start: timestamp (nullable = true)
            | |-- end: timestamp (nullable = true)
            |-- word: string (nullable = true)
            |-- count: long (nullable = false)
        */
        resultStreamDF.printSchema()
        
        // 4. 将计算的结果输出,打印到控制台
        val query: StreamingQuery = resultStreamDF.writeStream
            .outputMode(OutputMode.Complete())
            .format("console")
            .option("numRows", "100")
            .option("truncate", "false")
            .trigger(Trigger.ProcessingTime("5 seconds"))
            .start()
        query.awaitTermination()
        query.stop()
        
    }
    
}

```


## 11-[了解]-事件时间窗口分析之event-time 窗口生成

> Structured Streaming中如何依据EventTime事件时间生成窗口的呢?

```
基于事件时间窗口分析,第一个窗口时间依据第一条流式数据的事件时间EventTime计算得到的。
```

![image-20210508180425283](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/274483a30c5a4fcf92672b2eeb6d56ef~tplv-k3u1fbpfcp-zoom-1.image)

> 直接查看源码:`org.apache.spark.sql.catalyst.analysis.TimeWindowing`

![image-20210508181426553](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/afce13edc59a4a66a7e54300de6ab008~tplv-k3u1fbpfcp-zoom-1.image)



## 12-[掌握]-水位Watermark引入及延迟数据

> 基于事件时间窗口分析,数据延迟到达,先产生的数据,后到达流式应用系统。
>
> [重新运行上面的流式计算程序,当数据延迟达到以后,发现数据会被继续处理。]()

![image-20210508181813082](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/eb34e3e3aba54a318d6b88af833fb0d5~tplv-k3u1fbpfcp-zoom-1.image)

```
此时发现应用程序逻辑处理,不合理,存在如下2个问题:
- 问题一:
    延迟的数据,真的有必要在处理吗????
        很多应用场景,都是没有必要处理,延迟性太高,没有实时性

- 问题二:
    实时窗口统计,内存中一直保存所有窗口统计数据,真的有必要吗??
        不需要的,窗口分析:统计的最近数据的状态,以前的状态几乎没有任何作用
    如果流式应用程序运行很久,此时内存被严重消费,性能低下
```

> [StructuredStreaming中为了解决上述问题,提供一种机制:`Watermark水位线机制`]()

![image-20210508182630077](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/003bc24ff5644f128119cb42eaeedd13~tplv-k3u1fbpfcp-zoom-1.image)



## 13-[掌握]-水位Watermark计算及案例演示

> 如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:

![image-20210508182800640](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2dbf4aeaa57c4643a98ada5b96ba3eff~tplv-k3u1fbpfcp-zoom-1.image)

看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。

```scala
package cn.itcast.spark.window

import java.sql.Timestamp

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
 *         TODO:每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
            dog,2019-10-10 12:00:07
            owl,2019-10-10 12:00:08
            
            dog,2019-10-10 12:00:14
            cat,2019-10-10 12:00:09
            
            cat,2019-10-10 12:00:15
            dog,2019-10-10 12:00:08
            owl,2019-10-10 12:00:13
            owl,2019-10-10 12:00:21
            
            owl,2019-10-10 12:00:17
 */
object _07StructuredWatermarkUpdate {
    
    def main(args: Array[String]): Unit = {
        
        // 1. 构建SparkSession实例对象,传递sparkConf参数
        val spark: SparkSession =  SparkSession.builder()
            .appName(this.getClass.getSimpleName.stripSuffix("$"))
            .master("local[2]")
            .config("spark.sql.shuffle.partitions", "2")
            .getOrCreate()
        // b. 导入隐式转换及函数库
        import org.apache.spark.sql.functions._
        import spark.implicits._
        
        // 2. 使用SparkSession从TCP Socket读取流式数据
        val inputStreamDF: DataFrame = spark.readStream
            .format("socket")
            .option("host", "node1.itcast.cn")
            .option("port", 9999)
            .load()
        
        // 3. 针对获取流式DataFrame设置EventTime窗口及Watermark水位限制
        val etlStreamDF: DataFrame = inputStreamDF
            // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
            .as[String]
            // 过滤无效数据
            .filter(line => null != line && line.trim.length > 0)
            // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
            .map{line =>
                val arr = line.trim.split(",")
                (arr(0), Timestamp.valueOf(arr(1)))
            }
            // 设置列的名称
            .toDF("word", "time")
            
        val resultStreamDF = etlStreamDF
            // TODO:设置水位Watermark
            .withWatermark("time", "10 seconds")
            // TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
            .groupBy(
                window($"time", "10 seconds", "5 seconds"),
                $"word"
            ).count()
        
        // 4. 将计算的结果输出,打印到控制台
        val query: StreamingQuery = resultStreamDF.writeStream
            .outputMode(OutputMode.Update())
            .format("console")
            .option("numRows", "100")
            .option("truncate", "false")
            .trigger(Trigger.ProcessingTime("5 seconds"))
            .start()  // 流式DataFrame,需要启动
        // 查询器一直等待流式应用结束
        query.awaitTermination()
        query.stop()
    }
    
}

```

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。