统计推荐模块

举报
bigdata张凯翔 发表于 2021/03/27 23:14:21 2021/03/27
【摘要】 package com.zkx.statistics import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache...
package com.zkx.statistics

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp


/** A single product rating event: user `userId` gave `productId` a `score` at epoch-second `timestamp`. */
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
/** MongoDB connection settings: connection `uri` and target database name `db`. */
case class MongoConfig(uri: String, db: String)
// NOTE(review): object name "StatisticsRecommemder" is misspelled ("Recommender")
// but is kept as-is because external callers/spark-submit configs may reference it.
object StatisticsRecommemder {
  // Names of the MongoDB collections this job reads from / writes to.
  val MONGODB_RATING_COLLECTION = "Rating"
  val RATE_MORE_PRODUCTS = "RateMoreProducts"
  val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
  val AVERAGE_PRODUCTS = "AverageProducts"

  /**
   * Entry point. Loads the ratings collection from MongoDB and computes three
   * statistics tables with Spark SQL, writing each back to MongoDB:
   *  1. historical popularity (rating count per product),
   *  2. recent popularity (rating count per product per month),
   *  3. product quality (average score per product).
   */
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[1]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    // Create the Spark session.
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load the ratings collection as a DataFrame (schema validated via the Rating case class).
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()

    // Register a temp view so the statistics can be expressed in Spark SQL.
    ratingDF.createOrReplaceTempView("ratings")

    // 1. Historically hot products: number of ratings per productId, most rated first.
    val rateMoreProductsDF = spark.sql(
      "select productId,count(productId) as count from ratings group by productId order by count desc")
    storeDFInMongoDB(rateMoreProductsDF, RATE_MORE_PRODUCTS)

    // 2. Recently hot products: rating count per product per month (yyyyMM).
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // UDF converting an epoch-second timestamp to an Int of the form yyyyMM
    // (timestamps are seconds, hence the * 1000L to get milliseconds for Date).
    spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
    // BUG FIX: the original query referenced the misspelled column "timestame",
    // which fails Spark SQL analysis at runtime; the Rating column is "timestamp".
    val ratingOfYearMonthDF = spark.sql(
      "select productId,score,changeDate(timestamp) as yearmonth from ratings")
    ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
    val rateMoreRecentlyProductsDF = spark.sql(
      "select productId,count(productId) as count,yearmonth from ratingOfMonth group by yearmonth,productId order by yearmonth desc,count desc")
    storeDFInMongoDB(rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS)

    // 3. Quality products: average score per productId, highest average first.
    val averageProductsDF = spark.sql(
      "select productId,avg(score) as avg from ratings group by productId order by avg desc")
    storeDFInMongoDB(averageProductsDF, AVERAGE_PRODUCTS)

    spark.stop()
  }

  /**
   * Writes `df` to the given MongoDB collection via the MongoDB Spark connector,
   * overwriting any existing data in that collection.
   */
  def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection_name)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
  }
}

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/54af6dee5f07

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。