大数据物流项目:主题及报表开发(十二点五)

举报
Maynor学长 发表于 2022/06/29 20:41:11 2022/06/29
【摘要】 05-[掌握]-主题及指标开发之重构公共接口【编程】任务:前面已经定义好公共接口方法声明,实现基本方法代码。package cn.itcast.logistics.offlineimport cn.itcast.logistics.common.{Configuration, KuduTools, SparkUtils}import org.apache.spark.SparkConfim...

05-[掌握]-主题及指标开发之重构公共接口【编程】

任务:前面已经定义好公共接口方法声明,实现基本方法代码。

package cn.itcast.logistics.offline

import cn.itcast.logistics.common.{Configuration, KuduTools, SparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, current_date, date_format, date_sub}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * 离线报表分析公共接口,采用模板方法设计模式构建:基本方法和模板方法
 */
trait AbstractOfflineApp {

	// 定义变量
	private var spark: SparkSession = _
	
	// 实例化spark对象
	def init(clazz: Class[_]): Unit = {
		// a. 构建SparkConf对象,基本设置
		var sparkConf: SparkConf = SparkUtils.sparkConf()
		// b. 设置运行方式
		sparkConf = SparkUtils.autoSettingEnv(sparkConf)
		// c. 构建SparkSession对象
		spark = SparkUtils.createSparkSession(sparkConf, this.getClass)
		spark.sparkContext.setLogLevel(Configuration.LOG_OFF)
	}
	
	// 从Kudu表加载数据
	def loadKuduSource(tableName: String, 
	                   isLoadFullData: Boolean = false): DataFrame = {
		// 加载Kudu表数据,不考虑全量还是增量
		var kuduDF: DataFrame = spark.read
			.format(Configuration.SPARK_KUDU_FORMAT)
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.socketReadTimeoutMs", "60000")
			.load()
		
		// 如果是增量加载数据,表示加载昨日数据,需要过滤操作
		if(!isLoadFullData){
			kuduDF = kuduDF
				// 依据 每个表中字段:cdt = 2013-06-02 21:24:00,过滤数据
				.filter(
					date_sub(current_date(), 1) === date_format(col("cdt"), "yyyy-MM-dd")
				)
		}
		
		// 返回数据
		kuduDF
	}
	
	// 处理数据,要么是数据拉宽,要么是指标计算
	def process(dataframe: DataFrame): DataFrame
	
	// 保存数据到Kudu表
	def saveKuduSink(dataframe: DataFrame, tableName: String, 
	                 isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")) = {
		// 如果允许创建表,并且表不存在,就创建表
		if(isAutoCreateTable){
			KuduTools.createKuduTable(tableName, dataframe, keys)
		}
		
		// 保存数据到Kudu表
		dataframe.write
			.mode(SaveMode.Append)
			.format(Configuration.SPARK_KUDU_FORMAT)
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.operation", "upsert")
			.save()
	}
	
	// 关闭会话实例对象
	def close(): Unit = {
		if(null != spark) spark.close()
	}
	
	// TODO: 定义模块方法,规定基本方法执行顺序
	def execute(clazz: Class[_],
	            srcTable: String, dstTable: String, 
	            isLoadFullData: Boolean = false,
	            isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")): Unit = {
		// step1. 初始化
		init(clazz)
		try{
			// step2. 加载Kudu表数据
			val kuduDF: DataFrame = loadKuduSource(srcTable, isLoadFullData)
			kuduDF.show(10, truncate = false)
		
			// step3. 处理数据
			val resultDF: DataFrame = process(kuduDF)
			resultDF.show(10, truncate = false)
			
			// step4. 保存数据
			saveKuduSink(resultDF, dstTable, isAutoCreateTable, keys)
		}catch {
			case e: Exception => e.printStackTrace()
		}finally {
			// step5. 关闭资源
			close()
		}
	}
	
}

在模板方法中调用基本方法(具体方法:有方法体和抽象方法:子类具体实现),定义调用步骤。

