Flink实例:电商用户行为实时分析

举报
TiAmoZhang 发表于 2023/05/27 08:56:56 2023/05/27
【摘要】 合运用 Flink 的各种 API,基于 EventTime 实现分析电商用户行为。

【示例1】综合运用 Flink 的各种 API,基于 EventTime 实现分析电商用户行为。

电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。

电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、单击和浏览页面、页面停留时间及页面跳转等,可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从 web 服务器日志中直接读取,而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,一般会在业务系统中相应的位置埋点,然后收集日志进行分析。业务行为数据又可以简单分为两类:一类是能够明显地表现出用户兴趣的行为,例如对商品的收藏、喜欢、评分和评价,可以从中对数据进行深入分析,得到用户画像,进而对用户给出个性化的推荐商品列表,这个过程往往会用到机器学习相关的算法;另一类则是常规的业务操作,但需要着重关注一些异常状况以做好风控,例如登录和订单支付。

本项目限于数据,只实现实时热门商品统计。 

01、数据源说明

本案例使用阿里天池的一份淘宝用户行为数据集,格式为 csv 文件。本数据集包含了2017年11月25日至2017年12月3日之间,约一百万随机用户的所有行为(行为包括单击、购买、加购、喜欢)。数据集的每行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。

注意/

从天池上下载的数据集UserBehavior.csv解压缩后为3.41G,这里截取其中一部分(485730行)用于开发测试,命名为UserBehavior_part.csv。

关于数据集中每列的详细描述,见表1。

■ 表1淘宝用户行为数据集说明

640.png

其中用户行为类型共有4种,它们分别是:

(1) pv:商品详情页pv,等价于单击。

(2) buy:商品购买。

(3) cart:将商品加入购物车。

(4) fav:收藏商品。

部分用户行为数据示例如下:

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
...

02、预备知识

Flink SQL提供了如下一些与日期处理有关的函数。

1)TO_TIMESTAMP

将BIGINT类型的日期或者VARCHAR类型的日期转换成TIMESTAMP类型,其语法如下:

TIMESTAMP TO_TIMESTAMP(BIGINT time) //time:毫秒
TIMESTAMP TO_TIMESTAMP(VARCHAR date) //date:yyyy-MM-dd HH:mm:ss
TIMESTAMP TO_TIMESTAMP(VARCHAR date, VARCHAR format)

2)FROM_UNIXTIME

返回值为VARCHAR类型的日期值,默认日期格式:yyyy-MM-dd HH:mm:ss,若指定日期格式则按指定格式输出。如果任一输入参数是NULL,返回NULL,其语法如下:

VARCHAR FROM_UNIXTIME(BIGINT UNIXtime[, VARCHAR format])

说明/


(1) 参数UNIXtime为长整型,是以秒为单位的时间戳。

(2) 参数format可选,为日期格式,默认格式为yyyy-MM-dd HH:mm:ss,表示返回VARCHAR类型的符合指定格式的日期,如果有参数为null或解析错误,则返回null。

03、任务实现

(1) 将用户行为数据采集到Kafka。

(2) 使用Table API读取Kafka并写入MySQL。

(3)用Grafana实时可视化显示。

首先,项目添加依赖。因为要读取的Kafka的用户行为事件是CSV格式的,所以在项目的pom.xml文件中添加如下的依赖:

<!-- 需要添加flink-csv依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.13.2</version>
</dependency>

创建Kafka源表的SQL语句如下:

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(), -- 使用计算列生成处理时间属性
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- 定义ts列上的水印, 将ts标记为事件时间属性
) WITH (
    'connector' = 'kafka', -- 使用kafka连接器
    'topic' = 'user_behavior', -- kafka topic
    'scan.startup.mode' = 'earliest-offset', -- 从头开始读取
    'properties.Bootstrap.servers' = 'kafka:9094', -- kafka broker 地址
    'format' = 'json'        -- 数据格式
);

