【Pyspark】常用数据分析基础操作

举报
野猪佩奇996 发表于 2022/05/23 00:17:26 2022/05/23
【摘要】 文章目录 一、pyspark.sql部分1.窗口函数2.更换列名:3.sql将一个字段根据某个字符拆分成多个字段显示4.pd和spark的dataframe进行转换:5.报错ValueError:...

一、pyspark.sql部分

1.窗口函数

# 数据的分组聚合,找到每个用户最近的3次收藏beat(用window开窗函数)
from pyspark.sql.window import Window
import pyspark.sql.functions as F


window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time")) 
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate = False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3) 
user_feed_test.show(30)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

结果如下,和mysql的窗口函数类似的,以每个user_id分组,然后组内排序,这里我只获取排序后collect_time前3的数据,即最近3次的用户收藏数据:

+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.更换列名:

如现在有个人员信息表,新加上一列coun try字段信息:

# 修改列名
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))
 
# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType


data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
print(data)

"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000), 
('Michael', 'Rose', '', '2000-05-19', 'M', 4000), 
('Robert', '', 'Williams', '1978-09-05', 'M', 4000), 
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000), 
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

先给出对应的字段,创建我们的DataFrame格式,然后通过withColumn新加上一列,其中lit("ABC")是指整列的数据都是对应的ABC字符串:

# schema只需要给出列名即可
columns = ["firstname","middlename","lastname","dob","gender","salary"]

# 增加
df = spark.createDataFrame(data=data, schema = columns)
df.show()

# 增加or修改列
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.show()

df3 = df.withColumn("salary",col("salary")*100)
df3.show()

# lit默认均是USA
df5 = df.withColumn("Coun try", lit("ABC"))
df5.show()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

结果如下:

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    ABC|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    ABC|
|   Robert|          |Williams|1978-09-05|     M|  4000|    ABC|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    ABC|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    ABC|
+---------+----------+--------+----------+------+------+-------+

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

3.sql将一个字段根据某个字符拆分成多个字段显示

可以通过withColumnsplit进行分隔,参考上次写的【Pyspark基础】sql获取user最近3次使用的item

更多参考:
https://www.cnblogs.com/360aq/p/13269417.html
https://www.pythonheidong.com/blog/article/690421/aa556949151c244e81f8/

4.pd和spark的dataframe进行转换:

  • 当需要把Spark DataFrame转换成Pandas DataFrame时,可以调用toPandas()
  • 当需要从Pandas DataFrame创建Spark DataFrame时,可以采用createDataFrame(pandas_df)

更多参考:
厦大数据实验室-借助arrow实现pyspark和pandas之间的数据转换:http://dblab.xmu.edu.cn/blog/2752-2/

5.报错ValueError: Some of types cannot be determined after inferring

是因为有字段类型spark识别不了:

(1)可以提高数据采样率:

sqlContext.createDataFrame(rdd, samplingRatio=0.01)

  
 
  • 1

(2)显式声明要创建的 DataFrame 的数据结构schema

from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)
])
df = sqlContext.createDataFrame(rdd, schema=schema)

# 方法二: 使用toDF
from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)
])
df = rdd.toDF(schema=schema)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

参考:https://www.codeleading.com/article/64083243294/

6.df按行打乱

每行生成随机数后排序,然后删除这一随机数的列。

import pyspark.sql.functions as F

# 从rdd生成dataframe
schema = StructType(fields)
df_1 = spark.createDataFrame(rdd, schema)

# 乱序: pyspark.sql.functions.rand生成[0.0, 1.0]中double类型的随机数
df_2 = df_1.withColumn('rand', F.rand(seed=42))

# 按随机数排序
df_rnd = df_2.orderBy(df_2.rand)

# 删除随机数的一列
df = df_rnd.drop(df_rnd.rand)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

7.表格的联结

Spark DataFrame理解和使用之两个DataFrame的关联操作
SQL数据库语言基础之SqlServer多表连接查询与INNER JOIN内连接查询
SQL的表格之间的join连接方式——inner join/left join/right join/full join语法及其用法实例

8.dataframe的操作

9.createDataFrame的几种方法

(1)注:structtype的格式(官方文档):
可以通过StructType设置schema,关于其使用:

# 读取beat数据
schema = StructType([StructField("beatid", StringType(), True)\
                   ,StructField("name", StringType(), True)\
                   ,StructField("language", StringType(), True)])
beats = spark.read.csv("filepath", header=False, schema=schema)
# print(beats.show())
beats.show()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

(2)从pandas dataframe创建spark dataframe

# 从pandas dataframe创建spark dataframe
colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)

color_df=spark.createDataFrame(color_df)
color_df.show()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html?highlight=structtype#pyspark.sql.types.StructType

10.pd dataframe与spark dataframe转换,通过sql语句间接对pandas的dataframe进行处理

pandasDF_out.createOrReplaceTempView("pd_data")
# %%
spark.sql("select * from pd_data").show()
# %%
res = spark.sql("""select * from pd_data 
                where math>= 90 
                order by english desc""")
