Big Data Logistics Project: Real-Time Incremental ETL Storage to Kudu, Code Development (Part 9.5)
05-[Master]-Real-Time ETL Development: Converting Beans to POJOs [Coding Test]
Task: ==Filter out the data of the remaining tables in the logistics and CRM systems, extract the field values, encapsulate them into POJO objects, and save them to external storage.==
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser._
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
 * Kudu data pipeline application: implements real-time ETL into the Kudu database.
 */
object KuduStreamApp extends BasicStreamApp {
/**
 * Data processing: only converts JSON -> MessageBean.
 *
 * @param streamDF streaming Dataset (DataFrame)
 * @param category business data category, e.g. logistics system data or CRM system data
 * @return streaming Dataset (DataFrame)
 */
override def process(streamDF: DataFrame, category: String): DataFrame = {
// Import implicit conversions
import streamDF.sparkSession.implicits._
val etlStreamDF: DataFrame = category match {
// TODO: logistics system business data, captured by OGG
case "logistics" =>
val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
// Only the value column is consumed from Kafka; convert it to a Dataset[String]
.as[String]
// Filter out null or empty messages
.filter(msg => null != msg && msg.trim.length > 0)
// Parse each record
.map{
msg => JSON.parseObject(msg, classOf[OggMessageBean])
}(Encoders.bean(classOf[OggMessageBean])) // TODO: specify the encoder explicitly
// Return the converted data
oggBeanStreamDS.toDF()
// TODO: CRM system business data
case "crm" =>
val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
// Filter out null rows
.filter(row => !row.isNullAt(0))
// Parse the data, operating per partition
.mapPartitions { iter =>
iter.map { row =>
val jsonValue: String = row.getAs[String]("value")
// Parse the JSON string
JSON.parseObject(jsonValue, classOf[CanalMessageBean])
}
}
// Return the converted data
canalBeanStreamDS.toDF()
// TODO: data from other business systems
case _ => streamDF
}
// Return the ETL-converted data
etlStreamDF
}
/**
 * Save the data; at this stage the data is only printed to the console.
 *
 * @param streamDF DataFrame to save
 * @param tableName name of the target table
 * @param isAutoCreateTable whether to auto-create the table (created by default)
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
streamDF.writeStream
.queryName(s"query-${tableName}")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.start()
}
/**
 * ETL conversion of the logistics system business data and saving to external storage.
 * MessageBean -> POJO -> save
 *
 * @param streamDF streaming Dataset (DataFrame)
 */
def etlLogistics(streamDF: DataFrame): Unit = {
// Convert the DataFrame to a Dataset
val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
/*
 Taking [tbl_areas] of the logistics system as the example, convert the data (extract the field values and encapsulate them into a POJO object)
*/
val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
// 1) Select records by the table field: tbl_areas
.filter(bean => bean.getTable.equals(TableMapping.AREAS))
// 2) Extract the field values (getValue) and convert them into a POJO object
.map(bean => DataParser.toAreaBean(bean))
// 3) Filter out records that converted to null
.filter(pojo => null != pojo)
save(areaPojoStreamDS.toDF(), TableMapping.AREAS)
val warehouseSendVehicleStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_SEND_VEHICLE)
.map(bean => DataParser.toWarehouseSendVehicle(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseSendVehicleStreamDF, TableMapping.WAREHOUSE_SEND_VEHICLE)
val waybillLineStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAYBILL_LINE)
.map(bean => DataParser.toWaybillLine(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillLineStreamDF, TableMapping.WAYBILL_LINE)
val chargeStandardStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CHARGE_STANDARD)
.map(bean => DataParser.toChargeStandard(bean))
.filter( pojo => null != pojo)
.toDF()
save(chargeStandardStreamDF, TableMapping.CHARGE_STANDARD)
val codesStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CODES)
.map(bean => DataParser.toCodes(bean))
.filter( pojo => null != pojo)
.toDF()
save(codesStreamDF, TableMapping.CODES)
val collectPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COLLECT_PACKAGE)
.map(bean => DataParser.toCollectPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(collectPackageStreamDF, TableMapping.COLLECT_PACKAGE)
val companyStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY)
.map(bean => DataParser.toCompany(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyStreamDF, TableMapping.COMPANY)
val companyDotMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_DOT_MAP)
.map(bean => DataParser.toCompanyDotMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyDotMapStreamDF, TableMapping.COMPANY_DOT_MAP)
val companyTransportRouteMaStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
.map(bean => DataParser.toCompanyTransportRouteMa(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyTransportRouteMaStreamDF, TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
val companyWarehouseMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_WAREHOUSE_MAP)
.map(bean => DataParser.toCompanyWarehouseMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyWarehouseMapStreamDF, TableMapping.COMPANY_WAREHOUSE_MAP)
val consumerSenderInfoStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CONSUMER_SENDER_INFO)
.map(bean => DataParser.toConsumerSenderInfo(bean))
.filter( pojo => null != pojo)
.toDF()
save(consumerSenderInfoStreamDF, TableMapping.CONSUMER_SENDER_INFO)
val courierStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COURIER)
.map(bean => DataParser.toCourier(bean))
.filter( pojo => null != pojo)
.toDF()
save(courierStreamDF, TableMapping.COURIER)
val deliverPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVER_PACKAGE)
.map(bean => DataParser.toDeliverPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliverPackageStreamDF, TableMapping.DELIVER_PACKAGE)
val deliverRegionStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVER_REGION)
.map(bean => DataParser.toDeliverRegion(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliverRegionStreamDF, TableMapping.DELIVER_REGION)
val deliveryRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVERY_RECORD)
.map(bean => DataParser.toDeliveryRecord(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliveryRecordStreamDF, TableMapping.DELIVERY_RECORD)
val departmentStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DEPARTMENT)
.map(bean => DataParser.toDepartment(bean))
.filter( pojo => null != pojo)
.toDF()
save(departmentStreamDF, TableMapping.DEPARTMENT)
val dotStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DOT)
.map(bean => DataParser.toDot(bean))
.filter( pojo => null != pojo)
.toDF()
save(dotStreamDF, TableMapping.DOT)
val dotTransportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DOT_TRANSPORT_TOOL)
.map(bean => DataParser.toDotTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(dotTransportToolStreamDF, TableMapping.DOT_TRANSPORT_TOOL)
val driverStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DRIVER)
.map(bean => DataParser.toDriver(bean))
.filter( pojo => null != pojo)
.toDF()
save(driverStreamDF, TableMapping.DRIVER)
val empStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EMP)
.map(bean => DataParser.toEmp(bean))
.filter( pojo => null != pojo)
.toDF()
save(empStreamDF, TableMapping.EMP)
val empInfoMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EMP_INFO_MAP)
.map(bean => DataParser.toEmpInfoMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(empInfoMapStreamDF, TableMapping.EMP_INFO_MAP)
val expressBillStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EXPRESS_BILL)
.map(bean => DataParser.toExpressBill(bean))
.filter( pojo => null != pojo)
.toDF()
save(expressBillStreamDF, TableMapping.EXPRESS_BILL)
val expressPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EXPRESS_PACKAGE)
.map(bean => DataParser.toExpressPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(expressPackageStreamDF, TableMapping.EXPRESS_PACKAGE)
val fixedAreaStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.FIXED_AREA)
.map(bean => DataParser.toFixedArea(bean))
.filter( pojo => null != pojo)
.toDF()
save(fixedAreaStreamDF, TableMapping.FIXED_AREA)
val goodsRackStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.GOODS_RACK)
.map(bean => DataParser.toGoodsRack(bean))
.filter( pojo => null != pojo)
.toDF()
save(goodsRackStreamDF, TableMapping.GOODS_RACK)
val jobStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.JOB)
.map(bean => DataParser.toJob(bean))
.filter( pojo => null != pojo)
.toDF()
save(jobStreamDF, TableMapping.JOB)
val outWarehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE)
.map(bean => DataParser.toOutWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(outWarehouseStreamDF, TableMapping.OUT_WAREHOUSE)
val outWarehouseDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE_DETAIL)
.map(bean => DataParser.toOutWarehouseDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(outWarehouseDetailStreamDF, TableMapping.OUT_WAREHOUSE_DETAIL)
val pkgStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PKG)
.map(bean => DataParser.toPkg(bean))
.filter( pojo => null != pojo)
.toDF()
save(pkgStreamDF, TableMapping.PKG)
val postalStandardStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.POSTAL_STANDARD)
.map(bean => DataParser.toPostalStandard(bean))
.filter( pojo => null != pojo)
.toDF()
save(postalStandardStreamDF, TableMapping.POSTAL_STANDARD)
val pushWarehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE)
.map(bean => DataParser.toPushWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(pushWarehouseStreamDF, TableMapping.PUSH_WAREHOUSE)
val pushWarehouseDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE_DETAIL)
.map(bean => DataParser.toPushWarehouseDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(pushWarehouseDetailStreamDF, TableMapping.PUSH_WAREHOUSE_DETAIL)
val routeStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.ROUTE)
.map(bean => DataParser.toRoute(bean))
.filter( pojo => null != pojo)
.toDF()
save(routeStreamDF, TableMapping.ROUTE)
val serviceEvaluationStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.SERVICE_EVALUATION)
.map(bean => DataParser.toServiceEvaluation(bean))
.filter( pojo => null != pojo)
.toDF()
save(serviceEvaluationStreamDF, TableMapping.SERVICE_EVALUATION)
val storeGridStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.STORE_GRID)
.map(bean => DataParser.toStoreGrid(bean))
.filter( pojo => null != pojo)
.toDF()
save(storeGridStreamDF, TableMapping.STORE_GRID)
val transportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.TRANSPORT_TOOL)
.map(bean => DataParser.toTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(transportToolStreamDF, TableMapping.TRANSPORT_TOOL)
val vehicleMonitorStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.VEHICLE_MONITOR)
.map(bean => DataParser.toVehicleMonitor(bean))
.filter( pojo => null != pojo)
.toDF()
save(vehicleMonitorStreamDF, TableMapping.VEHICLE_MONITOR)
val warehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE)
.map(bean => DataParser.toWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseStreamDF, TableMapping.WAREHOUSE)
val warehouseEmpStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_EMP)
.map(bean => DataParser.toWarehouseEmp(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseEmpStreamDF, TableMapping.WAREHOUSE_EMP)
val warehouseRackMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RACK_MAP)
.map(bean => DataParser.toWarehouseRackMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseRackMapStreamDF, TableMapping.WAREHOUSE_RACK_MAP)
val warehouseReceiptStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT)
.map(bean => DataParser.toWarehouseReceipt(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseReceiptStreamDF, TableMapping.WAREHOUSE_RECEIPT)
val warehouseReceiptDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT_DETAIL)
.map(bean => DataParser.toWarehouseReceiptDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseReceiptDetailStreamDF, TableMapping.WAREHOUSE_RECEIPT_DETAIL)
val warehouseTransportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_TRANSPORT_TOOL)
.map(bean => DataParser.toWarehouseTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseTransportToolStreamDF, TableMapping.WAREHOUSE_TRANSPORT_TOOL)
val warehouseVehicleMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_VEHICLE_MAP)
.map(bean => DataParser.toWarehouseVehicleMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseVehicleMapStreamDF, TableMapping.WAREHOUSE_VEHICLE_MAP)
val waybillStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAY_BILL)
.map(bean => DataParser.toWaybill(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillStreamDF, TableMapping.WAY_BILL)
val waybillStateRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAYBILL_STATE_RECORD)
.map(bean => DataParser.toWaybillStateRecord(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillStateRecordStreamDF, TableMapping.WAYBILL_STATE_RECORD)
val workTimeStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WORK_TIME)
.map(bean => DataParser.toWorkTime(bean))
.filter( pojo => null != pojo)
.toDF()
save(workTimeStreamDF, TableMapping.WORK_TIME)
val transportRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.TRANSPORT_RECORD)
.map(bean => DataParser.toTransportRecordBean(bean))
.filter( pojo => null != pojo)
.toDF()
save(transportRecordStreamDF, TableMapping.TRANSPORT_RECORD)
}
/**
 * ETL conversion of the CRM (customer relationship management) system business data and saving to external storage.
 * MessageBean -> POJO -> save
 *
 * @param streamDF streaming Dataset (DataFrame)
 */
def etlCrm(streamDF: DataFrame): Unit = {
// Convert the DataFrame to a Dataset
val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
/*
 Taking the CRM table crm_address as the example, extract the field values from CanalMessageBean and encapsulate them into an AddressBean object
*/
val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
// step1. Check the table name to decide which POJO class to build
//.filter($"table" === TableMapping.ADDRESS)
.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
// step2. Extract the data fields and the operation type, and encapsulate them into the POJO object
.flatMap(bean => DataParser.toAddressBean(bean))
// step3. Filter out records that failed to parse (null values)
.filter(pojo => null != pojo)
save(addressPojoStreamDS.toDF(), TableMapping.ADDRESS)
// Customer table data
val customerStreamDS: Dataset[CustomerBean] = canalBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CUSTOMER)
.map(bean => DataParser.toCustomer(bean))
.filter( pojo => null != pojo)
save(customerStreamDS.toDF(), TableMapping.CUSTOMER)
// ConsumerAddressMap table data
val consumerAddressMapStreamDS: Dataset[ConsumerAddressMapBean] = canalBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CONSUMER_ADDRESS_MAP)
.map(bean => DataParser.toConsumerAddressMap(bean))
.filter( pojo => null != pojo)
save(consumerAddressMapStreamDS.toDF(), TableMapping.CONSUMER_ADDRESS_MAP)
}
/*
 Entry point of the real-time Kudu ETL application. Processing steps:
 step1. Create the SparkSession instance, passing in the SparkConf
 step2. Consume data from the Kafka source in real time
 step3. Perform ETL conversion on the consumed JSON data
 step4. Save the converted data to external storage
 step5. After the application starts, wait for termination
*/
def main(args: Array[String]): Unit = {
// step1. Create the SparkSession instance, passing in the SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
// step2. Consume data from the Kafka source in real time
// Topic data of the logistics system and of the CRM system
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. Perform ETL conversion on the consumed JSON data: JSON -> MessageBean objects
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(crmDF, "crm")
// step4. Save the converted data to external storage
//save(etlLogisticsDF, "logistics-console")
//save(etlCrmDF, "crm-console")
/*
 TODO: for each business system, provide one dedicated method that converts that system's table data to POJOs and saves it to external storage
*/
etlLogistics(etlLogisticsDF)
etlCrm(etlCrmDF)
// step5. After the application starts, wait for termination
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}
Run the streaming program, then insert, update, or delete rows in the MySQL and Oracle databases and check whether the data appears on the console.
- MySQL database: update and delete rows in any table
- Oracle database: update and delete rows in any table
06-[Master]-Real-Time ETL Development: Saving to Kudu Tables [save Method]
Task: the POJO data produced by the ETL needs to be saved into Kudu tables; implement the save method of the KuduStreamApp object.
/**
 * Save the data to the Kudu table.
 *
 * @param streamDF DataFrame to save
 * @param tableName name of the target table
 * @param isAutoCreateTable whether to auto-create the table (created by default)
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
// 1. If auto-creation is allowed and the table does not exist, create it before saving the data
if(isAutoCreateTable) {
KuduTools.createKuduTable(tableName, streamDF)
}
// 2. Save the data to the Kudu table
streamDF.writeStream
.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}")
.outputMode(OutputMode.Append())
.format("kudu")
.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
.option("kudu.table", tableName)
.option("kudu.operation", "upsert")
.start()
}
Before saving the data, check whether the table exists; if it does not exist and auto-creation is allowed, create the table first.
07-[Understand]-Real-Time ETL Development: Saving to Kudu Tables [KuduTools]
Task: ==create a table in the Kudu database based on a DataFrame==, by implementing the createKuduTable method of KuduTools.
Implementation steps:
step1. Obtain the Kudu context object: KuduContext
step2. Check whether the table already exists in Kudu; create it only if it does not
step3. Build the schema of the Kudu table
step4. Set the table's partitioning strategy and number of replicas
step5. Create the table
The complete code is shown below:
package cn.itcast.logistics.common
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}
/**
 * Kudu utility object: table creation and other operations.
 */
object KuduTools {
/**
 Implementation steps:
 step1. Obtain the Kudu context object: KuduContext
 step2. Check whether the table already exists in Kudu; create it only if it does not
 step3. Build the schema of the Kudu table
 step4. Set the table's partitioning strategy and number of replicas
 step5. Create the table
*/
def createKuduTable(tableName: String, dataframe: DataFrame, keys: Seq[String] = Seq("id")): Unit = {
// Build the KuduContext, the entry point of the Spark-Kudu integration
val kuduContext = new KuduContext(Configuration.KUDU_RPC_ADDRESS, dataframe.sparkSession.sparkContext)
// step1. If the table already exists, do not create it
if(kuduContext.tableExists(tableName)){
println(s"Kudu table [${tableName}] already exists, nothing to create......................")
return
}
// step2. Create the Kudu table
// a. Schema: primary key columns in Kudu must not be null, so mark them nullable = false
val schema: StructType = StructType(
dataframe.schema.fields.map{field =>
StructField(
field.name,
field.dataType,
nullable = if(keys.contains(field.name)) false else true
)
}
)
// b. Table creation options
val options: CreateTableOptions = new CreateTableOptions()
// Set the partitioning strategy
import scala.collection.JavaConverters._
options.addHashPartitions(keys.asJava, 3)
// Set the number of replicas
options.setNumReplicas(1)
/*
def createTable(
tableName: String,
schema: StructType,
keys: Seq[String],
options: CreateTableOptions
): KuduTable
*/
val kuduTable = kuduContext.createTable(tableName, schema, keys, options)
println(s"Kudu table [${tableName}: ${kuduTable.getTableId}] has been created..................")
}
}
The key details of this implementation:
1) Building a KuduContext requires two parameters: the Kudu master RPC address and the SparkContext
- the SparkSession can be obtained from the DataFrame, and the SparkContext from the SparkSession
2) When creating a Kudu table, the schema must mark the primary key columns as non-nullable, so DataFrame.schema cannot be used directly
3) The hash-partition columns must be primary key columns
- converting the Scala collection to a Java collection requires: import scala.collection.JavaConverters._
4) Finally, call the KuduContext createTable method, after first checking that the table does not exist
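As a brief usage sketch (the addressPojoStreamDS dataset refers to the one built in etlCrm above; the exact call site shown here is an assumption, not the course's code), the helper is simply invoked once per target table before the streaming write is started:
// Usage sketch: create the Kudu table for the address POJOs if it does not exist yet;
// the default keys = Seq("id") assumes the POJO exposes an "id" primary key field.
val addressDF: DataFrame = addressPojoStreamDS.toDF()
KuduTools.createKuduTable(TableMapping.ADDRESS, addressDF)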
08-[Master]-Real-Time ETL Development: Saving to Kudu Tables [CRM Data Test]
Task: ==Test with the CRM system data. First start the MySQL database and the Canal service, then start the Kafka and Kudu services, and finally run the streaming program; when rows in the CRM tables are updated or deleted, check whether the changes are synchronized to the Kudu tables.==
Since only the CRM system data is tested here, the logistics system call can be commented out:
//etlLogistics(etlLogisticsDF)
- 1) Insert data
-- INSERT data
INSERT INTO `crm_address` VALUES ('10001', '葛秋红', null, '17*******47', '恒大影城南侧小金庄', '130903', null, '2020-02-02 18:51:39', '2020-02-02 18:51:39', null);
- 2) Update data
- 3) Delete data: deleting the row with id 10002 from the database does not remove it from the Kudu table
Why? Because every record is currently written to Kudu with kudu.operation=upsert, which only inserts and updates.
When saving to the Kudu table, the opType field of the DataFrame should determine how each record is written:
- 1) when opType = insert or update, save with kudu.operation=upsert
- 2) when opType = delete, save with kudu.operation=delete
- in the delete case, the Kudu row is removed by its primary key
09-[Master]-Real-Time ETL Development: Saving to Kudu Tables [opType Optimization]
Task: optimize the code so that the opType field of each record truly determines how that record is saved to the Kudu table.
- For INSERT and UPDATE, upsert the data into the Kudu table;
- For DELETE, take the primary key and delete the row from the Kudu table.
Next, ==split the dataset by each record's opType: for insert and update the operation is upsert; for delete the operation is delete.==
/**
 * Save the data to the Kudu table, choosing the write operation based on opType.
 *
 * @param streamDF DataFrame to save
 * @param tableName name of the target table
 * @param isAutoCreateTable whether to auto-create the table (created by default)
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
// Import implicit conversions
import streamDF.sparkSession.implicits._
// 1. If auto-creation is allowed and the table does not exist, create it before saving the data
if(isAutoCreateTable) {
KuduTools.createKuduTable(tableName, streamDF.drop($"opType"))
}
// 2. Save the data to the Kudu table
// TODO: when opType = insert or update, save with operation: upsert
streamDF
.filter($"opType".equalTo("insert") || $"opType".equalTo("update"))
// Drop the opType column
.drop($"opType")
.writeStream
.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}-upsert")
.outputMode(OutputMode.Append())
.format("kudu")
.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
.option("kudu.table", tableName)
.option("kudu.operation", "upsert")
.start()
// TODO: when opType = delete, save with operation: delete
streamDF
.filter($"opType".equalTo("delete"))
.select($"id")
.writeStream
.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}-delete")
.outputMode(OutputMode.Append())
.format("kudu")
.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
.option("kudu.table", tableName)
.option("kudu.operation", "delete")
.start()
}
Continue testing: now, when rows are deleted from the MySQL tables, the corresponding rows are also deleted from Kudu.
10-[Master]-Real-Time ETL Development: Saving to Kudu Tables [Live Data Test]
Task: ==Run the data mock generator programs to produce CRM and logistics business data in real time and insert it into the tables; the streaming program consumes the data from Kafka in real time, performs the ETL conversion, and finally saves it into the Kudu tables.==
- 1) Step 1: start the MySQL database and the Canal server
  On node1.itcast.cn start the mysql and canal-server containers, then check whether the services are up (restart them if necessary)
- 2) Step 2: start the Zookeeper and Kafka services
  Start them from the CM web UI; it is recommended to delete the Kafka topic crm and recreate it
- 3) Step 3: start the Kudu services
  Use Kudu Plus to connect to the Kudu master and drop the previously created CRM-related tables
- 4) Step 4: run the streaming program, which here mainly consumes the CRM system business data; delete the checkpoint directory first
  Start the application (screenshot omitted)
- 5) Step 5: run the CRM business data mock generator program: MockCrmDataApp
  Use Kudu Plus to check whether the data has been synchronized into the Kudu tables
Note: follow the same steps to synchronize the logistics system business data into Kudu tables for later offline reporting and ad-hoc queries.
11-[Master]-Real-Time ETL Development: Using UDF Functions
Task: ==In real-time ETL development, the JSON data conversion can also be done with a UDF function.== When analyzing data with SparkSQL in real projects, you either use the built-in SQL functions or define and use your own UDFs.
- Requirement: rewrite the JSON-string-to-MessageBean conversion, currently implemented with the map operator, as a UDF.
Review: there are two ways to define a UDF in SparkSQL.
- Approach 1: a UDF used in SQL
spark.udf.register(
"", // function name
() => {} // anonymous function
)
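For reference, a minimal sketch of approach 1 (the function name get_table_name, the view name tmp_kafka, and the "table" JSON field are illustrative assumptions, not part of the course code): register a UDF that extracts the table name from the raw JSON message, then call it from SQL.
// Sketch only: a SQL-registered UDF (assumes spark is the SparkSession and fastjson is on the classpath)
spark.udf.register(
"get_table_name", // function name used in SQL
(json: String) => JSON.parseObject(json).getString("table") // pull the "table" field out of the JSON message
)
// kafkaDF.createOrReplaceTempView("tmp_kafka")
// spark.sql("SELECT get_table_name(CAST(value AS STRING)) AS table_name FROM tmp_kafka")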
- Approach 2: a UDF used in the DSL
import org.apache.spark.sql.functions.udf
val xx_udf = udf(
() => {} // anonymous function
)
==Since the logistics project uses DSL-style programming, approach 2 is used to define the UDF.==
import org.apache.spark.sql.functions.{col, trim, length, udf}
// TODO: define the UDF
val json_to_bean = udf(
(json: String) => {
JSON.parseObject(json, classOf[OggMessageBean])
}
)
Modify the logistics data conversion code in process accordingly, as sketched below:
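This is only one possible shape of that change, not the course's exact code; it assumes the OggMessageBean returned by json_to_bean can be encoded as a struct column so that its fields can be flattened with "bean.*".
// Sketch only: inside process(), the "logistics" branch could become
val etlStreamDF: DataFrame = streamDF
// keep only non-empty messages
.filter(length(trim(col("value"))) > 0)
// JSON string -> OggMessageBean via the UDF, materialized as a struct column named "bean"
.select(json_to_bean(col("value")).as("bean"))
// flatten the struct fields back into top-level columns
.select("bean.*")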