数据仓库之业务域开发指南

举报
tea_year 发表于 2025/09/29 15:23:06 2025/09/29
【摘要】 一、概要说明业务域的数据来自业务系统的数据库表模型非常多且关系复杂一般来说,这些表可以分为如下类别维度表对一个事物(实体)进行属性描述的表比如,商品信息表produce_id,name,price,stock_num,cat3_id,...​会员信息表member_id,account,nick_name,birthday,star_zuo,gender,age,integration,le...

一、概要说明

业务域的数据来自业务系统的数据库

表模型非常多且关系复杂

一般来说,这些表可以分为如下类别

  • 维度表

对一个事物(实体)进行属性描述的表
比如,商品信息表
produce_id,name,price,stock_num,cat3_id,...

会员信息表
member_id,account,nick_name,birthday,star_zuo,gender,age,integration,level_id

运营位信息
id,name,image_url,ad_campain,...

优惠券信息表
coupon_id,coupon_name,amount,valid_start,valid_end,member_level_id


  • 事实表

对一件发生过的事情(事实)进行描述的表
比如,订单表
order_id,member_id,timestamp,amount,discount,coupon_dikou,hongbao_dikou,receive_addr,receive_phone.

订单商品详情表
order_id,product_id,price,cnt,coupon_id,amount,...

购物车表
member_id,product_id,number,price,create_time,update_time,submit_time,cancel_time,.......

优惠券领取、使用记录表
member_id,create_time,coupon_id,number,timeout_dt,use_time,left_number,..........


  • 字典表

一个“码”,对应一个名称
id,color_name
1,菁蓝
2,土黄
3,金黄
4,天蓝



二、Sqoop数据抽取

1. Sqoop工具介绍

sqoop 是 apache 旗下一款Hadoop中的各种存储系统(HDFS、HIVE、HBASE) 和关系数据库(mysql、oracle、sqlserver等)服务器之间传送数据”的工具。

核心的功能有两个:

导入(迁入)、导出(迁出)


导入数据:MySQL,Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统

导出数据:从 Hadoop 的文件系统中导出数据到关系数据库 mysql 等 Sqoop 的本质还是一个命令行工具,和 HDFS,Hive 相比,并没有什么高深的理论。

wps1.jpg

底层工作机制

将导入或导出命令翻译成 MapReduce 程序来实现

在翻译出的 MapReduce 中主要是对InputFormat 和 OutputFormat 进行定制


2. 数据抽取策略

维度表
维度表小表(品类信息表,活动信息表,优惠券信息表等),每天抽取过来一份全量(或者一周、一月);
维度表大表(商品信息表),每天抽取过来一份增量数据

事实表
订单相关表
优惠券领取使用记录表
秒杀订阅记录表

事实表每天都会抽取一份增量数据


三、ODS层

本层,表模型结构与业务库中的表模型结构保持一致 只是,本层表的数据,主要是通过sqoop增量导入后的每日增量数据; 所以,本层表是分区增量表;

主要表模型

  • 商品信息(主要信息、详情信息、类目信息、属性信息、商品相册信息)

wps2.jpg

  • 用户信息(主要信息、附加信息、会员等级信息)

wps3.jpg

  • 订单信息及购物车相关(主要信息、详情信息、物流信息、评论信息)

wps4.jpg

  • 内容管理(话题,文章,评论)

wps5.jpg

  • 营销管理(优惠券、代金券、活动规则、主题推荐)

wps6.jpg


订单表增量抽取实战

  • mysql建表

create table t_order(
id   int   primary key auto_increment,
amt decimal(10,2),
`status` int default 0,
user_id int,
create_time timestamp DEFAULT CURRENT_TIMESTAMP,
modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)

-- 如果status没有值,则使用默认值0
`status` int default 0,
-- CURRENT_TIMESTAMP作为关键字可以获取系统时间
create_time timestamp DEFAULT CURRENT_TIMESTAMP
-- 当前行数据修改,则这一列自动修改为系统时间
modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  • 添加测试数据

