大数据物流项目:实时增量ETL存储Kudu代码开发(九)
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第13天,点击查看活动详情
Logistics_Day09:实时增量ETL存储Kudu
01-[复习]-上次课程内容回顾
物流项目数据实时ETL转换开发(存储Kudu数据库)部分功能:消费Kafka数据及ETL转换(JSON->Bean对象),项目开发环境搭建(初始化)。
主要讲解如何对实时消费业务数据进行ETL转换:
- 第一步、JSON字符串转换为Bean对象
Canal采集:12个字段,封装到CanalMessageBean对象
OGG采集:7个字段(INSERT和DELETE:6个,UPDATe:7个),封装到OggMessageBean对象
技术实现:
fastJson库,JSON.parseObject(jsonStr, classOf[MessageBea])
- 第二步、提取Bean对象字段值,封装到对应表的POJO对象
以OGG采集物流系统业务数据中表:Areas表为例
- 提取字段:
data 数据字段:getValue
数据操作类型:op_type(U、I、D)
表的名称:table
- 业务实现步骤:
step1、按照表的名称过滤出符合条件表的数据
filter
step2、提取字段值,封装到POJO对象
map集合 -> JSON字符串 -> POJO对象,使用fastJson库
step3、过滤掉null数据
如果转换异常,返回值就是null
02-[了解]-第9天:课程内容提纲
继续完成实时ETL存储到Kudu流式计算程序:
1、完善业务数据(物流系统和CRM)ETL转换
对51张表数据进行转换POJO对象
2、将ETL后数据,保存“save”到Kudu表
insert
插入更新数据
update
delete
删除数据
整个物流项目中,无论实时ETL开发程序,还是后续离线报表分析程序,基本上都使用SparkSQL DSL编程。
03-[掌握]-实时ETL开发之CanalBean转换POJO
任务:将Canal采集数据,提取数据字段值,将其封装到POJO实体类对象中。
step1、获取表名称,进行判断,确定封装具体POJO实体类
step2、提取数据字段和数据操作类型,封装到POJO对象中
step3、过滤为解析封装完成对象,null值过滤掉
以CRM系统中表:
crm_address
为例,从CanalMessageBean中提取字段值,将其封装到AddressBean
对象中
实现数据解析器:
DataParser
,将MessageBean
转换为AddressBean对象
其中注意事项:Canal采集MySQL数据库表的数据中,插入数据时,可能多条数据一起操作
此时获取数据字段值时,需要封装到多条数据中,使用数据结构:列表List返回
实现方法:
toAddressBean
,提取数据字段,封装到POJO对象中
// ================== 客户关系管理CRM系统业务数据解析 ==================
// TODO: 将CRM系统业务数据:crm_address 表业务数据转换为POJO对象
/*
"data": [{
"id": "10001",
"name": "葛秋红",
"tel": null,
"mobile": "17*******47",
"detail_addr": "恒大影城南侧小金庄",
"area_id": "130903",
"gis_addr": null,
"cdt": "2020-02-02 18:51:39",
"udt": "2020-02-02 18:51:39",
"remark": null
}]
*/
def toAddressBean(bean: MessageBean): List[AddressBean] = {
// a. 转换MessageBean对象为CanalMessageBean对象
val canalMessageBean: CanalMessageBean = getCanalMessageBean(bean)
// b. 获取数据操作类型的值
val opType: String = getOpType(canalMessageBean.getType)
// c. 获取数据值,类型为List列表
val datasValue: util.List[util.Map[String, AnyRef]] = canalMessageBean.getData
// d. 封装数据值和操作类型到POJO对象中
// list -> json
val datasJson = JSON.toJSONString(datasValue, true)
println(datasJson)
// json -> pojo
val list: util.List[AddressBean] = JSON.parseArray(datasJson, classOf[AddressBean])
// 如果不为空,有元素,设置数据操作类型
if(! CollectionUtils.isEmpty(list)){
// 将Java中列表转换为Scala中列表
import scala.collection.JavaConverters._
list.asScala.map{bean =>
bean.setOpType(opType)
bean
}.toList
}else{
// e. 返回POJO对象
Nil // 空列表
}
}
运行流式计算程序,在MySQL数据库中更新数据和删除数据,查看控制台打印结果。
04-[掌握]-实时ETL开发之转换POJO【重构代码】
前面已经完成对OGG采集数据和Canal采集数据,分别进行转换操作,最后转换为具体某个表的POJO对象:
- 1)、OGG采集数据,总共48张表,仅仅完成一张表
- 2)、Canal采集数据,总共3张表,仅仅完成一张表
物流系统Logistics(OGG采集)和CRM系统(Canal采集),其他所有的表,都是按照上述方式进行转换,封装提取到对应POJO对象中,方便保存数据到外部存储引擎。
需要对代码进行重构:
第一点、无论是OGG采集还是Canal采集,首先都是将JSON字符串,转换为MessageBean对象
使用
process
方法进行转换即可第二点、依据表名称提取对应数据,封装数据到POJO对象中,最后保存外部
- 首先、提取数据字段,封装POJO对象
- 然后,保存POJO对象到外部存储系统,比如Kudu、ES、CK等
针对具体业务系统,提取方法,每个方法中,针对具体表进行转换和保存操作
- 物流系统方法:
etlLogistics
,依据表名称获取对应数据,封装到POJO,再保存- CRM系统方法:
etlCrm
,依据表名称获取对应数据,封装到POJO,再保存
完整代码如下:
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm.AddressBean
import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
* Kudu数据管道应用:实现Kudu数据库的实时ETL操作
*/
object KuduStreamApp extends BasicStreamApp {
/**
* 数据的处理,仅仅实现JSON -> MessageBean
*
* @param streamDF 流式数据集StreamingDataFrame
* @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
* @return 流式数据集StreamingDataFrame
*/
override def process(streamDF: DataFrame, category: String): DataFrame = {
// 导入隐式转换
import streamDF.sparkSession.implicits._
val etlStreamDF: DataFrame = category match {
// TODO: 物流系统业务数据,OGG采集数据
case "logistics" =>
val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
// 由于从Kafka消费数据,只获取value消息,将其转换DataSet
.as[String]
// 过滤数据
.filter(msg => null != msg && msg.trim.length > 0)
// 解析每条数据
.map{
msg => JSON.parseObject(msg, classOf[OggMessageBean])
}(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定编码器
// 返回转换后的数据
oggBeanStreamDS.toDF()
// TODO: CRM系统业务数据
case "crm" =>
val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
// 过滤数据
.filter(row => !row.isNullAt(0))
// 解析数据,对分区数据操作
.mapPartitions { iter =>
iter.map { row =>
val jsonValue: String = row.getAs[String]("value")
// 解析JSON字符串
JSON.parseObject(jsonValue, classOf[CanalMessageBean])
}
}
// 返回转换后的数据
canalBeanStreamDS.toDF()
// TODO: 其他业务系统数据
case _ => streamDF
}
// 返回ETL转换后的数据
etlStreamDF
}
/**
* 数据的保存,此时仅仅将数据打印控制台
*
* @param streamDF 保存数据集DataFrame
* @param tableName 保存表的名称
* @param isAutoCreateTable 是否自动创建表,默认创建表
*/
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
streamDF.writeStream
.queryName(s"query-${tableName}")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.start()
}
/**
* 物流Logistics系统业务数据ETL转换处理及保存外部存储
* MessageBean -> POJO -> save
*
* @param streamDF 流式数据集StreamingDataFrame
*/
def etlLogistics(streamDF: DataFrame): Unit = {
// 转换DataFrame为Dataset
val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
/*
针对物流系统来说,以【tbl_areas】为例,进行数据转换(提取字段值,封装POJO对象)
*/
val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
// 1)、依据table字段判断数据:tbl_areas
.filter(bean => bean.getTable.equals(TableMapping.AREAS))
// 2)、获取数据字段值:getValue方法,将其转换为POJO对象
.map(bean => DataParser.toAreaBean(bean))
// 3)、过滤掉转换为null数据
.filter(pojo => null != pojo)
save(areaPojoStreamDS.toDF(), "logistics-console")
}
/**
* 客户管理管理CRM系统业务数据ETL转换处理及保存外部存储
* MessageBean -> POJO -> save
*
* @param streamDF 流式数据集StreamingDataFrame
*/
def etlCrm(streamDF: DataFrame): Unit = {
// 将DataFrame转换为Dataset
val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
/*
以CRM系统中表:crm_address为例,从CanalMessageBean中提取字段值,将其封装到AddressBean对象中
*/
val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
// step1、获取表名称,进行判断,确定封装具体POJO实体类
//.filter($"table" === TableMapping.ADDRESS)
.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
// step2、提取数据字段和数据操作类型,封装到POJO对象中
.flatMap(bean => DataParser.toAddressBean(bean))
// step3、过滤为解析封装完成对象,null值过滤掉
.filter(pojo => null != pojo)
save(addressPojoStreamDS.toDF(), "crm-console")
}
/*
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
*/
def main(args: Array[String]): Unit = {
// step1. 创建SparkSession实例对象,传递SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
// step2. 从Kafka数据源实时消费数据
// 物流系统Topic数据
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. 对获取Json数据进行ETL转换:JSON -> MessageBean对象
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(crmDF, "crm")
// step4. 保存转换后数据到外部存储
//save(etlLogisticsDF, "logistics-console")
//save(etlCrmDF, "crm-console")
/*
TODO: 针对每个业务系统,提供1个方法,专门针对该系统中表数据进行分组POJO和保存外部存储系统
*/
etlLogistics(etlLogisticsDF)
etlCrm(etlCrmDF)
// step5. 应用启动以后,等待终止结束
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}
- 点赞
- 收藏
- 关注作者
评论(0)