客快物流大数据项目(六十二):主题及指标开发

举报
Lansonli 发表于 2022/03/12 00:38:29 2022/03/12
【摘要】 目录 主题及指标开发 一、主题开发业务流程 二、离线模块初始化 1、创建包结构 2、​​​​​​​创建时间处理工具 3、​​​​​​​定义主题宽表及指标结果表的表名 4、​​​​​​​物流字典码表数据类型定义枚举类 5、​​​​​​​封装公共接口 主题及指标开发 一、主题开发业务流程 二、​​​​​​​离...

目录

主题及指标开发

一、主题开发业务流程

二、离线模块初始化

1、创建包结构

2、​​​​​​​创建时间处理工具

3、​​​​​​​定义主题宽表及指标结果表的表名

4、​​​​​​​物流字典码表数据类型定义枚举类

5、​​​​​​​封装公共接口


主题及指标开发

一、主题开发业务流程

二、​​​​​​​离线模块初始化

1、​​​​​​​创建包结构

本次项目采用scala编程语言,因此创建scala目录

包名

说明

cn.it.logistics.offline

离线指标统计程序所在包

cn.it.logistics.offline.dwd

离线指标dwd层程序所在包

cn.it.logistics.offline.dws

离线指标dws层程序所在包

2、​​​​​​​创建时间处理工具

实现步骤:

  • 公共模块scala目录下的common程序包下创建DateHelper对象
    • 实现获取当前日期
    • 实现获取昨天日期

  
  1. package cn.it.logistics.common
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. /**
  5. * 时间处理工具类
  6. */
  7. object DateHelper {
  8. /**
  9. * 返回昨天的时间
  10. */
  11. def getyesterday(format:String)={
  12. //当前时间减去一天(昨天时间)
  13. new SimpleDateFormat(format).format(new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24))
  14. }
  15. /**
  16. * 返回今天的时间
  17. * @param format
  18. */
  19. def gettoday(format:String) = {
  20. //获取指定格式的当前时间
  21. new SimpleDateFormat(format).format(new Date)
  22. }
  23. }

3、​​​​​​​定义主题宽表及指标结果表的表名

每个主题都需要拉宽操作将拉宽后的数据存储到kudu表中,同时指标计算的数据最终也需要落地到kudu表,因此提前将各个主题相关表名定义出来

实现步骤:

  • 公共模块scala目录下的common程序包下创建OfflineTableDefine单例对象
  • 定义各个主题相关的表名

参考代码:


  
  1. package cn.it.logistics.common
  2. /**
  3. * 自定义离线计算结果表
  4. */
  5. object OfflineTableDefine {
  6. //快递单明细表
  7. val expressBillDetail = "tbl_express_bill_detail"
  8. //快递单指标结果表
  9. val expressBillSummary = "tbl_express_bill_summary"
  10. //运单明细表
  11. val wayBillDetail = "tbl_waybill_detail"
  12. //运单指标结果表
  13. val wayBillSummary = "tbl_waybill_summary"
  14. //仓库明细表
  15. val wareHouseDetail = "tbl_warehouse_detail"
  16. //仓库指标结果表
  17. val wareHouseSummary = "tbl_warehouse_summary"
  18. //网点车辆明细表
  19. val dotTransportToolDetail = "tbl_dot_transport_tool_detail"
  20. //仓库车辆明细表
  21. val warehouseTransportToolDetail = "tbl_warehouse_transport_tool_detail"
  22. //网点车辆指标结果表
  23. val ttDotSummary = "tbl_dot_transport_tool_summary"
  24. //仓库车辆指标结果表
  25. val ttWsSummary = "tbl_warehouse_transport_tool_summary"
  26. //客户明细表数据
  27. val customerDetail = "tbl_customer_detail"
  28. //客户指标结果表数据
  29. val customerSummery = "tbl_customer_summary"
  30. }

4、​​​​​​​物流字典码表数据类型定义枚举类

为了后续使用方便且易于维护,根据物流字典表的数据类型定义成枚举工具类,物流字典表的数据如下:

来自:tbl_codes表

name

type

注册渠道

1

揽件状态

2

派件状态

3

快递员状态

4

地址类型

5

网点状态

6

员工状态

7

是否保价

8

运输工具类型

9

运输工具状态

10

仓库类型

11

是否租赁

12

货架状态

13

回执单状态

14

出入库类型

15

客户类型

16

下单终端类型

17

下单渠道类型

