【王喆-推荐系统】特征工程篇-(task2)用Spark进行特征处理

举报
野猪佩奇996 发表于 2022/01/22 23:50:59 2022/01/22
【摘要】 学习心得 (1)本次task学习了推荐系统中特征处理的主要方式,并利用 Spark 实践了类别型特征和数值型特征的主要处理方法,深度学习和传统机器学习的区别并不大,TensorFlow、PyTorch ...

学习心得

(1)本次task学习了推荐系统中特征处理的主要方式,并利用 Spark 实践了类别型特征和数值型特征的主要处理方法,深度学习和传统机器学习的区别并不大,TensorFlow、PyTorch 等深度学习平台也提供了类似的特征处理函数。

(2)其中几个特征处理API:
Normalizer,是范式归一化操作,保证归一化之后范式为1
StandardScaler,是标准差归一化操作,保证归一化之后均值为0标准差为1
RobustScaler,是使用分位数进行鲁棒归一化操作,可以有效减少异常值的干扰
MinMaxScaler,是使用最大值和最小值进行归一化操作。

(3)Spark 的计算过程:Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。
注意:OneHotEncoderEstimator()在PySpark 3.0.0及以上版本已经更改为 OneHotEncoder()

在这里插入图片描述

零、背景引入

  • 上次task学习了推荐系统要使用的常用特征——基本分为“用户行为”、“用户关系”、“属性标签”、“内容数据”、“场景信息”这五个类别。但这些原始的特征是无法直接提供给推荐模型使用的,因为推荐模型本质上是一个函数,输入输出都是数字或数值型的向量。像动作、喜剧、爱情、科幻这些电影风格,是怎么转换成数值供推荐模型使用的呢?用户的行为历史又是怎么转换成数值特征的呢?

  • 类似的特征处理过程在数据量变大之后还会变得更加复杂,因为工业界的数据集往往都是 TB 甚至 PB 规模的,这在单机上肯定是没法处理的。那业界又是怎样进行海量数据的特征处理呢?

一、业界主流的大数据处理利器:Spark

1.1 Spark原理

Spark是业界主流的大数据处理利器。
分布式:指的是计算节点之间不共享内存,需要通过网络通信的方式交换数据。
Spark 是一个分布式计算平台。Spark 最典型的应用方式就是建立在大量廉价的计算节点上,这些节点可以是廉价主机,也可以是虚拟的 Docker Container(Docker 容器)。

Spark 的架构图中:

  • Spark 程序由 Manager Node(管理节点)进行调度组织
  • 由 Worker Node(工作节点)进行具体的计算任务执行
  • 最终将结果返回给 Drive Program(驱动程序)。

在物理的 Worker Node 上,数据还会分为不同的 partition(数据分片),可以说 partition 是 Spark 的基础数据单元。

在这里插入图片描述

图1 Spark架构图

Spark 计算集群能够比传统的单机高性能服务器具备更强大的计算能力,就是由这些成百上千,甚至达到万以上规模的工作节点并行工作带来的。

1.2 一个具体栗子

那在执行一个具体任务的时候,Spark 是怎么协同这么多的工作节点,通过并行计算得出最终的结果呢?这里我们用一个任务来解释一下 Spark 的工作过程。

一个具体任务过程:
(1)先从本地硬盘读取文件 textFile;
(2)再从分布式文件系统 HDFS 读取文件 hadoopFile;
(3)然后分别对它们进行处理;
(4)再把两个文件按照 ID 都 join 起来得到最终的结果。

  • 在 Spark 平台上处理这个任务的时候,会将这个任务拆解成一个子任务 DAG(Directed Acyclic Graph,有向无环图),再根据 DAG 决定程序各步骤执行的方法。从图 2 中可以看到,这个 Spark 程序分别从 textFile 和 hadoopFile 读取文件,再经过一系列 map、filter 等操作后进行 join,最终得到了处理结果。

在这里插入图片描述

图2 某Spark程序的任务有向无环图
  • 最关键的过程是要理解哪些是可以纯并行处理的部分,哪些是必须 shuffle(混洗)和 reduce 的部分:这里的 shuffle 指的是所有 partition 的数据必须进行洗牌后才能得到下一步的数据,最典型的操作就是图 2 中的 groupByKey 操作和 join 操作。以 join 操作为例,必须对 textFile 数据和 hadoopFile 数据做全量的匹配才可以得到 join 后的 dataframe(Spark 保存数据的结构)。而 groupByKey 操作则需要对数据中所有相同的 key 进行合并,也需要全局的 shuffle 才能完成。

  • 与之相比,mapfilter 等操作仅需要逐条地进行数据处理和转换,不需要进行数据间的操作,因此各 partition 之间可以完全并行处理

  • 在得到最终的计算结果之前,程序需要进行 reduce 的操作,从各 partition 上汇总统计结果,随着 partition 的数量逐渐减小,reduce 操作的并行程度逐渐降低,直到将最终的计算结果汇总到 master 节点(主节点)上。可以说,shufflereduce 操作的触发决定了纯并行处理阶段的边界。

