大数据分析Day04使用DLI Flink SQL进行电商实时业务数据分析

举报
孙小北 发表于 2021/08/04 09:44:28 2021/08/04
【摘要】 大数据分析Day04使用DLI Flink SQL进行电商实时业务数据分析 场景描述多种方式接入:web方式访问、app的方式访问、微信小程序访问电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额...

大数据分析Day04使用DLI Flink SQL进行电商实时业务数据分析

场景描述

  • 多种方式接入:web方式访问、app的方式访问、微信小程序访问

  • 电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据

  • 假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。

  • 根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。

场景方案

image.png

步骤1:注册账号

  • 注册华为云账号并进行实名认证
  • 已完成华为云账号注册和实名认证,可跳过该步骤

步骤2:创建资源

创建资源时注意以下几点:

  • Kafka与MySQL实例创建时需指定VPC,该VPC需提前创建好,且网段不与后续创建的DLI队列网段冲突.

  • Kafka+MySQl : 192, DLI:172

  • 【按需】模式的购买产品

  • (1)创建VPC:【VPC】–【创建虚拟私有云】
    image.png

  • (2)购买队列:【DLI】–【队列管理】–【购买队列】
    image.png

步骤3:创建DMS topic并获取连接地址

  • 选择“分布式消息服务DMS”,单击进入DMS服务、控制台页面。
    image.png
    image.png
    image.png
  • 在“Kafka专享版”页面找到创建的Kafka实例。单击“基本信息”,获取“连接地址”
    image.png
10.0.0.202:9092,10.0.0.83:9092,10.0.0.138:9092
  • 单击“Topic管理”,创建一个Topic:trade_order_detail_info
    image.png

步骤4:创建RDS数据库表

  • 购买MySQL数据库
    image.png
    image.png
    image.png
  • 登录
    image.png
    image.png
  • 创建数据库
    image.png
  • 创建表
    image.png
DROP TABLE `dli-demo`.`trade_channel_collect`;
CREATE TABLE `dli-demo`.`trade_channel_collect` (
    `begin_time` VARCHAR(32) NOT NULL,
    `channel_code` VARCHAR(32) NOT NULL,
    `channel_name` VARCHAR(32) NULL,
    `cur_gmv` DOUBLE UNSIGNED NULL,
    `cur_order_user_count` BIGINT UNSIGNED NULL,
    `cur_order_count` BIGINT UNSIGNED NULL,
    `last_pay_time` VARCHAR(32) NULL,
    `flink_current_time` VARCHAR(32) NULL,
    PRIMARY KEY (`begin_time`, `channel_code`)
)   ENGINE = InnoDB
    DEFAULT CHARACTER SET = utf8mb4
    COLLATE = utf8mb4_general_ci
    COMMENT = '各渠道的销售总额实时统计';

步骤5:创建增强型跨源打通网络

  • 【DLI】,找到【数据湖探索】,【全局配置】-【服务授权】,选中【VPC Administrator】
    image.png

  • 【跨源连接】-【增强型跨源】-【创建】

  • 【绑定队列】选择您所创建的通用队列,【虚拟私有云】和【子网】选择 Kafka 与 MySQL 实例所在的 VPC 与子网
    image.png
    image.png

  • 测试队列与RDS、DMS实例连通性。点击【队列管理】,选择您所使用的队列,点击【更多】-【测试地址连通性】。输入前序步骤3-2获取的DMS Kafka实例连接地址和步骤4-2获取的RDS MySQL实例内网地址,进行网络连通性测试。测试结果显示可达,则DLI队列与Kafka、MySQL实例的网络已经联通。
    注:10没有调通,后重新改为192(MySQL和Kafka)和172(DLI)
    image.png

  • 配置安全组 3306 9092
    image.png
    image.png

  • 修改后可达
    image.png
    image.png

步骤6:创建并提交Flink作业

– 【DLI控制台】–【作业管理】,选择【Flink作业】–【创建作业】,选择作业类型为:Flink OpenSource SQL,名称自定义。
image.png

  • 注:Kafka连接地址,mysql连接地址、数据库名称,用户名和密码修改为自己前面的地址
--********************************************************************--
-- 数据源:trade_order_detail_info (订单详情宽表)
--********************************************************************--
create table trade_order_detail (
  order_id string,      -- 订单ID
  order_channel string,   -- 渠道
  order_time string,     -- 订单创建时间
  pay_amount double,     -- 订单金额
  real_pay double,      -- 实际付费金额
  pay_time string,      -- 付费时间
  user_id string,      -- 用户ID
  user_name string,     -- 用户名
  area_id string       -- 地区ID
) with (
  "connector.type" = "kafka",
  "connector.version" = "0.10",
  "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
  "connector.properties.group.id" = "trade_order",   -- Kafka groupID
  "connector.topic" = "trade_order_detail_info",     -- Kafka topic
  "format.type" = "json",
  "connector.startup-mode" = "latest-offset"
);
 
--********************************************************************--
-- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
--********************************************************************--
create table trade_channel_collect(
  begin_time string,       --统计数据的开始时间
  channel_code string,      -- 渠道编号
  channel_name string,      -- 渠道名
  cur_gmv double,         -- 当天GMV
  cur_order_user_count bigint, -- 当天付款人数
  cur_order_count bigint,    -- 当天付款订单数
  last_pay_time string,     -- 最近结算时间
  flink_current_time string,
  primary key (begin_time, channel_code) not enforced
) with (
  "connector.type" = "jdbc",
  "connector.url" = "jdbc:mysql://xxxx:3306/xxxx",    -- mysql连接地址,jdbc格式
  "connector.table" = "xxxx",            -- mysql表名
  "connector.driver" = "com.mysql.jdbc.Driver",
  "connector.username" = "xxx",                    -- mysql用户名
  "connector.password" = "xxxx",                   -- mysql密码
  "connector.write.flush.max-rows" = "1000",
  "connector.write.flush.interval" = "1s"
);
 
--********************************************************************--
-- 临时中间表
--********************************************************************--
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
           else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
    , case when t.order_channel = "webShop" then _UTF16"网页商城"
           when t.order_channel = "appShop" then _UTF16"app商城"
           when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
           else _UTF16"其他" end as channel_name --渠道名称
from (
    select *
        , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
    and real_pay is not null
) t
where t.rn = 1;
 
-- 按渠道统计各个指标
insert into trade_channel_collect
select
      begin_time  --统计数据的开始时间
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
    , count(distinct user_id) as cur_order_user_count --当天付款人数
    , count(1) as cur_order_count --当天付款订单数
    , max(pay_time) as last_pay_time --最近结算时间
    , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;

首先,我们先定义一个Kafka源表,用来从Kafka指定topic中读取消费数据;再定义一个结果表,用来通过JDBC向MySQL中写入结果数据。

创建源表和结果表以后,需要实现相应的处理逻辑,以实现各个指标的统计。

为了简化最终的处理逻辑,使用创建视图进行数据预处理。首先利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string),请注意)。然后根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。

最后,我们根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。