Flink实时数仓架构概览

举报
tea_year 发表于 2025/09/25 11:41:27 2025/09/25
【摘要】 Flink 实时数仓在 ODS 到 DWD 层处理行为日志时,需对用户属性、页面信息、地域等公共维度做退维,避免后续重复关联,提升分析效率,核心技术如下: 采用 “MySQL CDC 同步 + HBase 存储 + Flink Lookup Join” 架构:业务库(如用户注册表)通过 MySQL CDC 监听 binlog,实时同步数据至 HBase 维表;行为日志流用 Flink Looku

Flink实时数仓架构概览

image-20230719001949868.png


行为事件公共维度退维ETL开发

1 退维概念

  • 行为日志数据的各类统计分析需求中,免不了要进行各类指标的多维分析

  • 而原始的行为日志数据(相当于多维模型中的中心事实表)中,很多维度字段只有维度外键,并没有维度的详细信息,因而需要进行维度信息(维表)的关联;

  • 在做各类多维统计分析时,每次都去关联“维表”,效率较低;如果能够在做多维统计之前统一进行维度退化,则可以节省大量的重复关联运算;

  • 然而,我们的行为日志数据,包含近百类行为事件,不同类型的行为事件,其日志数据拥有着不同的维度字段,不合适做统一全局退维;但他们都包含了一些共同的公共维度字段,可以提前退维;

  • 所以,我们在ods->dwd的处理过程中,考虑把这些公共的维度信息进行退维处理,以避免后续的重复关联;至于不同事件所拥有的个性化维度,则在对应主题分析时再去关联


2 退维处理需求

公共维度

  • 用户属性维度(如会员渠道、会员性别、会员等级、会员年龄……)

  • 页面信息维度(页面标题,页面类型,页面所属子业务系统,页面所属栏目、频道,页面所属营销活动等)

  • 地域维度(行为事件发生时所在的省、市、区、街道、商圈等)

  • 设备信息维度(设备品牌,设备厂家,设备的价位区间等)

输出目标

  • 落地到DWD层的存储系统kafka中(提供给后续的流式计算)


3 用户属性维表处理

用户属性退维核心思路:行为事件日志数据中,有用户的登录账号,但缺失注册时间、会员等级、性别等我们需要的维度信息;用户属性退维,就是需要寻找到相关信息,并通过登录账号关联后,集成到行为日志数据中来;


用户属性数据,来源于用户注册信息;而用户注册信息,存储在业务系统的(OLTP)联机数据库的用户注册信息表中

在flink程序中,如何对输入的行为日志数据流,去关联用户注册信息呢?

方案要点:LOOKUP查询hbase表

通过mysql-cdc-connector,监听实时数仓所需要查询的各类维表信息后,将数据整理后写入hbase中,后续的流式维表查询场景中,在使用hbase-connector +lookup join 的方式去hbase中查询;

这种方式,显然不会给业务库带来额外压力;

在流式查询中,并发查询能力也能得到hbase的有力支撑,查询响应延迟也在可接受范围内

流程图 (1).jpg


lookup关联方式,可以设置缓存来减轻维表所在库的查询压力

但是开启lookup缓存机制,会造成可能关联到过时数据的问题

lookup.cache.max-rows = -1
lookup.cache.ttl = 0 s


4 用户注册信息表维表开发

用户注册信息业务表说明