在这里插入图片描述

图3 被shuffle操作分割的DAG stages

注意:
(1)shuffle 操作需要在不同计算节点之间进行数据交换,非常消耗计算、通信及存储资源,因此 shuffle 操作是 spark 程序应该尽量避免的。
shuffle可以理解为一个串行操作,需要等到在此之前的并行工作完成之后才可以顺序开始。
(2)简述Spark 的计算过程:Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。

下面将应用Spark在推荐系统的特征处理上,用 Spark 处理我们的 Sparrow Recsys 项目的数据集。带着2个问题学习: 经典的特征处理方法有什么?Spark 是如何实现这些特征处理方法的?

二、如何利用 One-hot 编码处理类别型特征

2.1 One-hot编码

广义上来讲,所有的特征都可以分为两大类:
(1)第一类是类别、ID 型特征(以下简称类别型特征)。
拿电影推荐来说,电影的风格、ID、标签、导演演员等信息,用户看过的电影 ID、用户的性别、地理位置信息、当前的季节、时间(上午,下午,晚上)、天气等等,这些无法用数字表示的信息全都可以被看作是类别、ID 类特征。
(2)第二类是数值型特征,能用数字直接表示的特征就是数值型特征。
典型的包括用户的年龄、收入、电影的播放时长、点击量、点击率等。

特征处理的目的:把所有的特征全部转换成一个数值型的特征向量,对于数值型特征,这个过程非常简单,直接把这个数值放到特征向量上相应的维度上就可以了。但是对于类别、ID 类特征,怎么处理它们呢?

这里就要用到 One-hot 编码(也被称为独热编码),它是将类别、ID 型特征转换成数值向量的一种最典型的编码方式。它通过把所有其他维度置为 0,单独将当前类别或者 ID 对应的维度置为 1 的方式生成特征向量。

One-hot编码举栗:假设某样本有三个特征,分别是星期、性别和城市,我们用 [Weekday=Tuesday, Gender=Male, City=London] 来表示,用 One-hot 编码对其进行数值化的结果。
在这里插入图片描述

图4 One-hot编码特征向量

除了上面栗子的类别型特征外,ID 型特征也经常使用 One-hot 编码。

比如,在 SparrowRecsys 中,用户 U 观看过电影 M,这个行为是一个非常重要的用户特征,如何向量化这个行为呢?其实也是使用 One-hot 编码。假设,我们的电影库中一共有 1000 部电影,电影 M 的 ID 是 310(编号从 0 开始),那这个行为就可以用一个 1000 维的向量来表示,让第 310 维的元素为 1,其他元素都为 0。

2.2 Sparrow系统栗子

下面看看 SparrowRecsys 是如何利用 Spark 完成这一过程的。这里使用 Spark 的机器学习库 MLlib 来完成 One-hot 特征的处理。

其中,最主要的步骤是,先创建一个负责 One-hot 编码的转换器,OneHotEncoderEstimator,然后通过它的 fit 函数完成指定特征的预处理,并利用 transform 函数将原始特征转换成 One-hot 特征。实现思路大体上就是这样,具体的步骤可以参考下面给出的源码:

def oneHotEncoderExample(samples:DataFrame): Unit ={
  //samples样本集中的每一条数据代表一部电影的信息,其中movieId为电影id
  val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType))


  //利用Spark的机器学习库Spark MLlib创建One-hot编码器
  val oneHotEncoder = new OneHotEncoderEstimator()
    .setInputCols(Array("movieIdNumber"))
    .setOutputCols(Array("movieIdVector"))
    .setDropLast(false)


  //训练One-hot编码器,并完成从id特征到One-hot向量的转换
  val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
  //打印最终样本的数据结构
  oneHotEncoderSamples.printSchema()
  //打印10条样本查看结果
  oneHotEncoderSamples.show(10)