06-[掌握]-运单主题之数据拉宽开发

​ 任务:针对运单主题进行DWD层数据拉宽开发,创建对象WayBillDWD对象,继承公共接口AbstractOfflineApp,实现process方法即可。

  • 1)、将运单数据与相关维度数据,进行关联JION以后,字段如下:

1616381706248

当不知道获取那些字段时,最简单和粗暴方式:获取所有字段,不客气,不建议。

  • 2)、SQL语句

1616381787869

  • 3)、创建对象,编写代码,将事实表数据与维度表数据,进行拉宽操作

dwd目录下创建 WayBillDWD 单例对象,继承自AbstractOfflineApp特质

package cn.itcast.logistics.offline.dwd

import cn.itcast.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, TableMapping}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._

/**
 * 运单主题开发:
 *      将运单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到运单宽表中
 */
object WayBillDWD extends AbstractOfflineApp{
	
	/**
	 * 将事实表与维度表关联,进行拉宽操作
	 */
	override def process(dataframe: DataFrame): DataFrame = {
		// 导入隐式转换
		import  dataframe.sparkSession.implicits._
		
		// step1. 加载维度表数据
		// 加载快递员表
		val courierDF: DataFrame = loadKuduSource(TableMapping.COURIER, isLoadFullData = true)
		// 加载网点表
		val dotDF: DataFrame =  loadKuduSource(TableMapping.DOT, isLoadFullData = true)
		// 加载区域表
		val areasDF: DataFrame =  loadKuduSource(TableMapping.AREAS, isLoadFullData = true)
		// 加载转运记录表
		val recordDF: DataFrame =  loadKuduSource(TableMapping.TRANSPORT_RECORD, isLoadFullData = true)
		// 加载起始仓库表
		val startWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true)
		// 加载到达仓库表
		val endWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true)
		// 加载车辆表
		val toolDF: DataFrame = loadKuduSource(TableMapping.TRANSPORT_TOOL, isLoadFullData = true)
		// 加载线路表
		val routeDF: DataFrame = loadKuduSource(TableMapping.ROUTE, isLoadFullData = true)
		// 加载起始仓库关联表
		val startCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true)
		// 加载到达仓库关联表
		val endCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true)
		// 加载起始仓库所在公司表
		val startCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true)
		// 加载到达仓库所在公司表
		val endCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true)
		// 加载物流码表
		val codesDF: DataFrame = loadKuduSource(TableMapping.CODES, isLoadFullData = true)
		// 加载客户表
		val customerDF: DataFrame = loadKuduSource(TableMapping.CUSTOMER, isLoadFullData = true)
		
		// 下单渠道类型表
		val orderChannelTypeDF: DataFrame = codesDF
			.where(col("type") === CodeTypeMapping.ORDER_CHANNEL_TYPE)
			.select(
				col("code").as("orderChannelTypeCode"), col("codeDesc").as("orderChannelTypeName")
			)
		// 客户类型表
		val customerTypeDF: DataFrame = codesDF
			.where(col("type") === CodeTypeMapping.CUSTOM_TYPE)
			.select(
				col("code").as("customerTypeCode"), col("codeDesc").as("customerTypeName")
			)
		
		// step2. 将事实表与维度表关联
		val left_outer = "left_outer"
		val wayBillDF: DataFrame = dataframe
		val joinDF: DataFrame = wayBillDF
			// 运单表与快递员表进行关联
			.join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer)
			// 网点表与快递员表进行关联
			.join(dotDF, courierDF("dotId") === dotDF("id"), left_outer)
			// 网点表与区域表进行关联
			.join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer)
			// 转运记录表与运单表关联
			.join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer)
			// 起始仓库与转运记录表关联
			.join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer)
			// 到达仓库与转运记录表关联
			.join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer)
			// 转运记录表与交通工具表关联
			.join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer)
			// 转运记录表与路线表关联
			.join(routeDF, routeDF("id") === recordDF("routeId"), left_outer)
			// 起始仓库表与仓库公司关联表关联
			.join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer)
			// 公司表与起始仓库公司关联表关联
			.join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer)
			// 到达仓库表与仓库公司关联表关联
			.join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer)
			// 公司表与到达仓库公司关联表关联
			.join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer)
			// 运单表与客户表关联
			.join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer)
			// 下单渠道表与运单表关联
			.join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") ===  wayBillDF("orderChannelId"), left_outer)
			// 客户类型表与客户表关联
			.join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer)
		
		// step3. 选择字段和添加日期day
		val wayBillDetailDF: Dataset[Row] = joinDF
			// 选择字段
			.select(
				wayBillDF("id"), //运单id
				wayBillDF("expressBillNumber").as("express_bill_number"), //快递单编号
				wayBillDF("waybillNumber").as("waybill_number"), //运单编号
				wayBillDF("cid"), //客户id
				customerDF("name").as("cname"), //客户名称
				customerDF("type").as("ctype"), //客户类型
				customerTypeDF("customerTypeName").as("ctype_name"), //客户类型名称
				wayBillDF("eid"), //快递员id
				courierDF("name").as("ename"), //快递员名称
				dotDF("id").as("dot_id"), //网点id
				dotDF("dotName").as("dot_name"), //网点名称
				areasDF("id").as("area_id"), //区域id
				areasDF("name").as("area_name"), //区域名称
				wayBillDF("orderChannelId").as("order_channel_id"), //渠道id
				orderChannelTypeDF("orderChannelTypeName").as("order_chanel_name"), //渠道名称
				wayBillDF("orderDt").as("order_dt"), //下单时间
				wayBillDF("orderTerminalType").as("order_terminal_type"), //下单设备类型
				wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), //下单设备操作系统类型
				wayBillDF("reserveDt").as("reserve_dt"), //预约取件时间
				wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), //是否取件超时
				wayBillDF("pkgId").as("pkg_id"), //订装ID
				wayBillDF("pkgNumber").as("pkg_number"), //订装编号
				wayBillDF("timeoutDt").as("timeout_dt"), //超时时间
				wayBillDF("transformType").as("transform_type"), //运输方式
				wayBillDF("deliveryAddr").as("delivery_addr"),
				wayBillDF("deliveryCustomerName").as("delivery_customer_name"),
				wayBillDF("deliveryMobile").as("delivery_mobile"),
				wayBillDF("deliveryTel").as("delivery_tel"),
				wayBillDF("receiveAddr").as("receive_addr"),
				wayBillDF("receiveCustomerName").as("receive_customer_name"),
				wayBillDF("receiveMobile").as("receive_mobile"),
				wayBillDF("receiveTel").as("receive_tel"),
				wayBillDF("cdt"),
				wayBillDF("udt"),
				wayBillDF("remark"),
				recordDF("swId").as("sw_id"),
				startWarehouseDF("name").as("sw_name"),
				startCompanyDF("id").as("sw_company_id"),
				startCompanyDF("companyName").as("sw_company_name"),
				recordDF("ewId").as("ew_id"),
				endWarehouseDF("name").as("ew_name"),
				endCompanyDF("id").as("ew_company_id"),
				endCompanyDF("companyName").as("ew_company_name"),
				toolDF("id").as("tt_id"),
				toolDF("licensePlate").as("tt_name"),
				recordDF("routeId").as("route_id"),
				concat(routeDF("startStation"), routeDF("endStation")).as("route_name")
			)
			// 添加字段,日期字段day增加日期列
			.withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd"))
			// 根据运单表的创建时间顺序排序
			.sort(wayBillDF.col("cdt").asc)
		
		// 返回拉宽数据
		wayBillDetailDF
	}
	
	// SparkSQL程序入口
	def main(args: Array[String]): Unit = {
		// TODO: 调用父类中模板方法,传递相关参数即可
		execute(
			this.getClass, //
			TableMapping.WAY_BILL, // 事实表
			OfflineTableDefine.WAY_BILL_DETAIL, // 宽表
			isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
		)
	}
}