insert into t_order values(null,100,0,1001,null,null);
insert into t_order values(null,99,0,1002,null,null);


a) 会覆盖原有的表数据

sqoop import \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--table t_order \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "," \
--delete-target-dir \
--hive-overwrite \
--hive-database ods \
--hive-table t_order


b) 带有分区增量数据导入

1.添加测试数据
insert into t_order values(null,100,0,1001,null,null);
insert into t_order values(null,99,0,1002,null,null);

mysql查询结果
5 100 0 1001 2022-12-01 16:24:10 2022-12-01 16:24:10
6 99 0 1002 2022-12-01 16:24:10 2022-12-01 16:24:10

2.执行sqoop导入脚本
sqoop import \
-Dmapred.job.queue.name=default \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--target-dir /user/root/t_order   \
--delete-target-dir \
--hive-import \
--hive-database ods   \
--hive-table t_order \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--hive-overwrite \
--split-by id \
--query 'select id,amt,status,user_id,create_time,modify_time from t_order where modify_time >= "1970-01-01 00:00:00" AND $CONDITIONS'   \
--hive-partition-key dt   \
--hive-partition-value '2022-12-01' \
-m 1

3.添加和修改数据 (并且假设现在已经是12-02了)
insert into t_order values(null,220,0,1001,'2022-12-02 10:10:10','2022-12-02 10:10:10');
update t_order set `status` = 2 , modify_time = '2022-12-02 11:00:00' where id = 5

mysql查询结果如下
5 100 2 1001 2022-12-01 16:24:10 2022-12-02 11:00:00
6 99 0 1002 2022-12-01 16:24:10 2022-12-01 16:24:10
7 220 0 1001 2022-12-02 10:10:10 2022-12-02 10:10:10


4.执行sqoop导入脚本
-- 期望结果:将添加和修改的数据以增量的方式导入到hive的表的新分区下
sqoop import \
-Dmapred.job.queue.name=default \
--connect jdbc:mysql://hadoop10:3306/test1?useSSL=false&characterEncoding=UTF-8 \
--username root \
--password '123456' \
--target-dir /user/root/t_order   \
--delete-target-dir \
--hive-import \
--hive-database ods   \
--hive-table t_order \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N' \
--hive-overwrite \
--split-by id \
--query 'select id,amt,status,user_id,create_time,modify_time from t_order where modify_time >= "2022-12-02 00:00:00" AND $CONDITIONS'   \
--hive-partition-key dt   \
--hive-partition-value '2022-12-02' \
-m 1

--split-by id 、$CONDITIONS、-m 1 -m 指定maptask的个数 ,如果指定-m为2,底层两个maptask则会处理不同的数据 如果表中有5条数据,哪些数据给第一个maptask,哪些数据给第二个maptask呢? --split-by id 让底层根据id这一列进行拆分 , 例如拆分结果 1<= id <= 2 2 < id <= 5 $CONDITIONS 上述拆分数据形成的条件会替换$CONDITIONS


四、DWD层设计开发

本层主要表类型:

  • 存储各业务表的全量快照

  • 存储各业务表的拉链表

    在我们项目中,本层存储的业务表[拉链表] 主要就是ods中的各种增量导入数据进行全量合并后生成的全量快照表或者拉链表; 快照表和拉链表,都是分区全量表! 只不过,快照表需要保存每一天的分区,才能查询到每一天的该表的数据状态; 而拉链表,则只要保留最后一天的分区即可;


问题:在ODS是增量导入的情况的下,如何在DWD层获取完整数据?

增量合并成全量快照

为了便于后续的统计分析方便,用增量抽取策略抽取过来的增量数据,都要每天进行滚动合并

image-20221201001438256.png