CREATE TABLE `ums_member` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`member_level_id` bigint(20) DEFAULT NULL COMMENT '用户会员等级ID',
`username` varchar(64) DEFAULT NULL COMMENT '用户名',
`password` varchar(64) DEFAULT NULL COMMENT '密码',
`nickname` varchar(64) DEFAULT NULL COMMENT '昵称',
`phone` varchar(64) DEFAULT NULL COMMENT '手机号码',
`status` int(1) DEFAULT NULL COMMENT '帐号启用状态:0->禁用;1->启用',
`create_time` datetime DEFAULT NULL COMMENT '注册时间',
`icon` varchar(500) DEFAULT NULL COMMENT '头像',
`gender` int(1) DEFAULT NULL COMMENT '性别:0->未知;1->男;2->女',
`birthday` date DEFAULT NULL COMMENT '生日',
`city` varchar(64) DEFAULT NULL COMMENT '所在城市',
`job` varchar(100) DEFAULT NULL COMMENT '职业',
`personalized_signature` varchar(200) DEFAULT NULL COMMENT '个性签名',
`source_type` int(1) DEFAULT NULL COMMENT '用户来源',
`integration` int(11) DEFAULT NULL COMMENT '积分',
`growth` int(11) DEFAULT NULL COMMENT '成长值',
`luckey_count` int(11) DEFAULT NULL COMMENT '剩余抽奖次数',
`history_integration` int(11) DEFAULT NULL COMMENT '历史积分数量',
`modify_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_username` (`username`),
UNIQUE KEY `idx_phone` (`phone`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='会员表';


  • 技术要点和开发步骤

  1. 对业务数据所在mysql库,开启binlog功能

  2. 在hbase中创建接收数据的目标表

注意,hbase的表结构和正统的二维关系表结构差异很大

二维关系表结构中是一个一个的字段,而hbase中只能定义rowkey和column family

因此,二维关系表==> hbase表,就有一个比较特别的结构映射

(以用户的账号或者手机号作为rowkey,其他字段都归入同一个family,在family中再以字段名和类型来定义)

  1. 开发flink程序,利用mysql-cdc-connector监听binlog数据,并做整理后,利用hbase-connector写入hbase中


业务库开启binlog

a) 修改/etc/my.cnf文件
server-id=1
log_bin=/var/lib/mysql/mysql-bin.log

b) 重启mysql服务
systemctl restart mysqld


hbase目标维表创建

流程图 (2).jpg

行键(rowkey)

我们在后续的事件流处理中,是根据用户的 username来查询该用户的注册信息

因此,我们很容易就能想到,在hbase中这份维表,应该以username作为表的行键

建表语句:

create 'dim_user_info','f'


hbase表的热点问题考虑

- 什么叫热点问题:
读热点:对hbase的查询操作,总是集中在某个或某几个少数的region上;
写热点:对hbase的写入操作,总是集中在某个或某几个少数的region上;

- 我们这个场景中是否有热点问题?
我们选择的rowkey是用户的登录名,它自身不具备顺序性
- 写入规律
登录名本身无规律顺序性,所以从mysql同步数据到hbase时,写入过程是无规律的
- 读取规律
此场景在查询数据时,是拿着行为日志中的 username 去查询hbase,因此查询过程也没有规律性

只不过,如果hbase的这个用户注册信息表只有一个region,则真的会产生热点问题;

所以,在建表时,需要提前做好预分区,而且分区的个数优先考虑将来查询时的并行能力;

带预分区的建表语句:
create 'dim_user_info','f',SPLITS=>['a','e','i','n','r','w']


  • flink cdc sql版代码开发

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UserInfo2Hbase {

   public static void main(String[] args) {
       // 构造flink编程入口
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       // 构造table编程环境
       StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

       /**
        * 监听MySQL的binlog,得到 用户注册信息表数据,然后写入 hbase
        */

       // 建连接器表,映射mysql中的 用户注册信息表<ums_member>
       // 挑选一些关心的维度字段: id,username,phone,status,create_time,gender,province,city,job
       tenv.executeSql("CREATE TABLE ums_member_source (   " +
               "     id BIGINT,                                   " +
               "     username STRING,                             " +
               "     phone STRING,                               " +
               "     status int,                                 " +
               "     create_time timestamp(3),                   " +
               "     gender int,                                 " +
               "     birthday date,                               " +
               "     province STRING,                             " +
               "     city STRING,                                 " +
               "     job STRING ,                                 " +
               "     source_type INT ,                           " +
               "     PRIMARY KEY (id) NOT ENFORCED           " +
               "     ) WITH (                                 " +
               "     'connector' = 'mysql-cdc',               " +
               "     'hostname' = 'hadoop10'   ,               " +
               "     'port' = '3306'         ,               " +
               "     'username' = 'root'     ,               " +
               "     'password' = '123456'     ,               " +
               "     'database-name' = 'db1',         " +
               "     'table-name' = 'ums_member'             " +
               ")");


       // 建连接器表,映射 hbase中的 用户信息维表<dim_user_info>
       tenv.executeSql("CREATE TABLE ums_member_hbasesink( " +
               " username STRING, " +
               " f ROW<id BIGINT,phone STRING, status INT, create_time TIMESTAMP(3), gender INT, birthday DATE, province STRING, city STRING, job STRING, source_type INT>, " +
               " PRIMARY KEY (username) NOT ENFORCED " +
               ") WITH (                             " +
               " 'connector' = 'hbase-2.2',         " +
               " 'table-name' = 'dim_user_info',     " +
               " 'zookeeper.quorum' = 'hadoop10:2181' " +
               ")");

       // 写一个insert ... select from .. 的 sql
       tenv.executeSql("insert into ums_member_hbasesink select username,row(id,phone,status,create_time,gender,birthday,province,city,job,source_type) as f from ums_member_source");
  }
}