注意:在DWD层程序中,mian仅仅调度父类模板方法,传递参数即可。

1616382579014

07-[掌握]-运单主题之指标计算【MAIN 方法】

任务:从Kudu数据库中加载运单宽表数据,按照指标进行计算,创建WayBillDWS对象,继承公共接口。

  • 1)、指标字段,首先获取总的运单数及各个维度统计最大、最小和平均运单数目。

1616382689162

1616382707111

  • 2)、Spark 编程实现

dws目录下创建 WayBillDWS 单例对象,继承自AbstractOfflineApp特质

package cn.itcast.logistics.offline.dws

import cn.itcast.logistics.common.{Configuration, OfflineTableDefine}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * 运单主题指标开发:
 *      从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。
 */
object WayBillDWS extends AbstractOfflineApp{
	/**
	 * 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集
	 */
	override def process(dataframe: DataFrame): DataFrame = {
		// 导入隐式转换
		val session: SparkSession = dataframe.sparkSession
		import session.implicits._
		
		// a. 获取日期值day
		
		// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
		
		// c. 将指标结果转换DataFrame
		
		// 返回指标结果
		
	}
	
	def main(args: Array[String]): Unit = {
		// 调用父类中模板方法,传递参数
		execute(
			this.getClass, // 
			OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, //
			isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
		)
	}
}

