DLI中Flink使用场景样例介绍(一)

举报
潇湘暮雨 发表于 2021/05/07 16:37:52 2021/05/07
【摘要】        DLI是数据湖探索(Data Lake Insight)的简称,它是完全兼容Apache Spark、Apache Flink、openLooKeng(基于Apache Presto)生态,提供一站式的流处理、批处理、交互式分析的Serverless融合处理分析服务。用户不需要管理任何服务器,即开即用。支持标准SQL/Spark SQL/Flink SQL,支持多种接入方式,并...
       DLI是数据湖探索(Data Lake Insight)的简称,它是完全兼容Apache Spark、Apache Flink、openLooKeng(基于Apache Presto)生态,提供一站式的流处理、批处理、交互式分析的Serverless融合处理分析服务。用户不需要管理任何服务器,即开即用。支持标准SQL/Spark SQL/Flink SQL,支持多种接入方式,并兼容主流数据格式。数据无需复杂的抽取、转换、加载,使用SQL或程序就可以对云上CloudTable、RDS、DWS、CSS、OBS、ECS自建数据库以及线下数据库的异构数据进行探索。
       Flink SQL是DLI支持的作业类型之一,而当前在DLI中Flink SQL作业主要分为两种,即DLI团队自研的Flink SQL语法(DLI称之为Flink SQL类型)和完全基于开源社区的开源Flink SQL语法(DLI称之为Flink OpenSource SQL)。该文章主要对DLI中Flink OpenSource SQL的使用场景样例进行介绍。
       当前线上购物无疑是最为火热的购物方式,而购物平台则又可以以多种方式接入,比如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。而购物平台则需要每天统计每个平台的实时访问数据量、订单数、访问人数等等指标,从而能够在显示大屏等展示指标实时变化,更能及时且简捷的了解这些变化,有针对性的指定相应的策略。而每天如何实现统计这些指标呢?我们假设平台已经将每个商品的点击量实时的写入kafka中,而这些信息包括订单的id、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户id、用户姓名、订单地区id等信息。而我们需要做的就是根据当前的这些信息,实时统计每种渠道的指标,并实时输出到数据库中。那我们如何实现这些统计功能呢?而这就需要使用DLI中Flink OpenSource SQL。
       首先,我们需要在DLI中创建Flink OpenSource SQL类型的作业,而这个过程就不再赘述。
       其次,我们需要编写sql代码,以便能够按照一定的逻辑实时处理数据。
  • 因为数据是存放在kafka中,因此我们需要从kafka中读取数据,因此我们需要按照Flink OpenSource SQL的书写方式创建数据源表,源表所使用的connector也应该为kafka,同时源表的每个字段又需要与kafka中的字段相对应,因此就不一一介绍。
--创建源表
create table trade_order_detail (
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
) with (
    "connector.type" = "kafka",
    "connector.version" = "0.10",
    "connector.properties.bootstrap.servers" = "x.x.x.x:9092",
    "connector.properties.group.id" = "trade_order_detail",
    "connector.topic" = "test",
    "format.type" = "json",
    "connector.startup-mode" = "latest-offset"
);
  • 有了源表,我们也需要创建一个结果表,用于存放计算结果。而这里由于我们需要将结果存放到数据库中,因此我们结果表中所使用的connector类型为jdbc。表中每个字段的意义分别为开始统计指标的时间、下单渠道的代码、下单的渠道名称、营业额、当前下单用户数量、当前订单的数量、最近结算时间、flink任务中的时间,并将begin_time和channel_code作为该表的主键。
--创建结果表
create table trade_channel_collect(
    begin_time string,
    channel_code string,
    channel_name string,
    cur_gmv double,
    cur_order_user_count bigint,
    cur_order_count bigint,
    last_pay_time string,
    flink_current_time string,
    primary key (begin_time, channel_code) not enforced
) with (
    "connector.type" = "jdbc",
    "connector.url" = "jdbc:mysql://x.x.x.x:3306/job",
    "connector.table" = "numcollect",
    "connector.driver" = "com.mysql.jdbc.Driver",
    "connector.username" = "xxxx",
    "connector.password" = "xxxxxx",
    "connector.write.flush.max-rows" = "1000",
    "connector.write.flush.interval" = "1s"
);
  • 创建好了源表和结果表,我们需要实现相应的处理逻辑,以实现各个指标的统计。为了简化最终的处理逻辑,我们首先创建视图处理。首先,我们利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计。然后,根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
           else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
    , case when t.order_channel = "webShop" then _UTF16"网页商城"
           when t.order_channel = "appShop" then _UTF16"app商城"
           when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
           else _UTF16"其他" end as channel_name --渠道名称
from (
    select *
        , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
    and real_pay is not null
) t
where t.rn = 1;
  • 创建好了源表和结果表,并进行了简单的逻辑处理和筛选,我们需要对相应指标进行统计和筛选,并将其结果插入到结果表中。主要的计算逻辑就是统计所有实际支付金额的和、根据user_id统计付款的人数和当天所有的付款订单数、最近结算时间,其实际内容见下面代码的注释中即可。
-- 按渠道统计各个指标
insert into trade_channel_collect
select
      begin_time  --统计数据的开始时间
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
    , count(distinct user_id) as cur_order_user_count --当天付款人数
    , count(1) as cur_order_count --当天付款订单数
    , max(pay_time) as last_pay_time --最近结算时间
	, cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name
      最后,所有的sql代码已经编写完成,我们选择相应的队列,并运行即可。
      假设我们使用如下的构造数据插入kafka中,则其最终的数据库中的结果应该如最下面数据所示。其中输出的flink_current_time的时间由于为运行时的实时时间,因此改时间与最终结果会不同,请按实际运行结果为准。
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
2021-03-25 00:00:00,appShop,app商城,50.5,1,1,2021-03-25 06:07:00,2021-03-26 09:50:01.37
2021-03-25 00:00:00,miniAppShop,小程序商城,148.8,2,2,2021-03-25 12:03:00,2021-03-26 09:50:25.738
2021-03-25 00:00:00,other,其他,400.0,1,1,2021-03-25 15:10:00,2021-03-26 09:49:15.208
2021-03-25 00:00:00,webShop,网页商城,986.5,2,5,2021-03-25 16:16:16,2021-03-26 09:50:14.527
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。