Big Data Logistics Project: Real-Time Incremental ETL into Kudu, Code Development (Part 9.5)

Posted by Maynor学长 on 2022/06/29 20:25:31

05-[Master]-Real-Time ETL Development: Converting Beans to POJOs [Coding Test]

Task: ==first filter out the data of the other tables in the logistics and CRM systems, extract the field values, encapsulate them into POJO objects, and save them to external storage.==

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser._
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * Kudu data pipeline application: performs real-time ETL into the Kudu database
 */
object KuduStreamApp extends BasicStreamApp {
	
	/**
	 * Data processing: only converts JSON -> MessageBean
	 *
	 * @param streamDF streaming Dataset (DataFrame)
	 * @param category business data category, e.g. logistics system data or CRM system data
	 * @return streaming Dataset (DataFrame)
	 */
	override def process(streamDF: DataFrame, category: String): DataFrame = {
		// Import implicit conversions
		import streamDF.sparkSession.implicits._
		
		val etlStreamDF: DataFrame = category match {
			// TODO: logistics system business data, captured by OGG
			case "logistics" =>
				val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
					// Consuming from Kafka: take only the value message and convert to a Dataset
					.as[String]
					// Filter out empty messages
					.filter(msg => null != msg && msg.trim.length > 0)
					// Parse each message
					.map{
						msg => JSON.parseObject(msg, classOf[OggMessageBean])
					}(Encoders.bean(classOf[OggMessageBean])) // TODO: specify the encoder explicitly
				
				// Return the converted data
				oggBeanStreamDS.toDF()
			// TODO: CRM system business data, captured by Canal
			case "crm" =>
				val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
					// Filter out null values
					.filter(row => !row.isNullAt(0))
					// Parse the data, operating on each partition
					.mapPartitions { iter =>
						iter.map { row =>
							val jsonValue: String = row.getAs[String]("value")
							// Parse the JSON string
							JSON.parseObject(jsonValue, classOf[CanalMessageBean])
						}
					}
				
				// Return the converted data
				canalBeanStreamDS.toDF()
				
			// TODO: data from other business systems
			case _ => streamDF
		}
		// Return the ETL-converted data
		etlStreamDF
	}
	
	/**
	 * Save the data: for now, simply print it to the console
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (defaults to true)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
		streamDF.writeStream
			.queryName(s"query-${tableName}")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "100")
			.option("truncate", "false")
			.start()
	}
	
	/**
	 * ETL for the logistics system business data: convert and save to external storage
	 *      MessageBean -> POJO -> save
	 *
	 * @param streamDF streaming Dataset (DataFrame)
	 */
	def etlLogistics(streamDF: DataFrame): Unit = {
		
		// Convert the DataFrame into a typed Dataset
		val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
		
		/*
			For the logistics system, take [tbl_areas] as an example of the conversion
			(extract the field values and encapsulate them into a POJO object)
		*/
		val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
			// 1) Use the table field to identify the data: tbl_areas
			.filter(bean => bean.getTable.equals(TableMapping.AREAS))
			// 2) Extract the field values via getValue and convert into a POJO object
			.map(bean => DataParser.toAreaBean(bean))
			// 3) Filter out records whose conversion returned null
			.filter(pojo => null != pojo)
		save(areaPojoStreamDS.toDF(), TableMapping.AREAS)
		
		val warehouseSendVehicleStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_SEND_VEHICLE)
			.map(bean => DataParser.toWarehouseSendVehicle(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseSendVehicleStreamDF, TableMapping.WAREHOUSE_SEND_VEHICLE)
		
		val waybillLineStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAYBILL_LINE)
			.map(bean => DataParser.toWaybillLine(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillLineStreamDF, TableMapping.WAYBILL_LINE)
		
		val chargeStandardStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CHARGE_STANDARD)
			.map(bean => DataParser.toChargeStandard(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(chargeStandardStreamDF, TableMapping.CHARGE_STANDARD)
		
		val codesStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CODES)
			.map(bean => DataParser.toCodes(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(codesStreamDF, TableMapping.CODES)
		
		val collectPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COLLECT_PACKAGE)
			.map(bean => DataParser.toCollectPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(collectPackageStreamDF, TableMapping.COLLECT_PACKAGE)
		
		val companyStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY)
			.map(bean => DataParser.toCompany(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyStreamDF, TableMapping.COMPANY)
		
		val companyDotMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_DOT_MAP)
			.map(bean => DataParser.toCompanyDotMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyDotMapStreamDF, TableMapping.COMPANY_DOT_MAP)
		
		val companyTransportRouteMaStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
			.map(bean => DataParser.toCompanyTransportRouteMa(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyTransportRouteMaStreamDF, TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
		
		val companyWarehouseMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_WAREHOUSE_MAP)
			.map(bean => DataParser.toCompanyWarehouseMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyWarehouseMapStreamDF, TableMapping.COMPANY_WAREHOUSE_MAP)
		
		val consumerSenderInfoStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CONSUMER_SENDER_INFO)
			.map(bean => DataParser.toConsumerSenderInfo(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(consumerSenderInfoStreamDF, TableMapping.CONSUMER_SENDER_INFO)
		
		val courierStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COURIER)
			.map(bean => DataParser.toCourier(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(courierStreamDF, TableMapping.COURIER)
		
		val deliverPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVER_PACKAGE)
			.map(bean => DataParser.toDeliverPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliverPackageStreamDF, TableMapping.DELIVER_PACKAGE)
		
		val deliverRegionStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVER_REGION)
			.map(bean => DataParser.toDeliverRegion(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliverRegionStreamDF, TableMapping.DELIVER_REGION)
		
		val deliveryRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVERY_RECORD)
			.map(bean => DataParser.toDeliveryRecord(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliveryRecordStreamDF, TableMapping.DELIVERY_RECORD)
		
		val departmentStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DEPARTMENT)
			.map(bean => DataParser.toDepartment(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(departmentStreamDF, TableMapping.DEPARTMENT)
		
		val dotStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DOT)
			.map(bean => DataParser.toDot(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(dotStreamDF, TableMapping.DOT)
		
		val dotTransportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DOT_TRANSPORT_TOOL)
			.map(bean => DataParser.toDotTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(dotTransportToolStreamDF, TableMapping.DOT_TRANSPORT_TOOL)
		
		val driverStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DRIVER)
			.map(bean => DataParser.toDriver(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(driverStreamDF, TableMapping.DRIVER)
		
		val empStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EMP)
			.map(bean => DataParser.toEmp(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(empStreamDF, TableMapping.EMP)
		
		val empInfoMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EMP_INFO_MAP)
			.map(bean => DataParser.toEmpInfoMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(empInfoMapStreamDF, TableMapping.EMP_INFO_MAP)
		
		val expressBillStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EXPRESS_BILL)
			.map(bean => DataParser.toExpressBill(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(expressBillStreamDF, TableMapping.EXPRESS_BILL)
		
		val expressPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EXPRESS_PACKAGE)
			.map(bean => DataParser.toExpressPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(expressPackageStreamDF, TableMapping.EXPRESS_PACKAGE)
		
		val fixedAreaStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.FIXED_AREA)
			.map(bean => DataParser.toFixedArea(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(fixedAreaStreamDF, TableMapping.FIXED_AREA)
		
		val goodsRackStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.GOODS_RACK)
			.map(bean => DataParser.toGoodsRack(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(goodsRackStreamDF, TableMapping.GOODS_RACK)
		
		val jobStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.JOB)
			.map(bean => DataParser.toJob(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(jobStreamDF, TableMapping.JOB)
		
		val outWarehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE)
			.map(bean => DataParser.toOutWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(outWarehouseStreamDF, TableMapping.OUT_WAREHOUSE)
		
		val outWarehouseDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE_DETAIL)
			.map(bean => DataParser.toOutWarehouseDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(outWarehouseDetailStreamDF, TableMapping.OUT_WAREHOUSE_DETAIL)
		
		val pkgStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PKG)
			.map(bean => DataParser.toPkg(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pkgStreamDF, TableMapping.PKG)
		
		val postalStandardStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.POSTAL_STANDARD)
			.map(bean => DataParser.toPostalStandard(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(postalStandardStreamDF, TableMapping.POSTAL_STANDARD)
		
		val pushWarehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE)
			.map(bean => DataParser.toPushWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pushWarehouseStreamDF, TableMapping.PUSH_WAREHOUSE)
		
		val pushWarehouseDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE_DETAIL)
			.map(bean => DataParser.toPushWarehouseDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pushWarehouseDetailStreamDF, TableMapping.PUSH_WAREHOUSE_DETAIL)
		
		val routeStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.ROUTE)
			.map(bean => DataParser.toRoute(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(routeStreamDF, TableMapping.ROUTE)
		
		val serviceEvaluationStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.SERVICE_EVALUATION)
			.map(bean => DataParser.toServiceEvaluation(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(serviceEvaluationStreamDF, TableMapping.SERVICE_EVALUATION)
		
		val storeGridStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.STORE_GRID)
			.map(bean => DataParser.toStoreGrid(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(storeGridStreamDF, TableMapping.STORE_GRID)
		
		val transportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.TRANSPORT_TOOL)
			.map(bean => DataParser.toTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(transportToolStreamDF, TableMapping.TRANSPORT_TOOL)
		
		val vehicleMonitorStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.VEHICLE_MONITOR)
			.map(bean => DataParser.toVehicleMonitor(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(vehicleMonitorStreamDF, TableMapping.VEHICLE_MONITOR)
		
		val warehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE)
			.map(bean => DataParser.toWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseStreamDF, TableMapping.WAREHOUSE)
		
		val warehouseEmpStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_EMP)
			.map(bean => DataParser.toWarehouseEmp(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseEmpStreamDF, TableMapping.WAREHOUSE_EMP)
		
		val warehouseRackMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RACK_MAP)
			.map(bean => DataParser.toWarehouseRackMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseRackMapStreamDF, TableMapping.WAREHOUSE_RACK_MAP)
		
		val warehouseReceiptStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT)
			.map(bean => DataParser.toWarehouseReceipt(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseReceiptStreamDF, TableMapping.WAREHOUSE_RECEIPT)
		
		val warehouseReceiptDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT_DETAIL)
			.map(bean => DataParser.toWarehouseReceiptDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseReceiptDetailStreamDF, TableMapping.WAREHOUSE_RECEIPT_DETAIL)
		
		val warehouseTransportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_TRANSPORT_TOOL)
			.map(bean => DataParser.toWarehouseTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseTransportToolStreamDF, TableMapping.WAREHOUSE_TRANSPORT_TOOL)
		
		val warehouseVehicleMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_VEHICLE_MAP)
			.map(bean => DataParser.toWarehouseVehicleMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseVehicleMapStreamDF, TableMapping.WAREHOUSE_VEHICLE_MAP)
		
		val waybillStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAY_BILL)
			.map(bean => DataParser.toWaybill(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillStreamDF, TableMapping.WAY_BILL)
		
		val waybillStateRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAYBILL_STATE_RECORD)
			.map(bean => DataParser.toWaybillStateRecord(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillStateRecordStreamDF, TableMapping.WAYBILL_STATE_RECORD)
		
		val workTimeStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WORK_TIME)
			.map(bean => DataParser.toWorkTime(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(workTimeStreamDF, TableMapping.WORK_TIME)
		
		val transportRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.TRANSPORT_RECORD)
			.map(bean => DataParser.toTransportRecordBean(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(transportRecordStreamDF, TableMapping.TRANSPORT_RECORD)
		
	}
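	// NOTE (hypothetical refactor, not part of the original course code): the forty
	// near-identical blocks above could be collapsed into one generic helper, assuming
	// each DataParser method is a function OggMessageBean => T and an implicit
	// Encoder[T] is in scope (e.g. provided via BeanImplicits):
	//
	//   def etlTable[T: org.apache.spark.sql.Encoder](
	//       streamDS: Dataset[OggMessageBean], table: String
	//   )(convert: OggMessageBean => T): Unit = {
	//     val pojoStreamDF = streamDS
	//       .filter(bean => bean.getTable == table)
	//       .map(convert)
	//       .filter(pojo => null != pojo)
	//       .toDF()
	//     save(pojoStreamDF, table)
	//   }
	//
	//   // usage, equivalent to one of the blocks above:
	//   // etlTable(oggBeanStreamDS, TableMapping.DOT)(DataParser.toDot)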
	
	/**
	 * ETL for the CRM (customer relationship management) system business data: convert and save to external storage
	 *      MessageBean -> POJO -> save
	 *
	 * @param streamDF streaming Dataset (DataFrame)
	 */
	def etlCrm(streamDF: DataFrame): Unit = {
		
		// Convert the DataFrame into a typed Dataset
		val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
		/*
			Take the CRM table crm_address as an example: extract the field values from
			CanalMessageBean and encapsulate them into an AddressBean object
		 */
		val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
			// step1. Check the table name to determine which POJO entity class to build
			//.filter($"table" === TableMapping.ADDRESS)
			.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
			// step2. Extract the data fields and the operation type, encapsulating them into the POJO
			.flatMap(bean => DataParser.toAddressBean(bean))
			// step3. Filter out records that failed to parse (null values)
			.filter(pojo => null != pojo)
		save(addressPojoStreamDS.toDF(), "crm-console")
		
		// Customer table data
		val customerStreamDS: Dataset[CustomerBean] = canalBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CUSTOMER)
			.map(bean => DataParser.toCustomer(bean))
			.filter( pojo => null != pojo)
		save(customerStreamDS.toDF(), TableMapping.CUSTOMER)
		// ConsumerAddressMap table data
		val consumerAddressMapStreamDS: Dataset[ConsumerAddressMapBean] = canalBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CONSUMER_ADDRESS_MAP)
			.map(bean => DataParser.toConsumerAddressMap(bean))
			.filter( pojo => null != pojo)
		save(consumerAddressMapStreamDS.toDF(), TableMapping.CONSUMER_ADDRESS_MAP)
	}
	
	/*
		Entry point of the real-time Kudu ETL application. Processing steps:
			step1. Create the SparkSession instance, passing in SparkConf
			step2. Consume data from the Kafka source in real time
			step3. Perform ETL conversion on the consumed JSON data
			step4. Save the converted data to external storage
			step5. After the application starts, await termination
	 */
	def main(args: Array[String]): Unit = {
		// step1. Create the SparkSession instance, passing in SparkConf
		val spark: SparkSession = SparkUtils.createSparkSession(
			SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
		)
		
		// step2. Consume data from the Kafka source in real time
		// Logistics system topic data
		val logisticsDF: DataFrame = load(spark, "logistics")
		val crmDF: DataFrame = load(spark, "crm")
		
		// step3. ETL conversion of the consumed JSON data: JSON -> MessageBean objects
		val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
		val etlCrmDF: DataFrame = process(crmDF, "crm")
		
		// step4. Save the converted data to external storage
		//save(etlLogisticsDF, "logistics-console")
		//save(etlCrmDF, "crm-console")
		
		/*
			TODO: one method per business system, converting that system's table data into POJOs and saving to external storage
		 */
		etlLogistics(etlLogisticsDF)
		etlCrm(etlCrmDF)
		
		// step5. After the application starts, await termination
		spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
		spark.streams.awaitAnyTermination()
	}
}

Run the streaming program, then insert, update, or delete rows in the MySQL and Oracle database tables and check whether data appears on the console (example statements follow the list below).

  • MySQL database: update and delete rows in any table


  • Oracle database: update and delete rows in any table

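For example, test statements of roughly the following form can be used (a sketch only: the column names detail_addr and name are illustrative, so adjust them to the actual schemas):

-- MySQL (CRM system): Canal picks the changes up from the binlog
UPDATE crm_address SET detail_addr = '测试地址' WHERE id = '10001';
DELETE FROM crm_address WHERE id = '10001';

-- Oracle (logistics system): OGG captures the changes from the redo log
UPDATE tbl_areas SET name = 'test' WHERE id = 1;
COMMIT;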

06-[Master]-Real-Time ETL Development: Saving to Kudu Tables [save Method]

Task: save the POJO data produced by the ETL into Kudu tables, implementing the save method of the KuduStreamApp object.

	/**
	 * Save the data to the Kudu table
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (defaults to true)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
		// 1. If auto-creation is allowed and the table does not exist, create it before saving
		if(isAutoCreateTable) {
			KuduTools.createKuduTable(tableName, streamDF)
		}
		
		// 2. Save the data to the Kudu table
		streamDF.writeStream
			.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}")
			.outputMode(OutputMode.Append())
			.format("kudu")
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.operation", "upsert")
			.start()
	}

Before saving the data, consider whether the table exists: if it does not and auto-creation is allowed, create the table first.

07-[Understand]-Real-Time ETL Development: Saving to Kudu Tables [KuduTools]

Task: ==create a table in the Kudu database based on the DataFrame dataset==, implementing the KuduTools method createKuduTable.

Implementation steps:
    step1. Obtain the Kudu context object: KuduContext
    step2. Check whether the table already exists in Kudu; create it only if it does not
    step3. Build the Kudu table's schema
    step4. Set the table's partitioning strategy and replica count
    step5. Create the table

The completed code is shown below:

package cn.itcast.logistics.common

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

/**
 * Kudu utility class: table creation and other operations
 */
object KuduTools {
	
	/**
	Implementation steps:
	    step1. Obtain the Kudu context object: KuduContext
	    step2. Check whether the table already exists in Kudu; create it only if it does not
	    step3. Build the Kudu table's schema
	    step4. Set the table's partitioning strategy and replica count
	    step5. Create the table
	 */
	def createKuduTable(tableName: String, dataframe: DataFrame, keys: Seq[String] = Seq("id")): Unit = {
		
		// When integrating Spark with Kudu, build the KuduContext
		val kuduContext = new KuduContext(Configuration.KUDU_RPC_ADDRESS, dataframe.sparkSession.sparkContext)
		
		// step1. Check whether the table exists; if it does, skip creation
		if(kuduContext.tableExists(tableName)){
			println(s"Table [${tableName}] already exists in Kudu, no need to create it ......................")
			return
		}
		
		// step2. Create the Kudu table
		// a. Schema: primary key columns in Kudu must not be null, so mark them nullable = false
		val schema: StructType = StructType(
			dataframe.schema.fields.map{field =>
				StructField(
					field.name,
					field.dataType,
					nullable = !keys.contains(field.name)
				)
			}
		)
		
		// b. Table creation options
		val options: CreateTableOptions = new CreateTableOptions()
		// Set the hash partitioning strategy (partition columns must be key columns)
		import scala.collection.JavaConverters._
		options.addHashPartitions(keys.asJava, 3)
		// Set the replica count
		options.setNumReplicas(1)
		/*
		def createTable(
	      tableName: String,
	      schema: StructType,
	      keys: Seq[String],
	      options: CreateTableOptions
	    ): KuduTable
		 */
		val kuduTable = kuduContext.createTable(tableName, schema, keys, options)
		println(s"Kudu table [${tableName}: ${kuduTable.getTableId}] has been created ..................")
	}
	
}

The key details of the implementation above:

  • 1) Building the KuduContext requires two parameters: the Kudu master RPC address and the SparkContext

    • The SparkSession instance can be obtained from the DataFrame, and from it the SparkContext
  • 2) When creating a Kudu table, the schema must declare primary key fields as non-nullable, so DataFrame.schema cannot be used directly

  • 3) The partitioning columns must be the primary key or a subset of its fields

  • Converting a Scala collection to a Java collection (a small sketch follows this list):

import scala.collection.JavaConverters._
  • 4) Simply call the KuduContext method to create the table, checking first that it does not already exist
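For reference, a minimal sketch of that Scala-to-Java conversion (variable names are illustrative):

import scala.collection.JavaConverters._

val keys: Seq[String] = Seq("id")
// asJava wraps the Scala Seq as the java.util.List expected by CreateTableOptions.addHashPartitions
val javaKeys: java.util.List[String] = keys.asJava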

08-[Master]-Real-Time ETL Development: Saving to Kudu Tables [CRM Data Test]

Task: ==test with CRM system data: first start the MySQL database and the Canal service, then start the Kafka and Kudu services, and finally run the streaming program. When table data in the CRM system is updated or deleted, check whether the data is synchronized into the Kudu tables.==


Since we are testing the CRM system data here, the logistics code can be commented out: //etlLogistics(etlLogisticsDF)

  • 1) Insert data

-- Insert data
INSERT INTO `crm_address` VALUES ('10001', '葛秋红', null, '17*******47', '恒大影城南侧小金庄', '130903', null, '2020-02-02 18:51:39', '2020-02-02 18:51:39', null);


  • 2) Update data


  • 3) Delete data: record 10002 was deleted from the database, but the Kudu table did not delete the corresponding row


Why? Because all data is saved to the Kudu table with kudu.operation=upsert, which means insert and update only.


When saving data to the Kudu table, the opType field in the DataFrame should be used to determine the operation type for each record:

  • 1) When opType = insert or update, save with kudu.operation=upsert
  • 2) When opType = delete, save with kudu.operation=delete
    • Delete the data from the Kudu table by its primary key

09-[Master]-Real-Time ETL Development: Saving to Kudu Tables [opType Optimization]

Task: optimize the code so that each record's opType field truly determines how that record is saved into the Kudu table.

  • For INSERT and UPDATE, upsert the data into the Kudu table;
  • For DELETE, take the primary key and delete the row from the Kudu table

In addition, when creating the Kudu table, the opType field must be dropped; the table does not contain this column.


Next, ==split the dataset by each record's opType: for insert and update the operation is upsert; for delete the operation is delete.==

	/**
	 * Save the data to the Kudu table, choosing the operation per record based on opType
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (defaults to true)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
		// Import implicit conversions
		import streamDF.sparkSession.implicits._
		
		// 1. If auto-creation is allowed and the table does not exist, create it before saving (without the opType column)
		if(isAutoCreateTable) {
			KuduTools.createKuduTable(tableName, streamDF.drop($"opType"))
		}
		
		// 2. Save the data to the Kudu table
		// TODO: when opType = insert or update, save with: upsert
		streamDF
			.filter($"opType".equalTo("insert") || $"opType".equalTo("update"))
			// Drop the opType column
			.drop($"opType")
			.writeStream
			.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}-upsert")
			.outputMode(OutputMode.Append())
			.format("kudu")
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.operation", "upsert")
			.start()
		
		// TODO: when opType = delete, save with: delete
		streamDF
			.filter($"opType".equalTo("delete"))
			.select($"id")
			.writeStream
			.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}-delete")
			.outputMode(OutputMode.Append())
			.format("kudu")
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.operation", "delete")
			.start()
	}

Continuing the test: when rows are deleted from the MySQL table, the Kudu data is now deleted as well.


10-[Master]-Real-Time ETL Development: Saving to Kudu Tables [Live Data Test]

Task: ==run the mock data generator to produce CRM and logistics business data in real time and insert it into the tables; the streaming program consumes from Kafka in real time, performs the ETL conversion, and finally saves into Kudu tables.==

Here we demonstrate with the CRM system: business data is generated in real time, converted by the ETL in real time, and saved to Kudu tables.

  • 1) Step 1: start the MySQL database and the Canal server

On node1.itcast.cn, start the containers mysql and canal-server, then check whether the services are up and whether they have restarted

  • 2) Step 2: start the ZooKeeper and Kafka services

Start them from the CM web UI; it is recommended to delete the Kafka topic crm and recreate it

  • 3) Step 3: start the Kudu services

Connect to the Kudu master with Kudu Plus and delete the CRM-related tables

  • 4) Step 4: run the streaming program (it mainly needs to consume the CRM business data) after deleting the checkpoint directory

Start the application.


  • 5) Step 5: run the CRM business data mock generator program: MockCrmDataApp


Use Kudu Plus to check whether the data has been synchronized into the Kudu tables


Note: in the same way, synchronize the logistics system business data into Kudu tables for later offline reporting and ad-hoc queries.

11-[Master]-Real-Time ETL Development: Using UDF Functions

Task: ==in real-time ETL development, the JSON conversion can also be done with a UDF function.== In real projects that analyze data with SparkSQL, you either use the built-in SQL functions or define your own UDFs.


Review: there are two ways to define a UDF in SparkSQL

  • Option 1: a UDF registered for use in SQL (a concrete sketch follows this list)
spark.udf.register(
	"", // function name
	() => {}  // anonymous function
)
  • Option 2: a UDF defined for use in the DSL
import org.apache.spark.sql.functions.udf
val xx_udf = udf(
	() => {} // anonymous function
)
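For instance, a concrete version of option one might look as follows (a sketch only: the function name json_length and its logic are invented for illustration):

// Hypothetical example: register a UDF by name for use in SQL statements
spark.udf.register(
	"json_length",  // function name, callable as json_length(col) in SQL
	(json: String) => if (null == json) 0 else json.trim.length  // anonymous function
)
// usage: spark.sql("SELECT json_length(value) AS len FROM tmp_view")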

==Since the logistics project uses DSL-style programming, the UDF is defined using option two.==

		import org.apache.spark.sql.functions.{col, trim, length, udf}
		
		// TODO: define a custom UDF
		val json_to_bean = udf(
			(json: String) => {
				JSON.parseObject(json, classOf[OggMessageBean])
			}
		)

Modify the logistics business data conversion code in process accordingly; a sketch of the result follows.

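The original screenshot of the modified code is not reproduced here. A minimal sketch of what the "logistics" branch of the match in process might look like with the UDF (assuming, as the course code does, that the UDF can return an OggMessageBean, and that the Kafka value column holds the JSON string):

import org.apache.spark.sql.functions.{col, length, trim}

// Hypothetical sketch: the "logistics" branch rewritten with the UDF
case "logistics" =>
	streamDF
		.select(col("value").cast("string").as("message"))  // Kafka value as a string
		.filter(length(trim(col("message"))) > 0)           // drop empty messages
		.select(json_to_bean(col("message")).as("bean"))    // JSON -> OggMessageBean via the UDF
		.select(col("bean.*"))                              // flatten the bean fields into columns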