08-[掌握]-运单主题之指标计算【process 方法】

任务:实现process方法,先进行指标计算,按照每天数据进行指标统计,示意图如下所示:

1616339707049

具体指标计算代码如下所示:

		// 导入隐式转换
		val session: SparkSession = dataframe.sparkSession
		import session.implicits._
		
		// a. 获取日期值day
		val days: Array[Row] = dataframe.select($"day").distinct().collect()
		
		// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
		days.map{dayRow =>
			// 获取日期值
			val dayValue: String = dayRow.getString(0)
			
			// 过滤每日数据
			val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue)
			wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK)
			
			// 指标计算
			// 指标一:总运单数
			val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total"))
			
			// 指标二:各区域运单数,最大、最小和平均
			val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count()
			val areaTotalAggDF: DataFrame = areaTotalDF.agg(
				max($"count").as("areaMaxTotal"),
				min($"count").as("areaMinTotal"),
				round(avg($"count"), 0).as("areaAvgTotal")
			)
			
			// 指标三:各分公司运单数,最大、最小和平均
			val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count()
			val companyTotalAggDF: DataFrame = companyTotalDF.agg(
				max($"count").as("companyMaxTotal"),
				min($"count").as("companyMinTotal"),
				round(avg($"count"), 0).as("companyAvgTotal")
			)
			
			// 指标四:各网点运单数,最大、最小和平均
			val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count()
			val dotTotalAggDF: DataFrame = dotTotalDF.agg(
				max($"count").as("dotMaxTotal"),
				min($"count").as("dotMinTotal"),
				round(avg($"count"), 0).as("dotAvgTotal")
			)
			
			// 指标五:各线路运单数,最大、最小和平均
			val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count()
			val routeTotalAggDF: DataFrame = routeTotalDF.agg(
				max($"count").as("routeMaxTotal"),
				min($"count").as("routeMinTotal"),
				round(avg($"count"), 0).as("routeAvgTotal")
			)
			
			// 指标六:各运输工具运单数,最大、最小和平均
			val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count()
			val ttTotalAggDF: DataFrame = ttTotalDF.agg(
				max($"count").as("ttMaxTotal"),
				min($"count").as("ttMinTotal"),
				round(avg($"count"), 0).as("ttAvgTotal")
			)
			
			// 指标七:各类客户运单数,最大、最小和平均
			val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count()
			val typeTotalAggDF: DataFrame = typeTotalDF.agg(
				max($"count").as("typeMaxTotal"),
				min($"count").as("typeMinTotal"),
				round(avg($"count"), 0).as("typeAvgTotal")
			)
			
			// 数据不再使用时,释放资源
			wayBillDetailDF.unpersist()
			
			// 将计算指标封装到Row中
			Row.fromSeq(
				dayRow.toSeq ++
					totalDF.first().toSeq ++
					areaTotalAggDF.first().toSeq ++
					companyTotalAggDF.first().toSeq ++
					dotTotalAggDF.first().toSeq ++
					routeTotalAggDF.first().toSeq ++
					ttTotalAggDF.first().toSeq ++
					typeTotalAggDF.first().toSeq
			)
		}