18

实现步骤:

  • 公共模块scala目录下的common程序包下创建CodeTypeMapping对象
  • 根据物流字典表数据类型定义属性

实现过程:

  • 公共模块scala目录下的common程序包下创建CodeTypeMapping对象
  • 根据物流字典表数据类型定义属性


  
  1. package cn.it.logistics.common
  2. /**
  3. * 定义物流字典编码类型映射工具类
  4. */
  5. class CodeTypeMapping {
  6. //注册渠道
  7. val RegisterChannel = 1
  8. //揽件状态
  9. val CollectStatus = 2
  10. //派件状态
  11. val DispatchStatus = 3
  12. //快递员状态
  13. val CourierStatus = 4
  14. //地址类型
  15. val AddressType = 5
  16. //网点状态
  17. val DotStatus = 6
  18. //员工状态
  19. val StaffStatus = 7
  20. //是否保价
  21. val IsInsured = 8
  22. //运输工具类型
  23. val TransportType = 9
  24. //运输工具状态
  25. val TransportStatus = 10
  26. //仓库类型
  27. val WareHouseType = 11
  28. //是否租赁
  29. val IsRent = 12
  30. //货架状态
  31. val GoodsShelvesStatue = 13
  32. //回执单状态
  33. val ReceiptStatus = 14
  34. //出入库类型
  35. val WarehousingType = 15
  36. //客户类型
  37. val CustomType = 16
  38. //下单终端类型
  39. val OrderTerminalType = 17
  40. //下单渠道类型
  41. val OrderChannelType = 18
  42. }
  43. object CodeTypeMapping extends CodeTypeMapping{
  44. }

5、​​​​​​​封装公共接口

根据分析:主题开发数据的来源都是来自于kudu数据库,将数据进行拉宽或者将计算好的指标最终需要写入到kudu表中,因此根据以上流程抽象出来公共接口

实现步骤:

  • offline目录下创建OfflineApp单例对象
    • 定义数据的读取方法:getKuduSource
    • 定义数据的处理方法:execute
    • 定义数据的存储方法:save

参考代码:


  
  1. package cn.it.logistics.offline
  2. import cn.it.logistics.common.{Configuration, DateHelper, Tools}
  3. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  4. import org.apache.spark.sql.functions.{col, date_format}
  5. /**
  6. * 根据不同的主题开发定义抽象方法
  7. * 1)数据读取
  8. * 2)数据处理
  9. * 3)数据保存
  10. */
  11. trait OfflineApp {
  12. /**
  13. * 读取kudu表的数据
  14. * @param sparkSession
  15. * @param tableName
  16. * @param isLoadFullData
  17. */
  18. def getKuduSource(sparkSession: SparkSession, tableName:String, isLoadFullData:Boolean = false)= {
  19. if (isLoadFullData) {
  20. //加载全部的数据
  21. sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(
  22. Map(
  23. "kudu.master" -> Configuration.kuduRpcAddress,
  24. "kudu.table" -> tableName,
  25. "kudu.socketReadTimeoutMs"-> "60000")
  26. ).load().toDF()
  27. } else {
  28. //加载增量数据
  29. sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(
  30. Map(
  31. "kudu.master" -> Configuration.kuduRpcAddress,
  32. "kudu.table" -> tableName,
  33. "kudu.socketReadTimeoutMs"-> "60000")
  34. ).load()
  35. .where(date_format(col("cdt"), "yyyyMMdd") === DateHelper.getyesterday("yyyyMMdd")).toDF()
  36. }
  37. }
  38. /**
  39. * 数据处理
  40. * @param sparkSession
  41. */
  42. def execute(sparkSession: SparkSession)
  43. /**
  44. * 数据存储
  45. * dwd及dws层的数据都是需要写入到kudu数据库中,写入逻辑相同
  46. * @param dataFrame
  47. * @param isAutoCreateTable
  48. */
  49. def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true): Unit = {
  50. //允许自动创建表
  51. if (isAutoCreateTable) {
  52. Tools.autoCreateKuduTable(tableName, dataFrame)
  53. }
  54. //将数据写入到kudu中
  55. dataFrame.write.format(Configuration.SPARK_KUDU_FORMAT).options(Map(
  56. "kudu.master" -> Configuration.kuduRpcAddress,
  57. "kudu.table" -> tableName
  58. )).mode(SaveMode.Append).save()
  59. }
  60. }

​​​​​​​ 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123414712

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。