【Pyspark基础】sql获取user最近3次使用的item

举报
野猪佩奇996 发表于 2022/05/16 22:36:19 2022/05/16
【摘要】 文章目录 一、先用window函数尝试二、获得对应的collection_list三、用withColumn和split进行分隔Reference 一、先用window函数尝试 pyspa...

一、先用window函数尝试

pyspark.sql和mysql一样有window窗口函数:
在这里插入图片描述

# 数据的分组聚合,找到每个用户最近的3次收藏beat(用window开窗函数)
from pyspark.sql.window import Window
import pyspark.sql.functions as F


window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time")) 
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate = False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3) 
user_feed_test.show(7)
# .where(F.col('row_number') <= 3).select("user_id","beat_id","collect_type","rank") \

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

这样就能得到每个用户最近三次使用的item了(根据时间进行排序,在最后一列),然后把一些beat_id异常值去掉:

# 对异常的beat_id的处理,去掉所有值为0的表项
user_feed_test = user_feed_test.filter(F.col('beat_id') > 0)
user_feed_test.show(7)

"""
+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
+--------+-------+------------+--------------------+----+
"""

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

二、获得对应的collection_list

# 这里要注意创建视图,否则上面的user_feed_test只是一个变量
user_feed_test.createOrReplaceTempView("user_feed_test")
#  把每个用户前3收藏的装入set中
df = spark.sql("select user_id, \
               concat_ws(',', collect_set(beat_id)) as collection_list  \
               FROM user_feed_test \
               GROUP BY user_id")
df.show(5)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

结果为:

+--------+--------------------+
| user_id|     collection_list|
+--------+--------------------+
|10065188|       827272,885812|
|10068979|             1069390|
|10074915|          -2,1122682|
|10075397|336641,886179,947751|
|10454402|888649,884711,108...|
+--------+--------------------+

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

三、用withColumn和split进行分隔

withColumn增加对应列:

# 用withColumn和split进行分隔
from pyspark.sql.functions import instr, split, when

# instr表示找到在字符串中对应字符第一次出现的下标位置
#df = df.withColumn('separator_if_exists',
#                   (instr(col('collection_list'),',') > 0) & instr(col('collection_list'),',').isNotNull())
df = df.withColumn('separator_if_exists', 
                   (split(col('collection_list'),',')[0] > 0).isNotNull())
df = df.withColumn('collect_beat1',
                   when(col('separator_if_exists') == True,
                        split(col('collection_list'),',')[0]).otherwise(None))
# .drop('separator_if_exists')

df = df.withColumn('collect_beat2',
                   when(col('separator_if_exists') == True,
                        split(col('collection_list'),',')[1]).otherwise(None))
df = df.withColumn('collect_beat3',
                   when(col('separator_if_exists') == True,
                        split(col('collection_list'),',')[2]).otherwise(None))

df.show(truncate=False)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

结果为:

+--------+----------------------+-------------------+-------------+-------------+-------------+
|user_id |collection_list       |separator_if_exists|collect_beat1|collect_beat2|collect_beat3|
+--------+----------------------+-------------------+-------------+-------------+-------------+
|10065188|827272,885812         |true               |827272       |885812       |null         |
|10068979|1069390               |true               |1069390      |null         |null         |
|10074915|-2,1122682            |true               |-2           |1122682      |null         |
|10364341|1070218               |true               |1070218      |null         |null         |
+--------+----------------------+-------------------+-------------+-------------+-------------+

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Reference

[1] https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.window.html?highlight=window#pyspark.sql.functions.window

文章来源: andyguo.blog.csdn.net,作者:山顶夕景,版权归原作者所有,如需转载,请联系作者。

原文链接:andyguo.blog.csdn.net/article/details/124773435

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。