09-[掌握]-运单主题之指标计算【转换DataFrame】

任务:将统计指标(封装在数组)转换为DataFrame数据集,采用自定义Schema方式完成。

		// c. 将指标结果转换DataFrame
		// i. RDD[Row],采用并行化方式将数组转换为RDD
		val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow)
		// ii. schema,自定义Schema信息
		val schema: StructType = new StructType()
    		.add("id", StringType, nullable = false)
			.add("total", LongType, nullable = true)
			.add("maxAreaTotal", LongType, nullable = true)
			.add("minAreaTotal", LongType, nullable = true)
			.add("avgAreaTotal", DoubleType, nullable = true)
			.add("maxCompanyTotal", LongType, nullable = true)
			.add("minCompanyTotal", LongType, nullable = true)
			.add("avgCompanyTotal", DoubleType, nullable = true)
			.add("maxDotTotal", LongType, nullable = true)
			.add("minDotTotal", LongType, nullable = true)
			.add("avgDotTotal", DoubleType, nullable = true)
			.add("maxRouteTotal", LongType, nullable = true)
			.add("minRouteTotal", LongType, nullable = true)
			.add("avgRouteTotal", DoubleType, nullable = true)
			.add("maxToolTotal", LongType, nullable = true)
			.add("minToolTotal", LongType, nullable = true)
			.add("avgToolTotal", DoubleType, nullable = true)
			.add("maxCtypeTotal", LongType, nullable = true)
			.add("minCtypeTotal", LongType, nullable = true)
			.add("avgCtypeTotal", DoubleType, nullable = true)
		
		// iii. 转换RDD为DataFrame
		val aggDF: DataFrame = session.createDataFrame(aggRDD, schema)
		
		// 返回指标结果
		aggDF

运单主题指标计算完整代码:

package cn.itcast.logistics.offline.dws

import cn.itcast.logistics.common.{Configuration, OfflineTableDefine}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.storage.StorageLevel

/**
 * 运单主题指标开发:
 *      从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。
 */
object WayBillDWS extends AbstractOfflineApp{
	/**
	 * 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集
	 */
	override def process(dataframe: DataFrame): DataFrame = {
		// 导入隐式转换
		val session: SparkSession = dataframe.sparkSession
		import session.implicits._
		
		// a. 获取日期值day
		val days: Array[Row] = dataframe.select($"day").distinct().collect()
		
		// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
		val aggRow: Array[Row] = days.map{ dayRow =>
			// 获取日期值
			val dayValue: String = dayRow.getString(0)
			
			// 过滤每日数据
			val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue)
			wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK)
			
			// 指标计算
			// 指标一:总运单数
			val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total"))
			
			// 指标二:各区域运单数,最大、最小和平均
			val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count()
			val areaTotalAggDF: DataFrame = areaTotalDF.agg(
				max($"count").as("areaMaxTotal"),
				min($"count").as("areaMinTotal"),
				round(avg($"count"), 0).as("areaAvgTotal")
			)
			
			// 指标三:各分公司运单数,最大、最小和平均
			val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count()
			val companyTotalAggDF: DataFrame = companyTotalDF.agg(
				max($"count").as("companyMaxTotal"),
				min($"count").as("companyMinTotal"),
				round(avg($"count"), 0).as("companyAvgTotal")
			)
			