在上面的SQL语句中,按照数据的格式声明了5个字段,除此之外,还通过计算列语法和 PROCTIME() 内置函数声明了一个产生处理时间的虚拟列。另外通过 WATERMARK 语法,在ts字段上声明了watermark策略(容忍5s乱序),ts字段因此也成了事件时间列。

接下来,编写流处理代码。

Scala代码实现:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object UserBehaviorDemo {

  def main(args: Array[String]) {
//设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

//创建表环境
    val tEnv = StreamTableEnvironment.create(env)

//启用检查点//启用检查点,并设置并行度为1
    env.enableCheckpointing(5000).setParallelism(1)

//创建Kafka源表
    tEnv.executeSql(
      """
        |CREATE TABLE user_behavior (
        | user_id bigint,
        | item_id bigint,
        | category_id bigint,
        | behavior string,
        | behavior_time bigint,
        | ts AS TO_TIMESTAMP(FROM_UNIXTIME(behavior_time, 'yyyy-MM-dd HH:mm:ss')),
        | proctime AS PROCTIME(),
        | WATERMARK FOR ts AS ts - INTERVAL '5' SECONDS
        |)
        |with(
        | 'connector'='kafka',
        | 'topic'='user_behavior',
        | 'properties.Bootstrap.servers'='localhost:9092',
        | 'properties.group.id'='testGroup',
        | 'scan.startup.mode'='latest-offset',
        | 'format'='csv',
        | 'csv.ignore-parse-errors' = 'true',
        | 'csv.field-delimiter'=','
        |)
      """.stripMargin)

//创建mysql sink表
    tEnv.executeSql(
      """
        |CREATE TABLE buy_cnt_per_hour (
        | hour_of_day TIMESTAMP(3),
        | buy_cnt BIGINT
        |)
        |with(
        | 'connector'='JDBC',
        | 'url'='JDBC:mysql://localhost:3306/xueai8?useSSL=false',
        | 'table-name'='buy_cnt_per_hour',
        | 'driver'='com.mysql.JDBC.Driver',
        | 'username'='root',
        | 'password'='admin'
        |)
      """.stripMargin)

    tEnv
//读取源表
      .from("user_behavior")
//定义大小为5s,滑动为2s 的滑动窗口
      .window(Tumble over 1.minute on $"ts" as $"w")
//分组
      .groupBy($"w")
//聚合
      .select($"w".start.as("hour_of_day"),$"item_id".count.as("buy_cnt"))
//.execute.print()
//写入sink表
      .executeInsert("buy_cnt_per_hour")
  }
}

Java代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.util.TimeUtils;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class UserBehaviorDemo {

   public static void main(String[] args) throws Exception {
//设置流执行环境
      final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

//创建表环境
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

//启用检查点,并设置并行度为1
      env.enableCheckpointing(1000).setParallelism(1);

//创建Kafka源表
      tEnv.executeSql(
         "CREATE TABLE user_behavior (" +
         " user_id bigint," +
         " item_id bigint," +
         " category_id bigint," +
         " behavior string," +
         " behavior_time bigint," +
         " ts AS TO_TIMESTAMP(FROM_UNIXTIME(behavior_time, 'yyyy-MM-dd HH:mm:ss'))," +
         " proctime AS PROCTIME()," +
         " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
         ")" +
         "WITH (" +
         " 'connector'='kafka'," +
         " 'topic'='user_behavior'," +
         " 'properties.Bootstrap.servers'='192.168.190.133:9092'," +
         " 'properties.group.id'='testGroup'," +
         " 'scan.startup.mode'='latest-offset'," +
         " 'format'='csv',"  +
         " 'csv.ignore-parse-errors' = 'true'," +
         " 'csv.field-delimiter'=','" +
         ")"
      );

//创建mysql sink表
      tEnv.executeSql("CREATE TABLE buy_cnt_per_hour (" +
         " hour_of_day TIMESTAMP(3)," +
         " buy_cnt BIGINT" +
         ") WITH (" +
         " 'connector' = 'JDBC'," +
         " 'url' = 'JDBC:mysql://localhost:3306/xueai8?useSSL=false'," +
         " 'table-name' = 'buy_cnt_per_hour'," +
         " 'driver' = 'com.mysql.JDBC.Driver'," +
         " 'username' = 'root'," +
         " 'password' = 'admin'" +
         ")"
      );

//统计每分钟的成交量
      /*
统计每小时的成交量就是每小时共有多少 "buy" 的用户行为。
因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。
,然后每个窗口分别统计 "buy" 的个数,这可以通过先过滤出 "buy" 的数据,然后 COUNT(*) 实现。
        SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
        FROM user_behavior
        WHERE behavior = 'buy'
        GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
       */
      /* 在MySQL数据库中创建相应的结果表:
        CREATE TABLE xueai8.buy_cnt_per_hour (
            hour_of_day timestamp,
            buy_cnt bigint
        )
       */

      tEnv
//读取源表
         .from("user_behavior")
//过滤出购买行为
         .filter($("behavior").isEqual("buy"))
//定义滚动窗口
         .window(Tumble.over(lit(1).minute()).on($("ts")).as("w"))
//分组
         .groupBy($("w"))
//聚合
         .select($("w").start().as("hour_of_day"),$("item_id").count().as("buy_cnt"))
//使用.extract(TimeIntervalUnit.MINUTE)抽取日期中的指定部分
         .select($("w").start().extract(TimeIntervalUnit.MINUTE).as("hour_of_day"),$("item_id").count().as("buy_cnt"))
         .execute().collect().forEachRemaining(System.out::println);
//写入sink表
         .executeInsert("buy_cnt_per_hour");
   }

}