a). 在DWD层创建订单全量快照表
create table dwd.t_order_full(
id int     ,
amt double   ,
status int     ,
user_id int     ,
create_time string   ,
modify_time string  
)partitioned by(dt string)

b).添加数据
注意我们的逻辑:
1.先添加ods.t_order(dt=2022-12-02)的数据
2.然后添加dwd.t_order_full(dt='2022-12-02')的数据
3.然后添加ods.t_order(dt=2022-12-03)的数据
4.然后添加dwd.t_order_full(dt='2022-12-03')的数据

添加dwd.t_order_full(dt='2022-12-03')的数据是根据ods.t_order(dt=2022-12-03)和dwd.t_order_full(dt='2022-12-02')进行full join而来

insert overwrite table dwd.t_order_full
partition(dt='2022-12-02')
select
    if(t1.id is not null,t1.id,t2.id)   id ,
if(t1.id is not null,t1.amt,t2.amt) amt,
if(t1.id is not null,t1.`status`,t2.`status`) `status`,
if(t1.id is not null,t1.user_id,t2.user_id)     user_id,
if(t1.id is not null,t1.create_time,t2.create_time)     create_time,
if(t1.id is not null,t1.modify_time,t2.modify_time)     modify_time
from
(
  select * from ods.t_order where dt = '2022-12-02'
)t1 full join
(
  select * from dwd.t_order_full where dt = '2022-12-01'
)t2 on t1.id = t2.id


拉链表概念及实现逻辑

image-20221201001936433.png

a) 创建一张订单拉链表
create table dwd.t_order_zip(
id int     ,
amt double   ,
status int     ,
user_id int     ,
create_time string   ,
modify_time string   ,
start_dt   string   ,
end_dt     string  
)partitioned by(dt string)

b) 向拉链表添加数据

insert overwrite table dwd.t_order_zip
partition(dt='2022-12-02')
select t1.id,t1.amt,t1.`status`,t1.user_id,t1.create_time,t1.modify_time,
      t1.start_dt,
      case when t1.end_dt = '9999-12-31'  and t2.id is not null then '2022-12-01'
              else t1.end_dt end   end_dt
from (
  select * from dwd.t_order_zip where dt = '2022-12-01'
)t1 left join (
  select * from ods.t_order where dt = '2022-12-02'
)t2 on t1.id = t2.id
union all
select   id,amt,`status`,user_id,create_time,modify_time, '2022-12-02' start_dt,'9999-12-31' end_dt
from ods.t_order where dt = '2022-12-02'


insert overwrite table dwd.t_order_zip
partition(dt='2022-12-03')
select t1.id,t1.amt,t1.`status`,t1.user_id,t1.create_time,t1.modify_time,
      t1.start_dt,
      case when t1.end_dt = '9999-12-31'  and t2.id is not null then '2022-12-02'
              else t1.end_dt end   end_dt
from (
  select * from dwd.t_order_zip where dt = '2022-12-02'
)t1 left join (
  select * from ods.t_order where dt = '2022-12-03'
)t2 on t1.id = t2.id
union all
select   id,amt,`status`,user_id,create_time,modify_time, '2022-12-03' start_dt,'9999-12-31' end_dt
from ods.t_order where dt = '2022-12-03'


五、DWS层设计开发

本层主要处理: 根据业务特点,提炼出若干“主题”(电商系统可以划分:活动主题、订单主题、用户主题、商品主题) 然后,区分每一个主题中的事实表[由用户的某个行动不断产生数据的表]、维度表[是对事实的一种描述]

张三是一个河南省郑州人,在京东平台上购买了一部华为手机 [维度建模]

按照维度建模的思想,按各主题,将核心事实表关联需要的维度表,得到宽表

需求:用户消费订单统计画像标签表

给用户打上一些消费相关(下单、退货、金额、客单价)的统计数据标签

  • 建表

