大数据物流项目:主题及报表开发(十二点五)
05-[掌握]-主题及指标开发之重构公共接口【编程】
任务:前面已经定义好公共接口方法声明,实现基本方法代码。
package cn.itcast.logistics.offline
import cn.itcast.logistics.common.{Configuration, KuduTools, SparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, current_date, date_format, date_sub}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* 离线报表分析公共接口,采用模板方法设计模式构建:基本方法和模板方法
*/
trait AbstractOfflineApp {
// 定义变量
private var spark: SparkSession = _
// 实例化spark对象
def init(clazz: Class[_]): Unit = {
// a. 构建SparkConf对象,基本设置
var sparkConf: SparkConf = SparkUtils.sparkConf()
// b. 设置运行方式
sparkConf = SparkUtils.autoSettingEnv(sparkConf)
// c. 构建SparkSession对象
spark = SparkUtils.createSparkSession(sparkConf, this.getClass)
spark.sparkContext.setLogLevel(Configuration.LOG_OFF)
}
// 从Kudu表加载数据
def loadKuduSource(tableName: String,
isLoadFullData: Boolean = false): DataFrame = {
// 加载Kudu表数据,不考虑全量还是增量
var kuduDF: DataFrame = spark.read
.format(Configuration.SPARK_KUDU_FORMAT)
.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
.option("kudu.table", tableName)
.option("kudu.socketReadTimeoutMs", "60000")
.load()
// 如果是增量加载数据,表示加载昨日数据,需要过滤操作
if(!isLoadFullData){
kuduDF = kuduDF
// 依据 每个表中字段:cdt = 2013-06-02 21:24:00,过滤数据
.filter(
date_sub(current_date(), 1) === date_format(col("cdt"), "yyyy-MM-dd")
)
}
// 返回数据
kuduDF
}
// 处理数据,要么是数据拉宽,要么是指标计算
def process(dataframe: DataFrame): DataFrame
// 保存数据到Kudu表
def saveKuduSink(dataframe: DataFrame, tableName: String,
isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")) = {
// 如果允许创建表,并且表不存在,就创建表
if(isAutoCreateTable){
KuduTools.createKuduTable(tableName, dataframe, keys)
}
// 保存数据到Kudu表
dataframe.write
.mode(SaveMode.Append)
.format(Configuration.SPARK_KUDU_FORMAT)
.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
.option("kudu.table", tableName)
.option("kudu.operation", "upsert")
.save()
}
// 关闭会话实例对象
def close(): Unit = {
if(null != spark) spark.close()
}
// TODO: 定义模块方法,规定基本方法执行顺序
def execute(clazz: Class[_],
srcTable: String, dstTable: String,
isLoadFullData: Boolean = false,
isAutoCreateTable: Boolean = true, keys: Seq[String] = Seq("id")): Unit = {
// step1. 初始化
init(clazz)
try{
// step2. 加载Kudu表数据
val kuduDF: DataFrame = loadKuduSource(srcTable, isLoadFullData)
kuduDF.show(10, truncate = false)
// step3. 处理数据
val resultDF: DataFrame = process(kuduDF)
resultDF.show(10, truncate = false)
// step4. 保存数据
saveKuduSink(resultDF, dstTable, isAutoCreateTable, keys)
}catch {
case e: Exception => e.printStackTrace()
}finally {
// step5. 关闭资源
close()
}
}
}
在模板方法中调用基本方法(具体方法:有方法体和抽象方法:子类具体实现),定义调用步骤。
06-[掌握]-运单主题之数据拉宽开发
任务:针对运单主题进行DWD层数据拉宽开发,创建对象
WayBillDWD
对象,继承公共接口AbstractOfflineApp
,实现process
方法即可。
- 1)、将
运单数据与相关维度数据
,进行关联JION以后,字段如下:
当不知道获取那些字段时,最简单和粗暴方式:获取所有字段,不客气,不建议。
- 2)、SQL语句
- 3)、创建对象,编写代码,将事实表数据与维度表数据,进行拉宽操作
在
dwd
目录下创建WayBillDWD
单例对象,继承自AbstractOfflineApp
特质
package cn.itcast.logistics.offline.dwd
import cn.itcast.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, TableMapping}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
/**
* 运单主题开发:
* 将运单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到运单宽表中
*/
object WayBillDWD extends AbstractOfflineApp{
/**
* 将事实表与维度表关联,进行拉宽操作
*/
override def process(dataframe: DataFrame): DataFrame = {
// 导入隐式转换
import dataframe.sparkSession.implicits._
// step1. 加载维度表数据
// 加载快递员表
val courierDF: DataFrame = loadKuduSource(TableMapping.COURIER, isLoadFullData = true)
// 加载网点表
val dotDF: DataFrame = loadKuduSource(TableMapping.DOT, isLoadFullData = true)
// 加载区域表
val areasDF: DataFrame = loadKuduSource(TableMapping.AREAS, isLoadFullData = true)
// 加载转运记录表
val recordDF: DataFrame = loadKuduSource(TableMapping.TRANSPORT_RECORD, isLoadFullData = true)
// 加载起始仓库表
val startWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true)
// 加载到达仓库表
val endWarehouseDF: DataFrame = loadKuduSource(TableMapping.WAREHOUSE, isLoadFullData = true)
// 加载车辆表
val toolDF: DataFrame = loadKuduSource(TableMapping.TRANSPORT_TOOL, isLoadFullData = true)
// 加载线路表
val routeDF: DataFrame = loadKuduSource(TableMapping.ROUTE, isLoadFullData = true)
// 加载起始仓库关联表
val startCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true)
// 加载到达仓库关联表
val endCompanyWarehouseDF: DataFrame = loadKuduSource(TableMapping.COMPANY_WAREHOUSE_MAP, isLoadFullData = true)
// 加载起始仓库所在公司表
val startCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true)
// 加载到达仓库所在公司表
val endCompanyDF: DataFrame = loadKuduSource(TableMapping.COMPANY, isLoadFullData = true)
// 加载物流码表
val codesDF: DataFrame = loadKuduSource(TableMapping.CODES, isLoadFullData = true)
// 加载客户表
val customerDF: DataFrame = loadKuduSource(TableMapping.CUSTOMER, isLoadFullData = true)
// 下单渠道类型表
val orderChannelTypeDF: DataFrame = codesDF
.where(col("type") === CodeTypeMapping.ORDER_CHANNEL_TYPE)
.select(
col("code").as("orderChannelTypeCode"), col("codeDesc").as("orderChannelTypeName")
)
// 客户类型表
val customerTypeDF: DataFrame = codesDF
.where(col("type") === CodeTypeMapping.CUSTOM_TYPE)
.select(
col("code").as("customerTypeCode"), col("codeDesc").as("customerTypeName")
)
// step2. 将事实表与维度表关联
val left_outer = "left_outer"
val wayBillDF: DataFrame = dataframe
val joinDF: DataFrame = wayBillDF
// 运单表与快递员表进行关联
.join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer)
// 网点表与快递员表进行关联
.join(dotDF, courierDF("dotId") === dotDF("id"), left_outer)
// 网点表与区域表进行关联
.join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer)
// 转运记录表与运单表关联
.join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer)
// 起始仓库与转运记录表关联
.join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer)
// 到达仓库与转运记录表关联
.join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer)
// 转运记录表与交通工具表关联
.join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer)
// 转运记录表与路线表关联
.join(routeDF, routeDF("id") === recordDF("routeId"), left_outer)
// 起始仓库表与仓库公司关联表关联
.join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer)
// 公司表与起始仓库公司关联表关联
.join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer)
// 到达仓库表与仓库公司关联表关联
.join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer)
// 公司表与到达仓库公司关联表关联
.join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer)
// 运单表与客户表关联
.join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer)
// 下单渠道表与运单表关联
.join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer)
// 客户类型表与客户表关联
.join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer)
// step3. 选择字段和添加日期day
val wayBillDetailDF: Dataset[Row] = joinDF
// 选择字段
.select(
wayBillDF("id"), //运单id
wayBillDF("expressBillNumber").as("express_bill_number"), //快递单编号
wayBillDF("waybillNumber").as("waybill_number"), //运单编号
wayBillDF("cid"), //客户id
customerDF("name").as("cname"), //客户名称
customerDF("type").as("ctype"), //客户类型
customerTypeDF("customerTypeName").as("ctype_name"), //客户类型名称
wayBillDF("eid"), //快递员id
courierDF("name").as("ename"), //快递员名称
dotDF("id").as("dot_id"), //网点id
dotDF("dotName").as("dot_name"), //网点名称
areasDF("id").as("area_id"), //区域id
areasDF("name").as("area_name"), //区域名称
wayBillDF("orderChannelId").as("order_channel_id"), //渠道id
orderChannelTypeDF("orderChannelTypeName").as("order_chanel_name"), //渠道名称
wayBillDF("orderDt").as("order_dt"), //下单时间
wayBillDF("orderTerminalType").as("order_terminal_type"), //下单设备类型
wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), //下单设备操作系统类型
wayBillDF("reserveDt").as("reserve_dt"), //预约取件时间
wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), //是否取件超时
wayBillDF("pkgId").as("pkg_id"), //订装ID
wayBillDF("pkgNumber").as("pkg_number"), //订装编号
wayBillDF("timeoutDt").as("timeout_dt"), //超时时间
wayBillDF("transformType").as("transform_type"), //运输方式
wayBillDF("deliveryAddr").as("delivery_addr"),
wayBillDF("deliveryCustomerName").as("delivery_customer_name"),
wayBillDF("deliveryMobile").as("delivery_mobile"),
wayBillDF("deliveryTel").as("delivery_tel"),
wayBillDF("receiveAddr").as("receive_addr"),
wayBillDF("receiveCustomerName").as("receive_customer_name"),
wayBillDF("receiveMobile").as("receive_mobile"),
wayBillDF("receiveTel").as("receive_tel"),
wayBillDF("cdt"),
wayBillDF("udt"),
wayBillDF("remark"),
recordDF("swId").as("sw_id"),
startWarehouseDF("name").as("sw_name"),
startCompanyDF("id").as("sw_company_id"),
startCompanyDF("companyName").as("sw_company_name"),
recordDF("ewId").as("ew_id"),
endWarehouseDF("name").as("ew_name"),
endCompanyDF("id").as("ew_company_id"),
endCompanyDF("companyName").as("ew_company_name"),
toolDF("id").as("tt_id"),
toolDF("licensePlate").as("tt_name"),
recordDF("routeId").as("route_id"),
concat(routeDF("startStation"), routeDF("endStation")).as("route_name")
)
// 添加字段,日期字段day增加日期列
.withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd"))
// 根据运单表的创建时间顺序排序
.sort(wayBillDF.col("cdt").asc)
// 返回拉宽数据
wayBillDetailDF
}
// SparkSQL程序入口
def main(args: Array[String]): Unit = {
// TODO: 调用父类中模板方法,传递相关参数即可
execute(
this.getClass, //
TableMapping.WAY_BILL, // 事实表
OfflineTableDefine.WAY_BILL_DETAIL, // 宽表
isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
)
}
}
注意:在DWD层程序中,mian仅仅调度父类模板方法,传递参数即可。
07-[掌握]-运单主题之指标计算【MAIN 方法】
任务:从Kudu数据库中加载运单宽表数据,按照指标进行计算,创建
WayBillDWS
对象,继承公共接口。
- 1)、指标字段,首先获取总的运单数及各个维度统计最大、最小和平均运单数目。
- 2)、Spark 编程实现
在
dws
目录下创建WayBillDWS
单例对象,继承自AbstractOfflineApp
特质
package cn.itcast.logistics.offline.dws
import cn.itcast.logistics.common.{Configuration, OfflineTableDefine}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
* 运单主题指标开发:
* 从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。
*/
object WayBillDWS extends AbstractOfflineApp{
/**
* 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集
*/
override def process(dataframe: DataFrame): DataFrame = {
// 导入隐式转换
val session: SparkSession = dataframe.sparkSession
import session.implicits._
// a. 获取日期值day
// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
// c. 将指标结果转换DataFrame
// 返回指标结果
}
def main(args: Array[String]): Unit = {
// 调用父类中模板方法,传递参数
execute(
this.getClass, //
OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, //
isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
)
}
}
08-[掌握]-运单主题之指标计算【process 方法】
任务:实现process方法,先进行指标计算,按照每天数据进行指标统计,示意图如下所示:
具体指标计算代码如下所示:
// 导入隐式转换
val session: SparkSession = dataframe.sparkSession
import session.implicits._
// a. 获取日期值day
val days: Array[Row] = dataframe.select($"day").distinct().collect()
// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
days.map{dayRow =>
// 获取日期值
val dayValue: String = dayRow.getString(0)
// 过滤每日数据
val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue)
wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK)
// 指标计算
// 指标一:总运单数
val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total"))
// 指标二:各区域运单数,最大、最小和平均
val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count()
val areaTotalAggDF: DataFrame = areaTotalDF.agg(
max($"count").as("areaMaxTotal"),
min($"count").as("areaMinTotal"),
round(avg($"count"), 0).as("areaAvgTotal")
)
// 指标三:各分公司运单数,最大、最小和平均
val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count()
val companyTotalAggDF: DataFrame = companyTotalDF.agg(
max($"count").as("companyMaxTotal"),
min($"count").as("companyMinTotal"),
round(avg($"count"), 0).as("companyAvgTotal")
)
// 指标四:各网点运单数,最大、最小和平均
val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count()
val dotTotalAggDF: DataFrame = dotTotalDF.agg(
max($"count").as("dotMaxTotal"),
min($"count").as("dotMinTotal"),
round(avg($"count"), 0).as("dotAvgTotal")
)
// 指标五:各线路运单数,最大、最小和平均
val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count()
val routeTotalAggDF: DataFrame = routeTotalDF.agg(
max($"count").as("routeMaxTotal"),
min($"count").as("routeMinTotal"),
round(avg($"count"), 0).as("routeAvgTotal")
)
// 指标六:各运输工具运单数,最大、最小和平均
val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count()
val ttTotalAggDF: DataFrame = ttTotalDF.agg(
max($"count").as("ttMaxTotal"),
min($"count").as("ttMinTotal"),
round(avg($"count"), 0).as("ttAvgTotal")
)
// 指标七:各类客户运单数,最大、最小和平均
val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count()
val typeTotalAggDF: DataFrame = typeTotalDF.agg(
max($"count").as("typeMaxTotal"),
min($"count").as("typeMinTotal"),
round(avg($"count"), 0).as("typeAvgTotal")
)
// 数据不再使用时,释放资源
wayBillDetailDF.unpersist()
// 将计算指标封装到Row中
Row.fromSeq(
dayRow.toSeq ++
totalDF.first().toSeq ++
areaTotalAggDF.first().toSeq ++
companyTotalAggDF.first().toSeq ++
dotTotalAggDF.first().toSeq ++
routeTotalAggDF.first().toSeq ++
ttTotalAggDF.first().toSeq ++
typeTotalAggDF.first().toSeq
)
}
09-[掌握]-运单主题之指标计算【转换DataFrame】
任务:将统计指标(封装在数组)转换为DataFrame数据集,采用自定义Schema方式完成。
// c. 将指标结果转换DataFrame
// i. RDD[Row],采用并行化方式将数组转换为RDD
val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow)
// ii. schema,自定义Schema信息
val schema: StructType = new StructType()
.add("id", StringType, nullable = false)
.add("total", LongType, nullable = true)
.add("maxAreaTotal", LongType, nullable = true)
.add("minAreaTotal", LongType, nullable = true)
.add("avgAreaTotal", DoubleType, nullable = true)
.add("maxCompanyTotal", LongType, nullable = true)
.add("minCompanyTotal", LongType, nullable = true)
.add("avgCompanyTotal", DoubleType, nullable = true)
.add("maxDotTotal", LongType, nullable = true)
.add("minDotTotal", LongType, nullable = true)
.add("avgDotTotal", DoubleType, nullable = true)
.add("maxRouteTotal", LongType, nullable = true)
.add("minRouteTotal", LongType, nullable = true)
.add("avgRouteTotal", DoubleType, nullable = true)
.add("maxToolTotal", LongType, nullable = true)
.add("minToolTotal", LongType, nullable = true)
.add("avgToolTotal", DoubleType, nullable = true)
.add("maxCtypeTotal", LongType, nullable = true)
.add("minCtypeTotal", LongType, nullable = true)
.add("avgCtypeTotal", DoubleType, nullable = true)
// iii. 转换RDD为DataFrame
val aggDF: DataFrame = session.createDataFrame(aggRDD, schema)
// 返回指标结果
aggDF
运单主题指标计算完整代码:
package cn.itcast.logistics.offline.dws
import cn.itcast.logistics.common.{Configuration, OfflineTableDefine}
import cn.itcast.logistics.offline.AbstractOfflineApp
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}
import org.apache.spark.storage.StorageLevel
/**
* 运单主题指标开发:
* 从Kudu表加载宽表数据,按照业务指标进行统计分析:基于不同维度分组聚合,类似快递单指表指标。
*/
object WayBillDWS extends AbstractOfflineApp{
/**
* 按照业务指标进行计算,按照日期day进行统计,最后返回DataFrame数据集
*/
override def process(dataframe: DataFrame): DataFrame = {
// 导入隐式转换
val session: SparkSession = dataframe.sparkSession
import session.implicits._
// a. 获取日期值day
val days: Array[Row] = dataframe.select($"day").distinct().collect()
// b. 遍历日期值,划分宽表数据,对每天数据进行指标统计
val aggRow: Array[Row] = days.map{ dayRow =>
// 获取日期值
val dayValue: String = dayRow.getString(0)
// 过滤每日数据
val wayBillDetailDF: Dataset[Row] = dataframe.filter($"day" === dayValue)
wayBillDetailDF.persist(StorageLevel.MEMORY_AND_DISK)
// 指标计算
// 指标一:总运单数
val totalDF: DataFrame = wayBillDetailDF.agg(count("id").as("total"))
// 指标二:各区域运单数,最大、最小和平均
val areaTotalDF: DataFrame = wayBillDetailDF.groupBy($"area_id").count()
val areaTotalAggDF: DataFrame = areaTotalDF.agg(
max($"count").as("areaMaxTotal"),
min($"count").as("areaMinTotal"),
round(avg($"count"), 0).as("areaAvgTotal")
)
// 指标三:各分公司运单数,最大、最小和平均
val companyTotalDF: DataFrame = wayBillDetailDF.groupBy($"sw_company_name").count()
val companyTotalAggDF: DataFrame = companyTotalDF.agg(
max($"count").as("companyMaxTotal"),
min($"count").as("companyMinTotal"),
round(avg($"count"), 0).as("companyAvgTotal")
)
// 指标四:各网点运单数,最大、最小和平均
val dotTotalDF: DataFrame = wayBillDetailDF.groupBy($"dot_id").count()
val dotTotalAggDF: DataFrame = dotTotalDF.agg(
max($"count").as("dotMaxTotal"),
min($"count").as("dotMinTotal"),
round(avg($"count"), 0).as("dotAvgTotal")
)
// 指标五:各线路运单数,最大、最小和平均
val routeTotalDF: DataFrame = wayBillDetailDF.groupBy($"route_id").count()
val routeTotalAggDF: DataFrame = routeTotalDF.agg(
max($"count").as("routeMaxTotal"),
min($"count").as("routeMinTotal"),
round(avg($"count"), 0).as("routeAvgTotal")
)
// 指标六:各运输工具运单数,最大、最小和平均
val ttTotalDF: DataFrame = wayBillDetailDF.groupBy($"tt_id").count()
val ttTotalAggDF: DataFrame = ttTotalDF.agg(
max($"count").as("ttMaxTotal"),
min($"count").as("ttMinTotal"),
round(avg($"count"), 0).as("ttAvgTotal")
)
// 指标七:各类客户运单数,最大、最小和平均
val typeTotalDF: DataFrame = wayBillDetailDF.groupBy($"ctype").count()
val typeTotalAggDF: DataFrame = typeTotalDF.agg(
max($"count").as("typeMaxTotal"),
min($"count").as("typeMinTotal"),
round(avg($"count"), 0).as("typeAvgTotal")
)
// 数据不再使用时,释放资源
wayBillDetailDF.unpersist()
// 将计算指标封装到Row中
Row.fromSeq(
dayRow.toSeq ++
totalDF.first().toSeq ++
areaTotalAggDF.first().toSeq ++
companyTotalAggDF.first().toSeq ++
dotTotalAggDF.first().toSeq ++
routeTotalAggDF.first().toSeq ++
ttTotalAggDF.first().toSeq ++
typeTotalAggDF.first().toSeq
)
}
// c. 将指标结果转换DataFrame
// i. RDD[Row],采用并行化方式将数组转换为RDD
val aggRDD: RDD[Row] = session.sparkContext.parallelize(aggRow)
// ii. schema,自定义Schema信息
val schema: StructType = new StructType()
.add("id", StringType, nullable = false)
.add("total", LongType, nullable = true)
.add("maxAreaTotal", LongType, nullable = true)
.add("minAreaTotal", LongType, nullable = true)
.add("avgAreaTotal", DoubleType, nullable = true)
.add("maxCompanyTotal", LongType, nullable = true)
.add("minCompanyTotal", LongType, nullable = true)
.add("avgCompanyTotal", DoubleType, nullable = true)
.add("maxDotTotal", LongType, nullable = true)
.add("minDotTotal", LongType, nullable = true)
.add("avgDotTotal", DoubleType, nullable = true)
.add("maxRouteTotal", LongType, nullable = true)
.add("minRouteTotal", LongType, nullable = true)
.add("avgRouteTotal", DoubleType, nullable = true)
.add("maxToolTotal", LongType, nullable = true)
.add("minToolTotal", LongType, nullable = true)
.add("avgToolTotal", DoubleType, nullable = true)
.add("maxCtypeTotal", LongType, nullable = true)
.add("minCtypeTotal", LongType, nullable = true)
.add("avgCtypeTotal", DoubleType, nullable = true)
// iii. 转换RDD为DataFrame
val aggDF: DataFrame = session.createDataFrame(aggRDD, schema)
// 返回指标结果
aggDF
}
def main(args: Array[String]): Unit = {
// 调用父类中模板方法,传递参数
execute(
this.getClass, //
OfflineTableDefine.WAY_BILL_DETAIL, OfflineTableDefine.WAY_BILL_SUMMARY, //
isLoadFullData = Configuration.IS_FIRST_RUNNABLE //
)
}
}
面试题:RDD、DataFrame和Dataset之间关系和区别???
RDD是什么、DataFrame = RDD[Row] + schema 、Dataset = RDD[CaseClass] + schema、DataFrame = Dataset[Row]
10-[理解]-Kudu 原理及优化之数据存储模型
任务:针对
Kudu
存储引擎,底层数据存储原理和模型。
大数据存储引擎:
- HDFS 分布式文件系统
从HDFS读写数据流程
- HBase 相关知识点
知识点:表如何设计的 ???RowKey如何设计???
知识点:从HBase表读写数据流程是啥????
minor compaction:小合并,将多个storefile文件合并为多个文件
major compaction:大合并,将所有storefile文件合并为一个文件
- 1)、表
table
与约束schema
- schema:字段名称、字段类型、是否为空,是否为主键
- table数据划分为多个tablet
- 分区策略(如何划分数据)和副本数(数据安全性)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pyJOpRFZ-1641191197147)(/img/1616396524218.png)]
- 2)、Kudu 底层数据模型,非常类似HBase数据底层数据存储模型
Kudu的底层数据文件的存储,未采用HDFS这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于Table/Tablet/Replica视图级别的底层存储系统
- 1)、一个Table会被分成若干个tablet,其中Tablet的数量是根据hash或者是range进行设置的
- 2)、一个Tablet中包含MetaData信息和多个RowSet信息,其中MetaData信息是block和block在data中的位置。
- 3)、一个RowSet包含一个
MemRowSet
和多个DiskRowSet,其中MemRowSet用于存储insert数据和update后的数据,写满后会刷新到磁盘中也就是多个DiskRowSet中,默认是1G刷新一次或者是2分钟。- 4)、DiskRowSet用于老数据的mutation(改变),比如说数据的更新操作,后台定期对DiskRowSet
进行合并操作,删除历史数据和没有的数据,减少查询过程中的IO开销- 5)、一个DiskRowSet包含1个BloomFilter,1个Ad_hoc Index,多个UndoFile、RedoFile、BaseData、
DeltaMem
MemRowSet
:存储新增的数据,对该内存数据集中还未flush的数据的更新;
- 新插入数据insert(以前表中没有,主键不存在)。
- 更新后的数据(update以后,新的数据)
DeltaMem
:对已flush到磁盘内的数据的更新;
- 某个数据从memRowSet刷新到DiskRowSet磁盘中,用户对此数据进行更新,将更新数据放到DeltaMem内存中
- 当对DiskRowSet磁盘中老数据和DelteMem中要更新数据进行合并以后,再次放到MemRowSet中
在KUDU中,把
DiskRowSet
分为了两部分:
- 1)、base data: 负责存储基础数据
- 2)、delta stores:delta stores负责存储 base data 中的变更数据.
- DeltaMem,存储更新数据
- DeltaFile,存储变更以后的数据
数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 basedata),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet后续的数据变更(更新、删除)。 DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。
11-[理解]-Kudu 原理及优化之数据读写原理
任务:当从Kudu表读写(读取数据、插入数据、更新数据、删除数据)数据时,原理流程是如何的。
- 1)、Kudu的工作模式如下图
上图中编号对应功能描述,如下所示:
- 2)、Kudu
读
流程
从Kudu读取数据时:
- 第一点:两次过滤定位,首先对应Tablet Follower·,然后获取DiskRowSet
- 第二点:先从DiskRowSet读取数据,再读取MemRowSet,最后合并数据,返回Client
- 3)、Kudu 写流程
当向Kudu表中写入数据时,进行三次判断插入数据主键是否存在,如果不存在,再进行插入数据:
- 第一次、主键范围过滤
- 第二次、布隆过滤器
- 第三次、B-树索引过滤
更新和删除数据,与插入数据类型,需要经过三次过滤判断,当主键存在时,才进行更新和删除操作。
12-[理解]-Kudu 原理及优化之基本优化设置
- 1)、Kudu 关键配置,设置Kudu TabletServer 内存
- 2)、Kudu 使用限制
- 3)、字段
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ihVh0pHo-1641191197151)(img/1616399441166.png)]
- 4)、表的设置
- 5)、分区限制
- 6)、扩展建议和限制
- 7)、守护进程,Master和TabletServer内存配置
- 8)、集群管理限制
- 9)、Spark 集成限制
【作业】-仓库主题报表开发
需求:
参考建议【1.5 仓库主题】,按照讲解【运单主题】报表开发步骤进行编程实现
第一、DWD层,开发程序:WarehouseDWD,继承接口AbstractOfflineApp
第二、DWS层,开发程序:WarehouseDWS,继承接口AbstractOfflineApp
面试题:
如何对Kudu表数据进行备份操作????
思路:
SparkSQL程序,从Kudu读取数据,写入HDFS文件
- 点赞
- 收藏
- 关注作者
评论(0)