大数据物流项目:实时增量ETL存储Kudu代码开发(九)

举报
Maynor学长 发表于 2022/06/29 20:24:58 2022/06/29
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第13天,点击查看活动详情 Logistics_Day09:实时增量ETL存储Kudu 01-[复习]-上次课程内容回顾​ 物流项目数据实时ETL转换开发(存储Kudu数据库)部分功能:消费Kafka数据及ETL转换(JSON->Bean对象),项目开发环境搭建(初始化)。主要讲解如何对实...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第13天,点击查看活动详情

Logistics_Day09:实时增量ETL存储Kudu

1612344442449

01-[复习]-上次课程内容回顾

​ 物流项目数据实时ETL转换开发(存储Kudu数据库)部分功能:消费Kafka数据及ETL转换(JSON->Bean对象),项目开发环境搭建(初始化)。

1613783556397

主要讲解如何对实时消费业务数据进行ETL转换:
- 第一步、JSON字符串转换为Bean对象
	Canal采集:12个字段,封装到CanalMessageBean对象
	OGG采集:7个字段(INSERTDELETE6个,UPDATe:7个),封装到OggMessageBean对象
	技术实现:
		fastJson库,JSON.parseObject(jsonStr, classOf[MessageBea])

- 第二步、提取Bean对象字段值,封装到对应表的POJO对象
	以OGG采集物流系统业务数据中表:Areas表为例
	- 提取字段:
		data 数据字段:getValue
		数据操作类型:op_type(UID)
		表的名称:table
	- 业务实现步骤:
		step1、按照表的名称过滤出符合条件表的数据
			filter
		step2、提取字段值,封装到POJO对象
			map集合 -> JSON字符串 -> POJO对象,使用fastJson库
		step3、过滤掉null数据
			如果转换异常,返回值就是null

image-20210527083418198

02-[了解]-第9天:课程内容提纲

继续完成实时ETL存储到Kudu流式计算程序:

1613738409032

1、完善业务数据(物流系统和CRMETL转换
	对51张表数据进行转换POJO对象
	
2、将ETL后数据,保存“save”到Kudu表
	insert
				插入更新数据
	update
	
	delete
				删除数据

整个物流项目中,无论实时ETL开发程序,还是后续离线报表分析程序,基本上都使用SparkSQL DSL编程。

对数据进行实时ETL时,如何使用UDF函数进行转换。

03-[掌握]-实时ETL开发之CanalBean转换POJO

任务:将Canal采集数据,提取数据字段值,将其封装到POJO实体类对象中。

step1、获取表名称,进行判断,确定封装具体POJO实体类

step2、提取数据字段和数据操作类型,封装到POJO对象中

step3、过滤为解析封装完成对象,null值过滤掉

以CRM系统中表:crm_address为例,从CanalMessageBean中提取字段值,将其封装到AddressBean对象中

1616125077871

实现数据解析器:DataParser,将MessageBean转换为AddressBean对象
其中注意事项:Canal采集MySQL数据库表的数据中,插入数据时,可能多条数据一起操作

1616135564297

此时获取数据字段值时,需要封装到多条数据中,使用数据结构:列表List返回

1616135613491

实现方法:toAddressBean,提取数据字段,封装到POJO对象中

	// ================== 客户关系管理CRM系统业务数据解析 ==================
	// TODO: 将CRM系统业务数据:crm_address 表业务数据转换为POJO对象
	/*
		"data": [{
		"id": "10001",
		"name": "葛秋红",
		"tel": null,
		"mobile": "17*******47",
		"detail_addr": "恒大影城南侧小金庄",
		"area_id": "130903",
		"gis_addr": null,
		"cdt": "2020-02-02 18:51:39",
		"udt": "2020-02-02 18:51:39",
		"remark": null
		}]
	*/
	def toAddressBean(bean: MessageBean): List[AddressBean] = {
		// a. 转换MessageBean对象为CanalMessageBean对象
		val canalMessageBean: CanalMessageBean = getCanalMessageBean(bean)
		// b. 获取数据操作类型的值
		val opType: String = getOpType(canalMessageBean.getType)
		// c. 获取数据值,类型为List列表
		val datasValue: util.List[util.Map[String, AnyRef]] = canalMessageBean.getData
		// d. 封装数据值和操作类型到POJO对象中
		// list -> json
		val datasJson = JSON.toJSONString(datasValue, true)
		println(datasJson)
		// json -> pojo
		val list: util.List[AddressBean] = JSON.parseArray(datasJson, classOf[AddressBean])
		// 如果不为空,有元素,设置数据操作类型
		if(! CollectionUtils.isEmpty(list)){
			// 将Java中列表转换为Scala中列表
			import scala.collection.JavaConverters._
			list.asScala.map{bean =>
				bean.setOpType(opType)
				bean
			}.toList
		}else{
           // e. 返回POJO对象
			Nil // 空列表
        }
	}

运行流式计算程序,在MySQL数据库中更新数据和删除数据,查看控制台打印结果。

1616136327618

04-[掌握]-实时ETL开发之转换POJO【重构代码】

前面已经完成对OGG采集数据和Canal采集数据,分别进行转换操作,最后转换为具体某个表的POJO对象:

  • 1)、OGG采集数据,总共48张表,仅仅完成一张表
  • 2)、Canal采集数据,总共3张表,仅仅完成一张表

物流系统Logistics(OGG采集)和CRM系统(Canal采集),其他所有的表,都是按照上述方式进行转换,封装提取到对应POJO对象中,方便保存数据到外部存储引擎。