_(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering__中的oneHotEncoderExample函数)_

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • One-hot 编码也可以自然衍生成 Multi-hot 编码(多热编码)。比如,对于历史行为序列类、标签特征等数据来说,用户往往会与多个物品产生交互行为,或者一个物品被打上多个标签,这时最常用的特征向量生成方式就是把其转换成 Multi-hot 编码
  • 在 SparrowRecsys 中,因为每个电影都是有多个 Genre(风格)类别的,所以我们就可以用 Multi-hot 编码完成标签到向量的转换。
  • 可以尝试着用 Spark 实现该过程,也可以参考 SparrowRecsys 项目中 multiHotEncoderExample 的实现。

2.3 Multiple编码

Multiple编码特征将多个属性同时编码到一个特征中。在推荐场景中,单个用户对哪些物品感兴趣的特征就是一种Multiple编码特征,如,表示某用户对产品1、产品2、产品3、产品4是否感兴趣,则这个特征可能有多个取值,如用户A对产品1和产品2感兴趣,用户B对产品1和产品4感兴趣,用户C对产品1、产品3和产品4感兴趣,则用户兴趣特征为

用户 UserInterests
A [1, 2]
B [1, 4]
C [1, 3, 4]

  
 
  • 1
  • 2
  • 3
  • 4

Multiple编码采用类似oneHot编码的形式进行编码,根据物品种类数目,展成物品种类数目大小的向量,当某个用户感兴趣时,对应维度为1,反之为0,如下

用户 UserInterests
A [1, 1, 0, 0]
B [1, 0, 0, 1]
C [1, 0, 1, 1]

  
 
  • 1
  • 2
  • 3
  • 4

如何使用Multiple编码呢?
我们将多个属性同时编码到同一个特征中,目的就是同时利用多个属性的特征。经过Multiple编码后的特征大小为[batch_size, num_items],记作U,构建物品items的Embedding矩阵,该矩阵维度为[num_items, embedding_size],记作V,将矩阵U和矩阵V相乘,我们就得到了大小为[batch_size, embedding_size]的多属性表示。

三、数值型特征的处理——归一化和分桶

为啥处理特征:一是特征的尺度,二是特征的分布。

3.1 解决特征的尺度相差过大

前者即防止特征尺度之间相距过大,比如在电影推荐中有两个特征,一个是电影的评价次数 fr(无上限),一个是电影的平均评分 fs。fr波动范围高于fs几个数量级,可能会完全掩盖fs作用,所以将两个特征尺度拉平到一个区域内(通常是[0, 1],即所谓的归一化)。

3.2 解决特征分布不均匀问题

归一化虽然能够解决特征取值范围不统一的问题,但无法改变特征值的分布。比如图 5 就显示了 Sparrow Recsys 中编号在前 1000 的电影平均评分分布。由于人们打分有“中庸偏上”的倾向,因此评分大量集中在 3.5 的附近,而且越靠近 3.5 的密度越大。这对于模型学习来说也不是一个好的现象,因为特征的区分度并不高

在这里插入图片描述

图5 电影的平均评分分布

经常会用分桶的方式来解决特征值分布极不均匀的问题。
分桶(Bucketing):将样本按照某特征的值从高到低排序,然后按照桶的数量找到分位数,将样本分到各自的桶中,再用桶 ID 作为特征值。

在 Spark MLlib 中,分别提供了两个转换器 MinMaxScalerQuantileDiscretizer,来进行归一化和分桶的特征处理。它们的使用方法和之前介绍的 OneHotEncoderEstimator 一样,都是先用 fit 函数进行数据预处理,再用 transform 函数完成特征转换。下面的代码就是 SparrowRecSys 利用这两个转换器完成特征归一化和分桶的过程。