			// 指标四:各网点运单数,最大、最小和平均
			val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count()
			val dotTotalAggDF: DataFrame = dotTotalDF.agg(
				max($"count").as("dotMaxTotal"),
				min($"count").as("dotMinTotal"),
				round(avg($"count"), 0).as("dotAvgTotal")
			)
			
			// 指标五:各线路运单数,最大、最小和平均
			val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count()
			val routeTotalAggDF: DataFrame = routeTotalDF.agg(
				max($"count").as("routeMaxTotal"),
				min($"count").as("routeMinTotal"),
				round(avg($"count"), 0).as("routeAvgTotal")
			)
			
			// 指标六:各运输工具运单数,最大、最小和平均
			val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count()
			val ttTotalAggDF: DataFrame = ttTotalDF.agg(
				max($"count").as("ttMaxTotal"),
				min($"count").as("ttMinTotal"),
				round(avg($"count"), 0).as("ttAvgTotal")
			)
			
			// 指标七:各类客户运单数,最大、最小和平均
			val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count()
			val typeTotalAggDF: DataFrame = typeTotalDF.agg(
				max($"count").as("typeMaxTotal"),
				min($"count").as("typeMinTotal"),
				round(avg($"count"), 0).as("typeAvgTotal")
			)
			
			// 数据不再使用时,释放资源
			wayBillDetailDF.unpersist()
			
			// 将计算指标封装到Row中
			Row.fromSeq(
				dayRow.toSeq ++
					totalDF.first().toSeq ++
					areaTotalAggDF.first().toSeq ++
					companyTotalAggDF.first().toSeq ++
					dotTotalAggDF.first().toSeq ++
					routeTotalAggDF.first().toSeq ++
					ttTotalAggDF.first().toSeq ++
					typeTotalAggDF.first().toSeq
			)
		}
		
		// c. 将指标结果转换DataFrame
		// i. RDD[Row],采用并行化方式将数组转换为RDD
		val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow)
		// ii. schema,自定义Schema信息
		val schema: StructType = new StructType()
    		.add("id", StringType, nullable = false)
			.add("total", LongType, nullable = true)
			.add("maxAreaTotal", LongType, nullable = true)
			.add("minAreaTotal", LongType, nullable = true)
			.add("avgAreaTotal", DoubleType, nullable = true)
			.add("maxCompanyTotal", LongType, nullable = true)
			.add("minCompanyTotal", LongType, nullable = true)
			.add("avgCompanyTotal", DoubleType, nullable = true)
			.add("maxDotTotal", LongType, nullable = true)
			.add("minDotTotal", LongType, nullable = true)
			.add("avgDotTotal", DoubleType, nullable = true)
			.add("maxRouteTotal", LongType, nullable = true)
			.add("minRouteTotal", LongType, nullable = true)
			.add("avgRouteTotal", DoubleType, nullable = true)
			.add("maxToolTotal", LongType, nullable = true)
			.add("minToolTotal", LongType, nullable = true)
			.add("avgToolTotal", DoubleType, nullable = true)
			.add("maxCtypeTotal", LongType, nullable = true)
			.add("minCtypeTotal", LongType, nullable = true)
			.add("avgCtypeTotal", DoubleType, nullable = true)
		
		// iii. 转换RDD为DataFrame
		val aggDF: DataFrame = session.createDataFrame(aggRDD, schema)
		
		// 返回指标结果
		aggDF
	}
	
	def main(args: Array[String]): Unit = {
		// 调用父类中模板方法,传递参数
		execute(
			this.getClass, //
			OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, //
			isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
		)
	}
}

面试题:RDD、DataFrame和Dataset之间关系和区别???

RDD是什么、DataFrame = RDD[Row] + schema 、Dataset = RDD[CaseClass] + schema、DataFrame = Dataset[Row]

10-[理解]-Kudu 原理及优化之数据存储模型

任务:针对Kudu存储引擎,底层数据存储原理和模型。