res.show()
# %%
output_DF = res.toPandas()
print(type(output_DF))

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

更多参考:https://blog.csdn.net/weixin_46408961/article/details/120407900

11.filter筛选

可以通过filter进行筛选,如找出category_title00后的对应行:

# 方法一:filter
category_beat.filter(" category_title = '00后' ").head(5)

  
 
  • 1
  • 2

https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.filter.html

12. 新增或者修改spark.sql中dataframe的某列

官方文档pyspark.sql.DataFrame.withColumn描述:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html?highlight=withcolumn#pyspark.sql.DataFrame.withColumn

13.将dataframe保存为csv文件

这里的repartition(num)是一个可选项:

# save positive_res_data
file_location = "file:///home/hadoop/development/RecSys/data/"
positive_res_data.repartition(2).write.csv(file_location + 'positive_res_data.csv')
# 保存成功,有多个文件

  
 
  • 1
  • 2
  • 3
  • 4

也可以使用save:
https://wenku.baidu.com/view/1329194ea75177232f60ddccda38376baf1fe078.html

14. 取出对应表项内容

首先初始表category_beat是这样的:

+--------------------------+----------------------------------+
|            category_title|                 collect_set(name)|
+--------------------------+----------------------------------+
|                      00|[白月光与朱砂痣, 致姗姗来迟的你...|
|                04年唱的歌|  [祝我生日快乐, 旅行的意义,...|
|10年前没有iPhone但有这些歌|    [逆光, 改变自己, 达尔文,...|
+--------------------------+----------------------------------+

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

现在要得到第一行的collect_set表项内容,即对应的集合:

import random

ans = category_beat.where(col("category_title") == '00后').select(col("collect_set(name)"))
# type(ans)
ans.show()

# 取出对应行列里的set,并且转为对应的list
lst = ans.take(1)[0][0]
# take是取出前1行

lst = list(lst)
print(lst, "\n")

print("推荐的歌曲:", lst[random.randint(0, len(lst))])

"""
+----------------------------------+
|                 collect_set(name)|
+----------------------------------+
|[白月光与朱砂痣, 致姗姗来迟的你...|
+----------------------------------+

['白月光与朱砂痣', '致姗姗来迟的你', '陨落', '踏山河', '花.间.酒', '山海', '

推荐的歌曲: 山海
"""

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

15.agg和groupby结合使用

二、Spark Core模块

2.1 udf函数的传参:

https://blog.csdn.net/yeshang_lady/article/details/121570361

2.2 pandas core dataframe

可以使用rdd的api。

2.3 rdd操作

rdd是spark的重点,包括两个算子:
【变换】map、flatMap、groupByKey、reduceByKey等
【动作】collect、count、take、top、first等

2.4 filter操作

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd = rdd.filter(lambda x: x % 2 == 0)
rdd.collect()
# [2, 4, 6]

  
 
  • 1
  • 2
  • 3
  • 4

2.5 flatMap

对rdd中每个元素按照函数操作,并将结果进行扁平化处理。

rdd = sc.parallelize([3, 4, 5])
fm = rdd.flatMap(lambda x: range(1, x))
fm.collect()
# [1, 2, 1, 2, 3, 1, 2, 3, 4]

  
 
  • 1
  • 2
  • 3
  • 4

三、MLlib模块

MLlib (DataFrame-based)
MLlib (RDD-based)

3.1 kmeans聚类分析

api的使用本身不难,和sklearn的使用差不多:

from pyspark.ml.clustering import KMeans
kMeans = KMeans(k=25, seed=1)

model = kMeans.fit(kmeans_data.select('features'))
model.transform(kmeans_data).show(1000)

# 分析模型训练的结果

# 训练得到的模型是否有summary
print(model.hasSummary, "\n")

# 获得聚类中心
print(model.clusterCenters(), "\n")

# 每个簇的大小(含有的数据点数)
print(model.summary.clusterSizes)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3.2 gbdt分类和回归

四、推荐算法

4.1 达观数据竞赛:3种改进DL算法

http://www.360doc.com/content/17/0615/21/16619343_663476259.shtml

【安装pyspark】
在Ubuntu上安装pyspark:https://zhuanlan.zhihu.com/p/34635519
apache官网上的安装包:https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Reference

[1] https://spark.apache.org/docs/latest/api/python/reference/index.html
[2] 算法工程师的数据分析:https://zhuanlan.zhihu.com/p/343375787
[3] 用createDataFrame创建表的几种方法
[4] spark dataframe按行随机打乱
[5] dataframe的常用操作:
0)https://blog.csdn.net/xc_zhou/article/details/118617642
1)https://blog.csdn.net/yeshang_lady/article/details/89528090
2)https://www.jianshu.com/p/acd96549ee15
3)https://www.dandelioncloud.cn/article/details/1441272697576869890

文章来源: andyguo.blog.csdn.net,作者:山顶夕景,版权归原作者所有,如需转载,请联系作者。

原文链接:andyguo.blog.csdn.net/article/details/124715851

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。