大数据物流项目:Structured Streaming(内置数据源、自定义Sink(2种方式)和集成Kafka)(十三)

举报
Maynor学长 发表于 2022/06/29 20:42:32 2022/06/29
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第17天,点击查看活动详情 Spark Day13:Structured Streaming 01-[了解]-上次课程内容回顾主要讲解2个方面内容:SparkStreaming中偏移量管理和StructuredStreaming快速入门1、SparkStreaming中偏移量管理 - 统计类...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第17天,点击查看活动详情

Spark Day13:Structured Streaming

image-20210506095958240

01-[了解]-上次课程内容回顾

主要讲解2个方面内容:SparkStreaming中偏移量管理和StructuredStreaming快速入门

1、SparkStreaming中偏移量管理
	- 统计类型应用,重启以后如何继续运行
		状态State
		继续消费Kafka数据(偏移量)
	- Checkpoint 检查点
		当流式应用再次重启运行时,从检查点目录构建应用程序(StreamingContext对象)
		StreamingContext.getActiveOrCreate(ckptDir, () => StreamingContext)
	- 手动管理偏移量
		可以将流式应用每次消费Kafka数据,偏移量存储外部系统中,比如MySQL数据库表、Zookeeper或HBase等
		演示:将偏移量保存到MySQL表中
        	表的设计:
        		groupId、topic、partition、offset
        编写工具类:
        	读取表中偏移量
        	保存每批次消费后偏移量

image-20210507150501579

2、StructuredStreaming
	SparkStreaming 不足
		。。。。。
	StructuredStreaming 设计思想
		。。。。。。
	Spark2.0提供新型的流式计算框架,以结构化方式处理流式数据,将流式数据封装到Dataset/DataFrame中
	思想:
		将流式数据当做一个无界表,流式数据源源不断追加到表中,当表中有数据时,立即进行增量处理分析,最终按照设置输出模式,将结果数据输出
	模型:
		第一层、无界表,输入表:input table
		第二层、增量查询,默认情况一有(1条数据或者多条数据)数据就查询
			本质上还是微批处理
		第三层、结果表:result table
			增量查询时,会将结果表以前的数据进行合并:state状态更新
		第四层、输出数据
			按照OutputMode,将结果表的数据进行输出
			- Append,默认值,追加数据
			- Update,当结果表有数据更新再输出
			- Complete,不管三七二十一,直接将结果表数据全部输出
	入门案例
		第一步、运行官方案例,从netcat消费数据,进行词频统计,打印控制台
		第二步、编写程序,实现功能
			SparkSession程序入口,加载流式数据spark.readStream,封装到流式数据集DataFrame
				分析数据,直接使用DSL编程或者SQL编程
			输出结果数据
				val query: StreamingQuery = streamDF.writeStream.xxx.start()  // 启动流式应用
				query.awaitTermination()
				query.stop()

02-[掌握]-词频统计WordCount(SQL编程)

修改词频统计WordCount代码,使用SQL分析处理,具体代码如下:

package cn.itcast.spark.start

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
	 * 第一点、程序入口SparkSession,加载流式数据:spark.readStream
	 * 第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
	 * 第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
 */
object _01StructuredWordCountSQL {
	