需要对代码进行重构:

1616136999090

完整代码如下:

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm.AddressBean
import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * Kudu数据管道应用:实现Kudu数据库的实时ETL操作
 */
object KuduStreamApp extends BasicStreamApp {
	
	/**
	 * 数据的处理,仅仅实现JSON -> MessageBean
	 *
	 * @param streamDF 流式数据集StreamingDataFrame
	 * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
	 * @return 流式数据集StreamingDataFrame
	 */
	override def process(streamDF: DataFrame, category: String): DataFrame = {
		// 导入隐式转换
		import streamDF.sparkSession.implicits._
		
		val etlStreamDF: DataFrame = category match {
			// TODO: 物流系统业务数据,OGG采集数据
			case "logistics" =>
				val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
					// 由于从Kafka消费数据,只获取value消息,将其转换DataSet
					.as[String]
					// 过滤数据
					.filter(msg => null != msg && msg.trim.length > 0)
					// 解析每条数据
					.map{
						msg => JSON.parseObject(msg, classOf[OggMessageBean])
					}(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定编码器
				
				// 返回转换后的数据
				oggBeanStreamDS.toDF()
			// TODO: CRM系统业务数据
			case "crm" =>
				val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
					// 过滤数据
					.filter(row => !row.isNullAt(0))
					// 解析数据,对分区数据操作
					.mapPartitions { iter =>
						iter.map { row =>
							val jsonValue: String = row.getAs[String]("value")
							// 解析JSON字符串
							JSON.parseObject(jsonValue, classOf[CanalMessageBean])
						}
					}
			
				// 返回转换后的数据
				canalBeanStreamDS.toDF()
				
			// TODO: 其他业务系统数据
			case _ => streamDF
		}
		// 返回ETL转换后的数据
		etlStreamDF
	}
	
	/**
	 * 数据的保存,此时仅仅将数据打印控制台
	 *
	 * @param streamDF          保存数据集DataFrame
	 * @param tableName         保存表的名称
	 * @param isAutoCreateTable 是否自动创建表,默认创建表
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
		streamDF.writeStream
			.queryName(s"query-${tableName}")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "100")
			.option("truncate", "false")
			.start()
	}
	
	/**
	 * 物流Logistics系统业务数据ETL转换处理及保存外部存储
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF 流式数据集StreamingDataFrame
	 */
	def etlLogistics(streamDF: DataFrame): Unit = {
		
		// 转换DataFrame为Dataset
		val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
		
		/*
			针对物流系统来说,以【tbl_areas】为例,进行数据转换(提取字段值,封装POJO对象)
		*/
		val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
			// 1)、依据table字段判断数据:tbl_areas
			.filter(bean => bean.getTable.equals(TableMapping.AREAS))
			// 2)、获取数据字段值:getValue方法,将其转换为POJO对象
			.map(bean => DataParser.toAreaBean(bean))
			// 3)、过滤掉转换为null数据
			.filter(pojo => null != pojo)
		save(areaPojoStreamDS.toDF(), "logistics-console")
		
		
	}
	
	/**
	 * 客户管理管理CRM系统业务数据ETL转换处理及保存外部存储
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF 流式数据集StreamingDataFrame
	 */
	def etlCrm(streamDF: DataFrame): Unit = {
		
		// 将DataFrame转换为Dataset
		val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
		/*
			以CRM系统中表:crm_address为例,从CanalMessageBean中提取字段值,将其封装到AddressBean对象中
		 */
		val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
			// step1、获取表名称,进行判断,确定封装具体POJO实体类
			//.filter($"table" === TableMapping.ADDRESS)
			.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
			// step2、提取数据字段和数据操作类型,封装到POJO对象中
			.flatMap(bean => DataParser.toAddressBean(bean))
			// step3、过滤为解析封装完成对象,null值过滤掉
			.filter(pojo => null != pojo)
		save(addressPojoStreamDS.toDF(), "crm-console")
		
	}
	
	/*
			实时Kudu ETL应用程序入口,数据处理逻辑步骤:
				step1. 创建SparkSession实例对象,传递SparkConf
				step2. 从Kafka数据源实时消费数据
				step3. 对获取Json数据进行ETL转换
				step4. 保存转换后数据到外部存储
				step5. 应用启动以后,等待终止结束
			*/
	def main(args: Array[String]): Unit = {
		// step1. 创建SparkSession实例对象,传递SparkConf
		val spark: SparkSession = SparkUtils.createSparkSession(
			SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
		)
		
		// step2. 从Kafka数据源实时消费数据
		// 物流系统Topic数据
		val logisticsDF: DataFrame = load(spark, "logistics")
		val crmDF: DataFrame = load(spark, "crm")
		
		// step3. 对获取Json数据进行ETL转换:JSON -> MessageBean对象
		val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
		val etlCrmDF: DataFrame = process(crmDF, "crm")
		
		// step4. 保存转换后数据到外部存储
		//save(etlLogisticsDF, "logistics-console")
		//save(etlCrmDF, "crm-console")
		
		/*
			TODO: 针对每个业务系统,提供1个方法,专门针对该系统中表数据进行分组POJO和保存外部存储系统
		 */
		etlLogistics(etlLogisticsDF)
		etlCrm(etlCrmDF)
		
		// step5. 应用启动以后,等待终止结束
		spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
		spark.streams.awaitAnyTermination()
	}
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。