def ratingFeatures(samples:DataFrame): Unit ={
  samples.printSchema()
  samples.show(10)


  //利用打分表ratings计算电影的平均分、被打分次数等数值型特征
  val movieFeatures = samples.groupBy(col("movieId"))
    .agg(count(lit(1)).as("ratingCount"),
      avg(col("rating")).as("avgRating"),
      variance(col("rating")).as("ratingVar"))
      .withColumn("avgRatingVec", double2vec(col("avgRating")))


  movieFeatures.show(10)


  //分桶处理,创建QuantileDiscretizer进行分桶,将打分次数这一特征分到100个桶中
  val ratingCountDiscretizer = new QuantileDiscretizer()
    .setInputCol("ratingCount")
    .setOutputCol("ratingCountBucket")
    .setNumBuckets(100)


  //归一化处理,创建MinMaxScaler进行归一化,将平均得分进行归一化
  val ratingScaler = new MinMaxScaler()
    .setInputCol("avgRatingVec")
    .setOutputCol("scaleAvgRating")


  //创建一个pipeline,依次执行两个特征处理过程
  val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
  val featurePipeline = new Pipeline().setStages(pipelineStage)


  val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
  //打印最终结果
  movieProcessedFeatures.show(

_(参考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering中的ratingFeatures函数)_

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

3.3 YouTube的数值型特征处理

在经典的 YouTube 深度推荐模型中,可以看到一些很有意思的处理方法。比如,在处理观看时间间隔(time since last watch)和视频曝光量(previous impressions)这两个特征时,YouTube 模型对它们进行归一化后,又将它们各自处理成了三个特征(图 6 中红框内的部分),分别是原特征值 x,特征值的平方x^2,以及特征值的开方,这又是为什么呢?
在这里插入图片描述

图6 YouTube推荐模型(来源:Deep Neural Networks for YouTube Recommendations)

无论是平方还是开方操作,改变的还是这个特征值的分布,这些操作与分桶操作一样,都是希望通过改变特征的分布,让模型能够更好地学习到特征内包含的有价值信息。但由于我们没法通过人工的经验判断哪种特征处理方式更好,所以索性把它们都输入模型,让模型来做选择。

四、作业

(1)请你查阅一下 Spark MLlib 的编程手册,找出 NormalizerStandardScalerRobustScalerMinMaxScaler 这个几个特征处理方法有什么不同。

Normalizer、StandardScaler、RobustScaler、MinMaxScaler 都是用让数据无量纲化

  • Normalizer: 正则化;(和Python的sklearn一样是按行处理,而不是按列[每一列是一个特征]处理,原因是:Normalization主要思想是对每个样本计算其p-范数,然后对该样本中每个元素除以该范数,这样处理的结果是使得每个处理后样本的p-范数(l1-norm,l2-norm)等于1。)针对每行样本向量:l1: 每个元素/样本中每个元素绝对值的和,l2: 每个元素/样本中每个元素的平方和开根号,lp: 每个元素/每个元素的p次方和的p次根,默认用l2范数。

  • StandardScaler:数据标准化; ( x i − u ) / σ (xi - u) / σ (xiu)/σ 【u:均值,σ:方差】当数据(x)按均值(μ)中心化后,再按标准差(σ)缩放,数据就会服从为均值为0,方差为1的正态分布(即标准正态分布)。

  • RobustScaler: ( x i − m e d i a n ) / I Q R (xi - median) / IQR (ximedian)/IQR 【median是样本的中位数,IQR是样本的 四分位距:根据第1个四分位数和第3个四分位数之间的范围来缩放数据】

  • MinMaxScaler:数据归一化, ( x i − m i n ( x ) ) / ( m a x ( x ) − m i n ( x ) ) (xi - min(x)) / (max(x) - min(x)) (ximin(x))/(max(x)min(x)) ;当数据(x)按照最小值中心化后,再按极差(最大值 - 最小值)缩放,数据移动了最小值个单位,并且会被收敛到 [0,1]之间

(2)你能试着运行一下 SparrowRecSys 中的 FeatureEngineering 类,从输出的结果中找出,到底哪一列是我们处理好的 One-hot 特征和 Multi-hot 特征吗?以及这两个特征是用 Spark 中的什么数据结构来表示的呢?

答:One-hot特征是调用OneHotEncoderEstimator对movieId转换,生成了特征movieIdVector;
Multi-hot 特征是调用Vectors.sparse方法,对处理后的genreIndexes转换,生成vector。
这俩个特征都是稀疏向量表示(数据结构:SparseVector),不是稠密向量。
其中的数据分别是:(类别数量,索引数组,值数组)。索引数组长度必须等于值数组长度。

五、答疑

(1)对训练数据进行平方或者开方,是为了改变训练数据的分布。训练数据的分布被改变后,训练出来的模型岂不是不能正确拟合训练数据了。

答:本质上是改变了特征的分布,特征的分布和训练数据的分布没有本质的联系。只要你不改变训练数据label的分布,最终预测出的结果都应该是符合数据本身分布的。因为你要预测的是label,并不是特征本身。

Reference

(1)《深度学习推荐系统实战》,王喆
(2)王喆大佬的github:https://github.com/wzhe06
(3)Machine Learning Library (MLlib)
(4)https://www.codeleading.com/article/97252516619/#_OneHot_19
(5)http://spark.apache.org/docs/3.0.0/api/python/index.html

文章来源: andyguo.blog.csdn.net,作者:山顶夕景,版权归原作者所有,如需转载,请联系作者。

原文链接:andyguo.blog.csdn.net/article/details/120650754

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。