5 地理位置维表处理

  • 利用hbase存储geohash地域维表

  • 对日志数据流,用hbase-connector和lookup join去查询地域信息


5.1 geohash地域维表构建并入库hbase

虽然我们是在做实时数仓,但并不代表在此过程中所有事情都需要用flink来做

geohash地域维表的构建,它有一个特点:这个工作是一次性的,并不是一个需要持续不断的

使用技术手段:geohash编码 + spark


hbase热点问题

此场景中,rowkey是用geohash码
查询中的地理位置规律是随机的,不存在热点问题
写入仅一次,且数据量很小,不需要考虑热点问题

如果非要考虑热点问题:geohash码有一个特点,相近区域的地点的geohash码的前缀相同

如果要把这个规律打散,可以: 把geohash反转作为rowkey即可!

hbase建表 : 做了预分区

create 'dim_geo_area','f',SPLITS=>['a','g','o','w']


5.2 核心代码开发

1,用sparksql从 mysql中的 gps地域坐标参考字典表 中读取到数据

2,对读取到的原始数据进行自关联join加工,得到扁平化的行结构: gps座标,省,市,区

3,利用geohash算法工具包,将数据进一步整理成 : geohash码,省,市,区

4,利用spark的 rdd.saveAsNewAPIHadoopDataset()方法,将数据写入hbase

import ch.hsr.geohash.GeoHash
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import java.util.Properties

object GeoHashDimTableBuilder {
 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder()
    .master("local[*]")
    .appName("geohash码地域维表构建任务")
    .config("spark.sql.shuffle.partitions", 2)
    .getOrCreate()

   val sc = spark.sparkContext

   // hbase 目标表名
   val tablename = "dim_geo_area"

   // hbase集群所连的zookeeper配置信息
   sc.hadoopConfiguration.set("hbase.zookeeper.quorum","hadoop10")
   // 为sparkContext设置outputformat为hbase的TableOutputFormat
   sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

   // 封装mapreduce的Job配置信息
   val job = Job.getInstance(sc.hadoopConfiguration)
   job.setOutputKeyClass(classOf[ImmutableBytesWritable])
   job.setOutputValueClass(classOf[Result])
   job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

   // 加载mysql中的原始地理位置信息数据表
   val props = new Properties()
   props.setProperty("user","root")
   props.setProperty("password","123456")

   val df = spark.read.jdbc("jdbc:mysql://hadoop10:3306/db1", "t_md_areas", props)
   df.createTempView("t")

   val gps2GeoHashcode = (lat:Double, lng:Double)=> GeoHash.geoHashStringWithCharacterPrecision(lat,lng,5).reverse
   spark.udf.register("geo",gps2GeoHashcode)

   // 加工地理位置参考坐标点字典数据
   val resDf = spark.sql(
     """
       |select
       |   geohash,
       |   province,
       |   city,
       |   nvl(region,'') as region
       |from(
       |   select
       |      geohash,
       |      province,
       |      city,
       |      region,
       |      -- 利用row_number()over() 对 相同重复的数据 进行去重
       |      row_number() over(partition by geohash order by province) as rn
       |   from
       |   (
       |      -- 对原始地理位置表,进行自关联,将层级数据扁平化
       |      SELECT
       |        geo(lv4.BD09_LAT, lv4.BD09_LNG) as geohash,
       |        lv1.AREANAME as province,
       |        lv2.AREANAME as city,
       |        lv3.AREANAME as region
       |      from t lv4
       |        join t lv3 on lv4.`LEVEL`=4 and lv4.bd09_lat is not null and lv4.bd09_lng is not null and lv4.PARENTID = lv3.ID
       |        join t lv2 on lv3.PARENTID = lv2.ID
       |        join t lv1 on lv2.PARENTID = lv1.ID
       |   ) o1
       |)o2
       |
       |where rn=1 and geohash is not null and length(trim(geohash))=5
       |and province is not null and trim(province)!=''
       |and city is not null and trim(city)!=''
       |
       |""".stripMargin)