create table dws.user_profile_consumer_tag(
  member_id                      bigint     ,--用户
  first_order_time               string     ,--首单日期
  last_order_time               string     ,--末单日期
  first_order_ago                bigint     ,--首单距今天数
  last_order_ago                 bigint     ,--末单距今天数
  month1_order_cnt               bigint     ,--近30天下单次数
  month1_order_amt               double     ,--近30天购买金额(总金额)
  month2_order_cnt               bigint     ,--近60天购买次数
  month2_order_amt               double     ,--近60天购买金额
  month3_order_cnt               bigint     ,--近90天购买次数
  month3_order_amt               double     ,--近90天购买金额
  max_order_amt                  double     ,--最大订单金额
  min_order_amt                  double     ,--最小订单金额
  total_order_cnt                bigint     ,--累计订单数(不含退拒)
  total_order_amt                double     ,--累计消费金额(不含退拒)
  total_coupon_amt               double     ,--累计使用代金券金额
  user_avg_order_amt             double     ,--平均订单金额(含退拒)
  month3_user_avg_amt            double     ,--近90天平均订单金额(含退拒)
  common_address                 string     ,--常用收货地址
  common_paytype                 string     ,--常用支付方式
  month1_cart_goods_cnt_30       bigint     ,--最近30天加购商品件数
  month1_cart_goods_amt_30       bigint     ,--最近30天加购商品金额
  month1_cart_cancel_cnt         bigint     ,--最近30天取消商品件数
  month1_cart_cancel_amt         bigint     ,--最近30天取消商品金额
  dw_date                       string      --计算日期
) partitioned by
(dt string)
;
  • SQL开发

