Spark_udf使用

举报
bigdata张凯翔 发表于 2021/03/26 00:36:07 2021/03/26
【摘要】 package cn.itzkx.spark_udf import org.apache.spark.sql.SparkSession case class Stu(name: String, like: String) object FunctionApp { def main(args: Array[String]): Unit = { val spark = Spark...
package cn.itzkx.spark_udf

import org.apache.spark.sql.SparkSession
case class Stu(name: String, like: String)
object FunctionApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() // .master("local[2]") // .appName("AnalyzerTrain") // .getOrCreate() import spark.implicits._ val likeDF = spark.sparkContext.parallelize(List("17er\truoze,j哥,星星,小海", "老二\tzwr,17er", "小海\t苍老师,波老师")) .map(x => { val fileds = x.split("\t") Stu(fileds(0).trim, fileds(1).trim) } ).toDF() spark.udf.register("like_count", (like: String) => like.split(",").size) //注册函数like_count //方式一: likeDF.createOrReplaceTempView("info") //DF通过createOrReplaceTempView注册成临时表info spark.sql("select name,like,like_count(like) num from info").show() spark.stop() }
  //定义了每个人喜欢的人的个数的函数;以上的是定义函数以后通过sql来使用的,那如何通过API来使用呢?看下面的代码 /** * 执行结果 * +----+--------------+---+ |name| like|num| +----+--------------+---+ |17er|ruoze,j哥,星星,小海|  4| |  老二| zwr,17er|  2| |  小海| 苍老师,波老师|  2| +----+--------------+---+ */

}
package cn.itzkx.spark_udf

import org.apache.spark.sql.SparkSession

case class Student(name: String, like: String)
object FunctionApp1 {
  def main(args: Array[String]): Unit = { val spark =SparkSession.builder()// .master("local[2]")// .appName("AnalyzerTrain")// .getOrCreate() import spark.implicits._ val likeDF= spark.sparkContext.parallelize(List("17er\truoze,j哥,星星,小海", "老二\tzwr,17er", "小海\t苍老师,波老师")) .map(x => { val fileds = x.split("\t") Student(fileds(0).trim, fileds(1).trim) } ).toDF() spark.udf.register("like_count" ,(like:String)=>like.split(",").size) //注册函数like_count //likeDF.createOrReplaceTempView("info")//DF通过createOrReplaceTempView注册成临时表info //方式二:注册函数以后直接就可以当成内置函数使用的模式来使用 likeDF.selectExpr("name","like","like_count(like) as cnt").show() }
}

package cn.itzkx.spark_udf

import org.apache.spark.sql.{SparkSession, functions}

case class Stut(name: String, like: String)
object FunctionApp2 {
  def main(args: Array[String]): Unit = { val spark =SparkSession.builder()// .master("local[2]")// .appName("AnalyzerTrain")// .getOrCreate() import spark.implicits._ val likeDF= spark.sparkContext.parallelize(List("17er\truoze,j哥,星星,小海", "老二\tzwr,17er", "小海\t苍老师,波老师")) .map(x => { val fileds = x.split("\t") Stut(fileds(0).trim, fileds(1).trim) } ).toDF() //方式三: val like_count =functions.udf((like:String)=>like.split(",").size) //spark.udf.register("like_count" ,(like:String)=>like.split(",").size) likeDF.select($"name",$"like",like_count($"like").alias("cnt") ).show() //like_count 需要通过val来定义,这样在select里边使用的时候才不会爆红 spark.stop() }
}
package cn.itzkx.spark_udf

import cn.itzkx.spark_udf.UdfDemo.isAdult
import org.apache.spark.sql.SparkSession

object TestnewUdf { /** * 根据年龄大小返回是否成年 成年:true,未成年:false */
  def isAdult(age: Int) = { if (age < 18) { false } else { true } } def main(args: Array[String]): Unit = { /** * 新版本(Spark2.x)Spark Sql udf示例 */ //spark初始化 val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") // 注册一张user表 userDF.createOrReplaceTempView("user") //注册自定义函数(通过匿名函数) spark.udf.register("strLen", (str: String) => str.length()) //注册自定义函数(通过实名函数) spark.udf.register("isAdult", isAdult _) spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show //关闭 spark.stop() }
}

package cn.itzkx.spark_udf


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TestoldDfUdf { /** * 根据年龄大小返回是否成年 成年:true,未成年:false */
  def isAdult(age: Int) = { if (age < 18) { false } else { true }
  } /** * 旧版本(Spark1.x)DataFrame udf示例 * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的 * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究 */ def main(args: Array[String]): Unit = { //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldDfUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 sc.stop() }
}
package cn.itzkx.spark_udf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Testoldudf {
  /** * 根据年龄大小返回是否成年 成年:true,未成年:false */
  def isAdult(age: Int) = { if (age < 18) { false } else { true }
  }
  def main(args: Array[String]): Unit = { /** * 旧版本(Spark1.x)Spark Sql udf示例 */ //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") // 注册一张user表 userDF.registerTempTable("user") // 注册自定义函数(通过匿名函数) sqlContext.udf.register("strLen", (str: String) => str.length()) sqlContext.udf.register("isAdult", isAdult _) // 使用自定义函数 sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show //关闭 sc.stop()
  }
}




package cn.itzkx.spark_udf

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Spark Sql 用户自定义函数示例
  */
object UdfDemo { def main(args: Array[String]): Unit = { oldUdf newUdf newDfUdf oldDfUdf
  } /** * 根据年龄大小返回是否成年 成年:true,未成年:false */
  def isAdult(age: Int) = { if (age < 18) { false } else { true } } /** * 旧版本(Spark1.x)Spark Sql udf示例 */
  def oldUdf() { //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") // 注册一张user表 userDF.registerTempTable("user") // 注册自定义函数(通过匿名函数) sqlContext.udf.register("strLen", (str: String) => str.length()) sqlContext.udf.register("isAdult", isAdult _) // 使用自定义函数 sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show //关闭 sc.stop() } /** * 新版本(Spark2.x)Spark Sql udf示例 */
  def newUdf() { //spark初始化 val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") // 注册一张user表 userDF.createOrReplaceTempView("user") //注册自定义函数(通过匿名函数) spark.udf.register("strLen", (str: String) => str.length()) //注册自定义函数(通过实名函数) spark.udf.register("isAdult", isAdult _) spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show //关闭 spark.stop() } /** * 新版本(Spark2.x)DataFrame udf示例 */
  def newDfUdf() { val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 spark.stop()
  }
  /** * 旧版本(Spark1.x)DataFrame udf示例 * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的 * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究 */
  def oldDfUdf() { //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldDfUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 sc.stop()
  }

}

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/b1ebcea457c4

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。