   // 将上面整理好的结果: geohash码,省,市,区
   // 转换成TableOutputFormat所需要的 (ImmutableBytesWritable,Put) 结构
  val resRdd = resDf.rdd.map(row => {
     val geoHash = row.getAs[String]("geohash")
     val province = row.getAs[String]("province")
     val city = row.getAs[String]("city")
     val region = row.getAs[String]("region")

     // 用geohash码做rowkey
     val put = new Put(Bytes.toBytes(geoHash))
     put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("province"), Bytes.toBytes(province))
     put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("city"), Bytes.toBytes(city))
     put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("region"), Bytes.toBytes(region))

    (new ImmutableBytesWritable(), put)
  })

   // 将数据插入hbase
   resRdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
   spark.close()
}
}


6 页面信息维表处理

image-20230718233744562.png


这种数据整理工作量太大

而公司的各种页面的url都有一定的规律,我们可以只选择整理有规律的前缀,作为维表即可!

image-20230718233817985.png


页面信息维表数据来源:由业务系统前端开发人员来维护;
页面信息维表数据的特点:
- 数据比较固定,通常不会被修改更新,但是可能有偶尔的增加
- 数据量很小(可能就100+行)
 
- 这个维表信息,如果利用hbase来存储并提供给后续的流程序来查询,有点浪费;
- 因此,完全可以没有必要放在hbase中进行lookup查询,而可以在流程序启动时直接加载该维表完整数据后以广播形式去关联行为事件流
- 当然,如果考虑到公共维度退维中,用户注册信息维表已经使用了lookup关联hbase表的方式,那么我们依然还是可以选择存入hbase,后续用lookup来查询(统一退维的技术手段,而且这种方式的效率也并不比广播低,而且数据也方便维护)  


页面维表构建并入库hbase

1)在hbase中创建表

create 'dim_page_info','f'

2)导数脚本开发

脚本文件:dim_page_info_loadhbase.sh

put 'dim_page_info' , '/mall/'            , 'f:pt', '商品详情页'
put 'dim_page_info' , '/mall/'           , 'f:sv', '商城服务'
put 'dim_page_info' , '/content/article/' , 'f:pt', '文章页'
put 'dim_page_info' , '/content/article/' , 'f:sv', '内容服务'
put 'dim_page_info' , '/mall/promotion/' , 'f:pt', '活动页'
put 'dim_page_info' , '/mall/promotion/' , 'f:sv', '商城服务'
put 'dim_page_info' , '/mall/search/'     , 'f:pt', '搜索结果页'
put 'dim_page_info' , '/mall/search/'     , 'f:sv', '搜索服务'

3) 导入数据

hbase shell /root/dim_page_info_loadhbase.sh 

4) 查询

 scan 'dim_page_info',{LIMIT=>10,FORMATTER => 'toString'}


总结

Flink 实时数仓在 ODS 到 DWD 层处理行为日志时,需对用户属性、页面信息、地域等公共维度做退维,避免后续重复关联,提升分析效率,核心技术如下:

采用 “MySQL CDC 同步 + HBase 存储 + Flink Lookup Join” 架构:业务库(如用户注册表)通过 MySQL CDC 监听 binlog,实时同步数据至 HBase 维表;行为日志流用 Flink Lookup Join 关联 HBase 维表,补充维度信息后写入 Kafka 的 DWD 层。

  1. 用户属性维表:以用户名作 HBase 行键,建表时预分区防热点;Flink CDC 同步 MySQL 用户表数据至 HBase,Lookup Join 时可设缓存平衡性能与数据时效性。
  2. 地域维表:用 Spark 结合 GeoHash 算法处理 GPS 数据,反转 GeoHash 码作 HBase 行键打散热点,一次性写入 HBase 供 Flink 查询。
  3. 页面信息维表:数据量小且固定,可广播关联或存入 HBase,通过脚本维护页面 URL 前缀与属性映射。

HBase 表需合理设计行键与预分区;MySQL 需开启 binlog;Flink Lookup Join 需权衡缓存策略,确保实时性与数据准确性。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。