with part1 as (
select  
t1.member_id,
min(t1.create_time) first_order_time,  -- 首单日期
max(t1.create_time) last_order_time,   -- 末单日期
datediff('2022-12-06',min(t1.create_time)) first_order_ago, -- 首单距今天数
datediff('2022-12-06',max(t1.create_time)) last_order_ago,  -- 末单距今天数
count( if(datediff('2022-12-06',t1.create_time) <= 30 , 1 , null ) )   month1_order_cnt, -- 近30天下单次数
sum( if(datediff('2022-12-06',t1.create_time) <= 30 , t1.total_amount , 0 ) )   month1_order_amt,   -- 近30天购买金额
count( if(datediff('2022-12-06',t1.create_time) <= 60 , 1 , null ) )   month2_order_cnt, -- 近60天下单次数
sum( if(datediff('2022-12-06',t1.create_time) <= 60 , t1.total_amount , 0 ) )   month2_order_amt,   -- 近60天购买金额
count( if(datediff('2022-12-06',t1.create_time) <= 90 , 1 , null ) )   month3_order_cnt, -- 近90天下单次数
sum( if(datediff('2022-12-06',t1.create_time) <= 90 , t1.total_amount , 0 ) )   month3_order_amt, -- 近90天购买金额
max(t1.total_amount) max_order_amt, -- 最大订单金额
min(t1.total_amount) min_order_amt, -- 最小订单金额
count(if(t2.order_id is null,1 , null)) total_order_cnt, -- 累计订单数(不含退拒)
sum(if(t2.order_id is null,t1.total_amount,0)) total_order_amt, -- 累计消费金额(不含退拒)
sum(t1.coupon_amount) total_coupon_amt,   --累计使用代金券金额
avg(t1.total_amount)   user_avg_order_amt, --平均订单金额(含退拒)
avg(if(datediff('2022-12-06',create_time)<=90,total_amount,null)) month3_user_avg_amt -- 近90天平均订单金额(含退拒)
from(
select
id,member_id,create_time,total_amount,coupon_amount,receiver_detail_address
from dwd.oms_order_zip
where dt = '2022-12-06' and start_dt <= '2022-12-06'  and end_dt >= '2022-12-06'
)t1 left join(
select order_id,sum(return_amount) return_amount
from dwd.oms_order_return_apply_zip
where dt = '2022-12-06' and start_dt <= '2022-12-06'  and end_dt >= '2022-12-06' and status = 2
group by order_id
)t2 on t1.id = t2.order_id
group by t1.member_id
)
,part2 as (
select member_id,receiver_detail_address common_address
from(
select
member_id,receiver_detail_address,count(*) c1,
row_number() over(partition by member_id order by count(*) desc) rn
from dwd.oms_order_zip
where dt = '2022-12-06' and start_dt <= '2022-12-06'  and end_dt >= '2022-12-06'
group by member_id,receiver_detail_address
)t1 where rn = 1
),part3 as (
 
   select member_id,pay_type common_paytype
from(
select
member_id,pay_type,count(*) c1,
row_number() over(partition by member_id order by count(*) desc) rn
from dwd.oms_order_zip
where dt = '2022-12-06' and start_dt <= '2022-12-06'  and end_dt >= '2022-12-06'  and pay_type != 0
group by member_id,pay_type
)t1 where rn = 1

),part4 as (
  select member_id,
    sum(if(datediff('2022-12-06',create_date)<=30,quantity,0)) month1_cart_goods_cnt_30,
sum(if(datediff('2022-12-06',create_date)<=30,quantity*price,0)) month1_cart_goods_amt_30,
sum(if(datediff('2022-12-06',create_date)<=30 and delete_status='1',quantity,0)) month1_cart_cancel_cnt,
sum(if(datediff('2022-12-06',create_date)<=30 and delete_status='1',quantity*price,0)) month1_cart_cancel_amt  
  from dwd.oms_cart_item_zip
  where dt = '2022-12-06' and start_dt <= '2022-12-06'  and end_dt >= '2022-12-06'
  group by member_id
)
insert overwrite table dws.user_profile_consumer_tag
partition(dt = '2022-12-06')
select part1.member_id        
,part1.first_order_time              
,part1.last_order_time              
,part1.first_order_ago              
,part1.last_order_ago                
,part1.month1_order_cnt              
,part1.month1_order_amt              
,part1.month2_order_cnt              
,part1.month2_order_amt              
,part1.month3_order_cnt              
,part1.month3_order_amt              
,part1.max_order_amt                
,part1.min_order_amt                
,part1.total_order_cnt              
,part1.total_order_amt              
,part1.total_coupon_amt              
,part1.user_avg_order_amt            
,part1.month3_user_avg_amt          
,part2.common_address                
,part3.common_paytype                
,part4.month1_cart_goods_cnt_30      
,part4.month1_cart_goods_amt_30      
,part4.month1_cart_cancel_cnt        
,part4.month1_cart_cancel_amt        
,'2022-12-06' dw_date    
from part1
left join part2 on part1.member_id = part2.member_id
left join part3 on part1.member_id = part3.member_id
left join part4 on part1.member_id = part4.member_id

总结

本次业务域数据仓库开发围绕数据分层架构展开,覆盖从业务库抽取到主题建模全流程,核心要点如下:
数据层设计遵循 ODS、DWD、DWS 三层架构:ODS 层保持业务库表结构一致,通过 Sqoop 按策略抽取数据,维度表采用全量或增量抽取,事实表每日增量导入并按日期分区;DWD 层通过全量快照表和拉链表处理 ODS 增量数据,快照表每日滚动合并保留历史状态,拉链表以start_dtend_dt记录数据生命周期,减少存储冗余;DWS 层按业务主题(如用户消费)构建宽表,整合多源数据生成用户画像标签。
关键技术上,Sqoop 通过--query和分区参数实现增量导入,DWD 层用full join合并增量与历史数据生成快照,拉链表通过union allcase when更新数据有效期,DWS 层基于 CTE(with子句)分模块计算用户消费指标,结合窗口函数定位常用地址、支付方式等维度信息,最终形成结构化的分析型数据,为后续业务决策提供支撑。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。