【Python使用】嘿马推荐系统全知识和项目开发教程第4篇:1、Spark SQL 概述,spark 入门【附代码文档】
教程总体简介:2、DataFrame、spark ml的模型训练是基于内存的、如果数据过大、内存空间小、迭代次数过多的化、可能会造成内存溢出、报错、设置Checkpoint的话、会把所有数据落盘、这样如果异常退出、下次重启后、可以接着上次的训练节点继续运行、但该方法其实指标不治本、因为无法防止内存溢出、所以还是会报错、如果数据量大、应考虑的是增加内存、或限制迭代次数和训练数据量级等、从hdfs加载CSV文件、返回一个PythonRDD类型、此时还没开始计算、用户对商品类别的打分数据、map返回的结果是rdd类型、需要调用toDF方法转换为Dataframe、注意、toDF不是每个rdd都有的方法、仅局限于此处的rdd、可通过该方法获得 user-cate-matrix、但由于cateId字段过多、这里运算量比很大、机器内存要求很高才能执行、否则无法完成任务、请谨慎使用、但好在我们训练ALS模型时、不需要转换为user-cate-matrix、所以这里可以不用运行、cate_rating_df.groupBy("userId").povit("cateId").min("rating")、用户对类别的偏好打分数据、使用pyspark中的ALS矩阵分解方法实现CF评分预测、文档地址、利用打分数据、训练ALS模型、此处训练时间较长、model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品、由于是给所有用户、从HDFS加载用户基本信息数据、发现pvalue_level和new_user_class_level存在空值、(注意此处的null表示空值、而如果是NULL、则往往表示是一个字符串)、因此直接利用schema就可以加载进该数据、无需替换null值、这里的null会直接被pyspark识别为None数据、也就是na数据、所以这里可以直接利用schem
全套教程部分目录:
1、Spark SQL 概述
Spark SQL概念
-
Spark SQL is Apache Spark's module for working with structured data.
-
它是spark中用于处理结构化数据的一个模块
Spark SQL历史
- Hive是目前大数据领域,事实上的数据仓库标准。
- Shark:shark底层使用spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。
- 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
- 2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发
Spark SQL优势
- Write Less Code
- Performance
python操作RDD,转换为可执行代码,运行在java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间 通信很耗费性能。
DataFrame
- 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
-
dataFrame引入schema和off-heap(使用操作系统层面上的内存)
-
1、解决了RDD的缺点
- 序列化和反序列化开销大
- 频繁的创建和销毁对象造成大量的GC
- 2、丢失了RDD的优点
- RDD编译时进行类型检查
- RDD具有面向对象编程的特性
用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划
- CatalystOptimizer:Catalyst优化器
- ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
- Code gen:代码生成器
直接编写RDD也可以自实现优化代码,但是远不及SparkSQL前面的优化操作后转换的RDD效率高,快1倍左右
优化引擎:类似mysql等关系型数据库基于成本的优化器
首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成为RDD
- Language-independent API
用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD
- Schema
结构化数据,可以直接看出数据的详情
在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化。
为什么要学习sparksql
sparksql特性
- 1、易整合
- 2、统一的数据源访问
- 3、兼容hive
- 4、提供了标准的数据库连接(jdbc/odbc)
spark 入门
目标:
- 了解spark概念
- 知道spark的特点(与hadoop对比)
- 独立实现spark local模式的启动
1.1 spark概述
-
1、什么是spark
-
基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的计算,并没有涉及到数据的存储。
-
2、为什么要学习spark
MapReduce框架局限性
- 1,Map结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
- 2,任务调度和启动开销大
- 3,无法充分利用内存
- 4,不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
- 5,不适合流式处理(点击日志分析)
- 6,MapReduce编程不够灵活,仅支持Map和Reduce两种操作
Hadoop生态圈
- 批处理:MapReduce、Hive、Pig
- 流式计算:Storm
- 交互式计算:Impala、presto
需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
- 内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
- DAG引擎,较少多次计算之间中间结果写到HDFS的开销
- 使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
spark的缺点是:吃内存,不太稳定
-
3、spark特点
-
1、速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)
- spark中的job中间结果可以不落地,可以存放在内存中。
- mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。
-
2、易用性(可以通过java/scala/python/R开发spark应用程序)
- 3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)
- 4、兼容性(spark程序可以运行在standalone/yarn/mesos)
1.2 spark启动(local模式)和WordCount(演示)
-
启动pyspark
-
在$SPARK_HOME/sbin目录下执行
- ./pyspark
-
-
```python sc = spark.sparkContext words = sc.textFile('file:///home/hadoop/tmp/word.txt') \ .flatMap(lambda line: line.split(" ")) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a + b).collect()
* 输出结果:
```shell
[('python', 2), ('hadoop', 1), ('bc', 1), ('foo', 4), ('test', 2), ('bar', 2), ('quux', 2), ('abc', 2), ('ab', 1), ('you', 1), ('ac', 1), ('bec', 1), ('by', 1), ('see', 1), ('labs', 2), ('me', 1), ('welcome', 1)]
一 个性化电商广告推荐系统介绍
1.1 数据集介绍
- Ali_Display_Ad_Click是阿里巴巴提供的一个淘宝展示广告点击率预估数据集
数据集来源:天池竞赛
- 原始样本骨架raw_sample
淘宝网站中随机抽样了114万用户8天内的广告展示/点击日志(2600万条记录),构成原始的样本骨架。 字段说明如下:
- user_id:脱敏过的用户ID;
- adgroup_id:脱敏过的广告单元ID;
- time_stamp:时间戳;
- pid:资源位;
- noclk:为1代表没有点击;为0代表点击;
- clk:为0代表没有点击;为1代表点击;
用前面7天的做训练样本(20170506-20170512),用第8天的做测试样本(20170513)
- 广告基本信息表ad_feature
本数据集涵盖了raw_sample中全部广告的基本信息(约80万条目)。字段说明如下:
- adgroup_id:脱敏过的广告ID;
- cate_id:脱敏过的商品类目ID;
- campaign_id:脱敏过的广告计划ID;
- customer_id: 脱敏过的广告主ID;
- brand_id:脱敏过的品牌ID;
- price: 宝贝的价格
其中一个广告ID对应一个商品(宝贝),一个宝贝属于一个类目,一个宝贝属于一个品牌。
- 用户基本信息表user_profile
本数据集涵盖了raw_sample中全部用户的基本信息(约100多万用户)。字段说明如下:
- userid:脱敏过的用户ID;
- cms_segid:微群ID;
- cms_group_id:cms_group_id;
- final_gender_code:性别 1:男,2:女;
- age_level:年龄层次; 1234
- pvalue_level:消费档次,1:低档,2:中档,3:高档;
- shopping_level:购物深度,1:浅层用户,2:中度用户,3:深度用户
- occupation:是否大学生 ,1:是,0:否
-
new_user_class_level:城市层级
-
用户的行为日志behavior_log
本数据集涵盖了raw_sample中全部用户22天内的购物行为(共七亿条记录)。字段说明如下:
user:脱敏过的用户ID; time_stamp:时间戳; btag:行为类型, 包括以下四种: 类型 | 说明 pv | 浏览 cart | 加入购物车 fav | 喜欢 buy | 购买 cate_id:脱敏过的商品类目id; brand_id: 脱敏过的品牌id; 这里以user + time_stamp为key,会有很多重复的记录;这是因为我们的不同的类型的行为数据是不同部门记录的,在打包到一起的时候,实际上会有小的偏差(即两个一样的time_stamp实际上是差异比较小的两个时间)
1.2 项目效果展示
1.3 项目实现分析
-
主要包括
-
一份广告点击的样本数据raw_sample.csv:体现的是用户对不同位置广告点击、没点击的情况
- 一份广告基本信息数据ad_feature.csv:体现的是每个广告的类目(id)、品牌(id)、价格特征
- 一份用户基本信息数据user_profile.csv:体现的是用户群组、性别、年龄、消费购物档次、所在城市级别等特征
- 一份用户行为日志数据behavior_log.csv:体现用户对商品类目(id)、品牌(id)的浏览、加购物车、收藏、购买等信息
我们是在对非搜索类型的广告进行点击率预测和推荐(没有搜索词、没有广告的内容特征信息)
-
推荐业务处理主要流程: 召回 ===> 排序 ===> 过滤
-
离线处理业务流
- raw_sample.csv ==> 历史样本数据
- ad_feature.csv ==> 广告特征数据
- user_profile.csv ==> 用户特征数据
- raw_sample.csv + ad_feature.csv + user_profile.csv ==> CTR点击率预测模型
- behavior_log.csv ==> 评分数据 ==> user-cate/brand评分数据 ==> 协同过滤 ==> top-N cate/brand ==> 关联广告
- 协同过滤召回 ==> top-N cate/brand ==> 关联对应的广告完成召回
-
在线处理业务流
-
数据处理部分:
- 实时行为日志 ==> 实时特征 ==> 缓存
- 实时行为日志 ==> 实时商品类别/品牌 ==> 实时广告召回集 ==> 缓存
-
推荐任务部分:
- CTR点击率预测模型 + 广告/用户特征(缓存) + 对应的召回集(缓存) ==> 点击率排序 ==> top-N 广告推荐结果
-
-
-
涉及技术:Flume、Kafka、Spark-streming\HDFS、Spark SQL、Spark ML、Redis
- Flume:日志数据收集
- Kafka:实时日志数据处理队列
- HDFS:存储数据
- Spark SQL:离线处理
- Spark ML:模型训练
- Redis:缓存
1.4 点击率预测(CTR--Click-Through-Rate)概念
- 电商广告推荐通常使用广告点击率(CTR--Click-Through-Rate)预测来实现
点击率预测 VS 推荐算法
点击率预测需要给出精准的点击概率,比如广告A点击率0.5%、广告B的点击率0.12%等;而推荐算法很多时候只需要得出一个最优的次序A>B>C即可。
点击率预测使用的算法通常是如逻辑回归(Logic Regression)这样的机器学习算法,而推荐算法则是一些基于协同过滤推荐、基于内容的推荐等思想实现的算法
点击率 VS 转化率
点击率预测是对每次广告的点击情况做出预测,可以判定这次为点击或不点击,也可以给出点击或不点击的概率
转化率指的是从状态A进入到状态B的概率,电商的转化率通常是指到达网站后,进而有成交记录的用户比率,如用户成交量/用户访问量
**搜索和非搜索广告点击率
- 点赞
- 收藏
- 关注作者
评论(0)