	def main(args: Array[String]): Unit = {
		
		// TODO: step1. 构建SparkSession实例对象,相关配置进行设置
		val spark: SparkSession = SparkSession.builder()
    		.appName(this.getClass.getSimpleName.stripSuffix("$"))
    		.master("local[2]")
    		.config("spark.sql.shuffle.partitions", "2")
    		.getOrCreate()
		import spark.implicits._
		
		// TODO: step2. 从TCP Socket加载数据,读取数据列名称为value,类型是String
		val inputStreamDF: DataFrame = spark.readStream
    		.format("socket")
    		.option("host", "node1.itcast.cn")
    		.option("port", "9999")
    		.load()
		/*
		root
			|-- value: string (nullable = true)
		 */
		//inputStreamDF.printSchema()
		
		// TODO: step3. 进行词频统计,基于SQL分析
		// 第一步、将DataFrame注册为临时视图
		inputStreamDF.createOrReplaceTempView("view_temp_lines")
		// 第二步、编写SQL语句并执行
		val resultStreamDF: DataFrame = spark.sql(
			"""
			  |WITH tmp AS (
			  |  SELECT explode(split(trim(value), '\\s+')) AS word FROM view_temp_lines
			  |)
			  |SELECT word, COUNT(1) AS count FROM tmp GROUP BY word
			  |""".stripMargin)
		/*
		root
		 |-- word: string (nullable = true)
		 |-- count: long (nullable = false)
		 */
		
		// TODO: step4. 将结果输出(ResultTable结果输出,此时需要设置输出模式)
		val query: StreamingQuery = resultStreamDF.writeStream
			.outputMode(OutputMode.Update()) // 表示当词频更新时,再输出
			.format("console")
    		.option("numRows", "10")
    		.option("truncate", "false")
			// 启动流式应用
			.start()
		
		// TODO: step5. 启动流式应用后,等待终止
		query.awaitTermination()
		query.stop()
	}
	
}

03-[了解]-今日课程内容提纲

主要3个方面内容:内置数据源、自定义Sink(2种方式)和集成Kafka

1、内置数据源【了解】
	File Source,监控某个目录下新的文件数据
	Rate Source,产生随机数据数据源

2、StreamingQuery 流式查询器基本属性设置【理解】
	名称
	触发时间间隔
	检查点
	输出模式
	如何保存流式应用End-To-End精确性一次语义

3、集成Kafka【掌握】
	结构化流从Kafka消费数据,封装为DataFrame;将流式数据集DataFrame保存到Kafka Topic
	- 数据源Source
	- 数据终端Sink

image-20210507153929371

04-[了解]-内置数据源之File Source 使用

​ 从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。

image-20210507154039375

在Structured Streaming中使用SparkSession#readStream读取流式数据,返回DataStreamReader对象,指定读取数据源相关信息,声明如下:

image-20210507154218794

查看DataStreamReader中方法可以发现与DataFrameReader中基本一致,编码上更加方便加载流式数据。

image-20210507154306506

文件数据源(File Source):将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet

image-20210507154529822

可以设置相关可选参数:

image-20210507154553532

演示范例:监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。

package cn.itcast.spark.source

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

/**
 * 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
 */
object _02StructuredFileSource {
	
	def main(args: Array[String]): Unit = {
		// 构建SparkSession实例对象,相关配置进行设置
		val spark: SparkSession = SparkSession.builder()
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.master("local[2]")
			// 设置Shuffle时分区数目
			.config("spark.sql.shuffle.partitions", "2")
			.getOrCreate()
		import spark.implicits._
		
		// TODO: 从文件数据源加载数据,本质就是监控目录
		val schema: StructType = new StructType()
			.add("name", StringType, nullable = true)
			.add("age", IntegerType, nullable = true)
			.add("hobby", StringType, nullable = true)
		val inputStreamDF: DataFrame = spark.readStream
			.schema(schema)
    		.option("sep", ";")
			.csv("file:///D:/datas/")
		
		// TODO: 监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。
		val resultStreamDF: DataFrame = inputStreamDF
			// 年龄小于25岁
    		.filter($"age" < 25)
			// 按照爱好分组,统计个数
    		.groupBy($"hobby").count()
			// 排行榜,依据个数降序排序
    		.orderBy($"count".desc)
		
		// TODO: 将结果输出(ResultTable结果输出,此时需要设置输出模式)
		val query: StreamingQuery = resultStreamDF.writeStream
			.outputMode(OutputMode.Complete()) // 当数据更新时再进行输出: mapWithState
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		// 启动流式应用后,等待终止
		query.awaitTermination()
		query.stop()
	}
	
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。