04、执行过程

请按以下步骤执行程序。

(1) 启动Kafka。

首先打开一个终端,运行ZooKeeper,命令如下:

$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties


另打开一个终端,运行Kafka服务器,命令如下:

$ ./bin/kafka-server-start.sh ./config/server.properties


再打开一个终端,创建名为user_behavior的Kafka主题,命令如下:

$./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_behavior


查看已经存在的主题,命令如下:

$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181


(2) 在MySQL上创建接收表,SQL语句如下:

CREATE TABLE buy_cnt_per_hour (
  hour_of_day timestamp,
  buy_cnt bigint
);


(3) 运行可视化终端Grafana。

Grafana是一款用Go语言开发的开源数据可视化工具,可以做数据监控和数据统计,带有告警功能。

首先安装Grafana,安装步骤如下:

① 下载并安装包,然后解压缩即可。

② 启动。在命令行启动Grafana服务器,命令如下:

E:\BigData\Grafana\grafana-7.5.0\bin>grafana-server.exe


③ 然后打开浏览器,访问地址http://localhost:3000/。

④ 在Grafana中要先配置好数据源,指向MySQL中的接收表,如图1所示。

640.png

图1 在Grafana中要先配置好数据源

⑤ 在Grafana中创建dashboard,查询获得数据,使用的SQL语句如下:

SELECT
UNIX_TIMESTAMP(hour_of_day) as time_sec,
     buy_cnt as value
FROM xueai8.buy_cnt_per_hour


为了更好地看到动态变化的效果,设置一个仪表板显示的时间范围,如图2所示。

640.png

■ 图2 在Grafana中设置一个仪表板显示的时间范围

(4) 运行Flink流程序。

(5) 执行数据生产者脚本streamuserbehavior.sh。它调用Kafka自带的生产者脚本,以每秒10条的速度将数据发送给Kafka的user_behavior主题。编辑生产数据的脚本文件streamuserbehavior.sh,代码如下:

#!/bin/bash
BROKER=$1
if [ -z "$1" ]; then
        BROKER="localhost:9092"
fi

cat UserBehavior_part.csv | while read line; do
        echo $line
        sleep 0.1
done | ~/bigdata/kafka_2.11-2.4.1/bin/kafka-console-producer.sh --broker-list $BROKER --topic user_behavior

执行该脚本,使用的命令如下

$ ./streamuserbehavior.sh


(6) 观察Grafana中数据实时呈现效果,如图3所示。

640.png

■ 图3  用户购买量实时统计结果显示

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。