Huawei Cloud Big Data Analytics 7-Day Training Camp (Part 4)

Posted by 花溪 on 2021/08/03 23:52:00
[Abstract] Complete the hands-on project by following the guide "Using DLI Flink SQL for Real-Time E-Commerce Business Data Analysis". 1) You may check in at whatever step you reach; complete as much as your situation allows, it is not mandatory; 2) For hands-on assignments, do not proceed if you have not claimed the free resources or a voucher; create service jobs or instances in the same region as the claimed package. Free resources: https://developer.huaweicloud.com/activity/7days_bigdata.html...

Complete the hands-on project by following the guide "Using DLI Flink SQL for Real-Time E-Commerce Business Data Analysis".

1) You may check in at whatever step you reach; complete as much as your situation allows, it is not mandatory;

2) For hands-on assignments, do not proceed if you have not claimed the free resources or a voucher; the region in which you create service jobs or instances must match the region of the claimed package;

Free resources: https://developer.huaweicloud.com/activity/7days_bigdata.html

Vouchers: within the specified period, after completing the first two days' check-ins, you will receive a voucher from the course assistant on day three. (Vouchers are limited; first come, first served.)

3) Before this lab you need to write data with a Kafka client; see https://support.huaweicloud.com/qs-kafka/kafka-qs-0409006.html

1. Install Java and the Kafka client locally;

2. Bind an EIP to the DMS instance;

3. Send data from the command line.

4) The recommended working time is 1-2 hours. When you are done, immediately delete the related service instances, jobs, and resources to avoid charges!

For example:

a. Stop the job, delete the DLI datasource connection, and delete the queue;

b. Delete the DMS and RDS resources;

c. Delete the Kafka resources;

d. Delete the VPC-related resources, etc.

Real-time e-commerce business data analysis with DLI Flink consists of 7 main steps:

Step 1: Register an account. Before analyzing data with DLI, you need to register a Huawei Cloud account and complete real-name verification. If you have already done both, skip this step.

Step 2: Create resources. Create the resources the job needs under your account, covering VPC, DMS, DLI, and RDS.

1. Create a VPC
2. Create a DMS Kafka instance
3. Create an RDS MySQL instance
4. Create a DLI CCE queue
5. Create a DLV dashboard
Note
Keep the following in mind when creating the resources:
1. The Kafka and MySQL instances must be created in a VPC. Create that VPC in advance, and make sure its CIDR block does not conflict with the subnet of the DLI queue created later.
2. DLI Flink OpenSource syntax currently supports only containerized (CCE) queues, which are still in closed beta. Before creating the queue, file a service ticket on the official site to request CCE queue access, then create the DLI queue.
3. When creating the DLI queue, choose a general-purpose queue in "yearly/monthly" or "pay-per-use dedicated resource" mode.

Step 3: Get the DMS connection address and create a topic. Obtain the DMS Kafka instance's connection address and create a DMS topic.

Step 4: Create the RDS database and table. Obtain the RDS instance's private IP address, log in to the instance, and create the RDS database and MySQL table.

CREATE TABLE `dli-demo`.`trade_channel_collect` (
    `begin_time` VARCHAR(32) NOT NULL,
    `channel_code` VARCHAR(32) NOT NULL,
    `channel_name` VARCHAR(32) NULL,
    `cur_gmv` DOUBLE UNSIGNED NULL,
    `cur_order_user_count` BIGINT UNSIGNED NULL,
    `cur_order_count` BIGINT UNSIGNED NULL,
    `last_pay_time` VARCHAR(32) NULL,
    `flink_current_time` VARCHAR(32) NULL,
    PRIMARY KEY (`begin_time`, `channel_code`)
)   ENGINE = InnoDB
    DEFAULT CHARACTER SET = utf8mb4
    COLLATE = utf8mb4_general_ci
    COMMENT = 'Real-time sales totals per channel';
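The composite primary key (begin_time, channel_code) is what lets the Flink JDBC sink keep overwriting each channel's running totals for the day instead of appending new rows. Below is a minimal sketch of that upsert behavior, using Python's built-in SQLite as a stand-in for MySQL (MySQL itself uses INSERT ... ON DUPLICATE KEY UPDATE; the column subset and values here are made up for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE trade_channel_collect (
        begin_time TEXT NOT NULL,
        channel_code TEXT NOT NULL,
        cur_gmv REAL,
        cur_order_count INTEGER,
        PRIMARY KEY (begin_time, channel_code)
    )
""")

upsert = """
    INSERT INTO trade_channel_collect (begin_time, channel_code, cur_gmv, cur_order_count)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (begin_time, channel_code) DO UPDATE SET
        cur_gmv = excluded.cur_gmv,
        cur_order_count = excluded.cur_order_count
"""
# First flush from the job: one order so far today.
conn.execute(upsert, ("2021-03-25 00:00:00", "webShop", 100.0, 1))
# A later flush with the same key: the new running totals replace the old row.
conn.execute(upsert, ("2021-03-25 00:00:00", "webShop", 580.0, 2))

rows = conn.execute("SELECT * FROM trade_channel_collect").fetchall()
print(rows)  # one row per (begin_time, channel_code), holding the latest totals
```

This is why the dashboard can simply SELECT from the table: each (day, channel) pair always holds the most recent totals.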


Step 5: Create a DLI enhanced datasource connection. Create the connection, then test connectivity between the queue and the RDS and DMS instances.

Click [Global Configuration] - [Service Authorization], select [VPC Administrator], and click Update Delegated Permissions. This grants DLI permission to operate on your VPC resources, which it needs in order to create the VPC [peering connection].

The connection's CIDR block must not conflict with the subnet of the DLI queue created earlier.

Step 6: Create and submit the Flink job. Create a DLI Flink OpenSource SQL job and run it.

--********************************************************************--
-- Source: trade_order_detail_info (order detail wide table)
--********************************************************************--
create table trade_order_detail (
  order_id string,      -- order ID
  order_channel string,   -- channel
  order_time string,     -- order creation time
  pay_amount double,     -- order amount
  real_pay double,      -- amount actually paid
  pay_time string,      -- payment time
  user_id string,      -- user ID
  user_name string,     -- user name
  area_id string       -- area ID
) with (
  "connector.type" = "kafka",
  "connector.version" = "0.10",
  "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka connection address
  "connector.properties.group.id" = "trade_order",   -- Kafka group ID
  "connector.topic" = "trade_order_detail_info",     -- Kafka topic
  "format.type" = "json",
  "connector.startup-mode" = "latest-offset"
);
 
--********************************************************************--
-- Sink: trade_channel_collect (real-time sales totals per channel)
--********************************************************************--
create table trade_channel_collect(
  begin_time string,       -- start time of the statistics window
  channel_code string,      -- channel code
  channel_name string,      -- channel name
  cur_gmv double,         -- GMV for the day
  cur_order_user_count bigint, -- number of paying users for the day
  cur_order_count bigint,    -- number of paid orders for the day
  last_pay_time string,     -- latest payment time
  flink_current_time string,
  primary key (begin_time, channel_code) not enforced
) with (
  "connector.type" = "jdbc",
  "connector.url" = "jdbc:mysql://xxxx:3306/xxxx",    -- MySQL connection address, JDBC format
  "connector.table" = "xxxx",            -- MySQL table name
  "connector.driver" = "com.mysql.jdbc.Driver",
  "connector.username" = "xxx",                    -- MySQL user name
  "connector.password" = "xxxx",                   -- MySQL password
  "connector.write.flush.max-rows" = "1000",
  "connector.write.flush.interval" = "1s"
);
 
--********************************************************************--
-- Temporary intermediate view
--********************************************************************--
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
           else t.order_channel end as channel_code -- normalize the channel to exactly four values: [webShop, appShop, miniAppShop, other]
    , case when t.order_channel = "webShop" then _UTF16"网页商城"
           when t.order_channel = "appShop" then _UTF16"app商城"
           when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
           else _UTF16"其他" end as channel_name -- channel display name
from (
    select *
        , row_number() over(partition by order_id order by order_time desc ) as rn -- deduplicate order records
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") -- take today's data; for ease of running, "2021-03-25 12:03:00" is used here in place of cast(LOCALTIMESTAMP as string)
    and real_pay is not null
) t
where t.rn = 1;
 
-- Compute each metric per channel
insert into trade_channel_collect
select
      begin_time  -- start time of the statistics window
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv -- GMV for the day
    , count(distinct user_id) as cur_order_user_count -- number of paying users for the day
    , count(1) as cur_order_count -- number of paid orders for the day
    , max(pay_time) as last_pay_time -- latest payment time
    , cast(LOCALTIMESTAMP as string) as flink_current_time -- current time in the Flink job
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;
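The row_number() trick in the intermediate view keeps only the newest record per order_id (rn = 1). Outside of SQL, that deduplication step amounts to the following minimal Python sketch; it relies on the "YYYY-MM-DD HH:MM:SS" timestamps comparing correctly as strings, just as the SQL above does (the sample rows are made up):

```python
def dedup_latest(rows):
    """Keep the most recent record per order_id, mirroring
    row_number() over (partition by order_id order by order_time desc) ... where rn = 1."""
    latest = {}
    for r in rows:
        cur = latest.get(r["order_id"])
        # "YYYY-MM-DD HH:MM:SS" strings sort chronologically, so > works directly.
        if cur is None or r["order_time"] > cur["order_time"]:
            latest[r["order_id"]] = r
    return list(latest.values())

rows = [
    {"order_id": "A", "order_time": "2021-03-25 10:00:00", "real_pay": 10.0},
    {"order_id": "A", "order_time": "2021-03-25 10:05:00", "real_pay": 12.0},  # newer duplicate wins
    {"order_id": "B", "order_time": "2021-03-25 11:00:00", "real_pay": 5.0},
]
deduped = dedup_latest(rows)
print(deduped)  # order A keeps its 10:05 record; order B is unchanged
```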

Note: change these options to your own values:

 "connector.properties.group.id" = "da68afc6-b67b-4c47-a470-cd01cdc287e0",   -- Kafka group ID (use your own instance ID)
  "connector.topic" = "topic-2007310628",     -- Kafka topic

Install the JDK

Download: the version used is jdk-8u221-linux-x64.tar.gz
Link: https://pan.baidu.com/s/1GZnUwB3nraObgperKlmBCg
Extraction code: 17xx

Install:

tar -zxvf jdk-8u221-linux-x64.tar.gz 

Configure the environment variables:

vim /etc/profile

Add the following:

export JAVA_HOME=/root/jdk/jdk1.8.0_221
export PATH=$JAVA_HOME/bin:$PATH

After configuring, run source /etc/profile to apply the changes, then verify:

java -version

Install the Kafka client

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf kafka_2.11-1.1.0.tgz

Then start a console producer from the client's bin directory and send the sample order data:

cd ~/kafka_2.11-1.1.0/bin
sh kafka-console-producer.sh --broker-list 172.16.0.171:9092,172.16.0.184:9092,172.16.0.89:9092 --topic topic-2007310628

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
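As a sanity check, the daily per-channel aggregation that the Flink job performs can be roughly simulated locally over the sample messages above. This is a minimal Python sketch: it inlines only a subset of the records, ignores the streaming and state semantics of the real job, and simply computes the columns that end up in trade_channel_collect:

```python
import json
from collections import defaultdict

# A subset of the sample order messages sent to the Kafka topic above.
messages = [
    '{"order_id":"202103241000000001", "order_channel":"webShop", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001"}',
    '{"order_id":"202103251202020001", "order_channel":"miniAppShop", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002"}',
    '{"order_id":"202103251505050001", "order_channel":"qqShop", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003"}',
    '{"order_id":"202103252020200001", "order_channel":"webShop", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004"}',
    '{"order_id":"202103270606060002", "order_channel":"webShop", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002"}',
]

BEGIN = "2021-03-25 00:00:00"                     # the hard-coded "today" used by the job
CHANNELS = {"webShop", "appShop", "miniAppShop"}  # everything else maps to "other"

stats = defaultdict(lambda: {"gmv": 0.0, "users": set(), "orders": 0, "last_pay": ""})
for msg in messages:
    row = json.loads(msg)
    if row["pay_time"] < BEGIN:          # keep only today's payments, like the WHERE clause
        continue
    code = row["order_channel"] if row["order_channel"] in CHANNELS else "other"
    s = stats[code]
    s["gmv"] += float(row["real_pay"])   # cur_gmv
    s["users"].add(row["user_id"])       # cur_order_user_count (distinct users)
    s["orders"] += 1                     # cur_order_count
    s["last_pay"] = max(s["last_pay"], row["pay_time"])  # last_pay_time

for code, s in sorted(stats.items()):
    print(code, round(s["gmv"], 2), len(s["users"]), s["orders"], s["last_pay"])
```

The 2021-03-24 payment is filtered out, the qqShop order lands in "other", and webShop accumulates two orders from two distinct users; these are the same rows you should see in the MySQL result table once the job runs.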


Step 7: Query the results. Query the Flink job's results and display them on a DLV dashboard.

Since the result data never came through for me, this part only describes the approach.

Configure a DLV dashboard that runs SQL queries against RDS MySQL to get a real-time display. For the configuration details, see: developing a DLV dashboard.

I was never able to query any data; I will sort this out later.

- To be continued -

[Copyright Notice] This article is original content by a Huawei Cloud community user. When reproducing it, you must cite its source (Huawei Cloud community), link, author, and other basic information; otherwise the author and this community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence; once verified, the community will immediately remove the infringing content. Report email: cloudbbs@huaweicloud.com