大数据存储引擎:
	- HDFS 分布式文件系统
		从HDFS读写数据流程
	- HBase 相关知识点
		知识点:表如何设计的 ???RowKey如何设计???
		知识点:从HBase表读写数据流程是啥????
		
	minor compaction:小合并,将多个storefile文件合并为多个文件
	major compaction:大合并,将所有storefile文件合并为一个文件

1614062789027

  • 1)、表table与约束schema
    • schema:字段名称、字段类型、是否为空,是否为主键
    • table数据划分为多个tablet
    • 分区策略(如何划分数据)和副本数(数据安全性)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pyJOpRFZ-1641191197147)(/img/1616396524218.png)]

1614062859258

​ Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统

1616396752594

  • 1)、一个Table会被分成若干个tablet,其中Tablet的数量是根据hash或者是range进行设置的
  • 2)、一个Tablet中包含MetaData信息和多个RowSet信息,其中MetaData信息是block和block在data中的位置。
  • 3)、一个RowSet包含一个MemRowSet和多个DiskRowSet,其中MemRowSet用于存储insert数据和update后的数据,写满后会刷新到磁盘中也就是多个DiskRowSet中,默认是1G刷新一次或者是2分钟。
  • 4)、DiskRowSet用于老数据的mutation(改变),比如说数据的更新操作,后台定期对DiskRowSet
    进行合并操作,删除历史数据和没有的数据,减少查询过程中的IO开销
  • 5)、一个DiskRowSet包含1个BloomFilter,1个Ad_hoc Index,多个UndoFile、RedoFile、BaseData、DeltaMem

有两个在内存中处理的数据集,区别如下:

1616396995152

在KUDU中,把DiskRowSet分为了两部分:

  • 1)、base data: 负责存储基础数据
  • 2)、delta stores:delta stores负责存储 base data 中的变更数据.
    • DeltaMem,存储更新数据
    • DeltaFile,存储变更以后的数据

1616397248377

​ 数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 basedata),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet后续的数据变更(更新、删除)。 DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。

11-[理解]-Kudu 原理及优化之数据读写原理

任务:当从Kudu表读写(读取数据、插入数据、更新数据、删除数据)数据时,原理流程是如何的。

  • 1)、Kudu的工作模式如下图

1616397619688

上图中编号对应功能描述,如下所示:

1616397899658

  • 2)、Kudu 流程

1616397919792

从Kudu读取数据时:

  • 第一点:两次过滤定位,首先对应Tablet Follower·,然后获取DiskRowSet
  • 第二点:先从DiskRowSet读取数据,再读取MemRowSet,最后合并数据,返回Client
  • 3)、Kudu 写流程

1616398209135

当向Kudu表中写入数据时,进行三次判断插入数据主键是否存在,如果不存在,再进行插入数据:

  • 第一次、主键范围过滤
  • 第二次、布隆过滤器
  • 第三次、B-树索引过滤

1616398394710

更新和删除数据,与插入数据类型,需要经过三次过滤判断,当主键存在时,才进行更新和删除操作。

12-[理解]-Kudu 原理及优化之基本优化设置

1616399306832

  • 2)、Kudu 使用限制

1616399377811

  • 3)、字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ihVh0pHo-1641191197151)(img/1616399441166.png)]

  • 4)、表的设置

1616399481288

  • 5)、分区限制

1616399538415

  • 6)、扩展建议和限制

1616399599852

1616399657809

  • 8)、集群管理限制

1616399700991

  • 9)、Spark 集成限制

1616399776432

【作业】-仓库主题报表开发

需求:
	参考建议【1.5 仓库主题】,按照讲解【运单主题】报表开发步骤进行编程实现
	第一、DWD层,开发程序:WarehouseDWD,继承接口AbstractOfflineApp
	第二、DWS层,开发程序:WarehouseDWS,继承接口AbstractOfflineApp
	
面试题:
	如何对Kudu表数据进行备份操作????
	思路:
		SparkSQL程序,从Kudu读取数据,写入HDFS文件
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。