Spark Streaming 进阶实战五个例子
【摘要】 一、带状态的算子:UpdateStateByKey
实现 计算 过去一段时间到当前时间 单词 出现的 频次
object StatefulWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName(...
一、带状态的算子:UpdateStateByKey
实现 计算 过去一段时间到当前时间 单词 出现的 频次
-
object StatefulWordCount {
-
def main(args: Array[String]): Unit = {
-
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
-
val ssc = new StreamingContext(sparkConf, Seconds(5))
-
//如果使用了 stateful 的算子,必须要设置 checkpoint,
-
//因为老的值 必须要 存在 某个 目录下面,新的值 才能去更新老的值
-
//在生产环境中,建议把checkpoint 设置到 hdfs 的某个文件夹中
-
ssc.checkpoint(".")
-
val lines = ssc.socketTextStream("localhost", 6789)
-
val result = lines.flatMap(_.split(" ").map((_, 1)))
-
val state = result.updateStateByKey[Int](updateFunction _)
-
state.print()
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
-
/**
-
* 把当前的数据去更新已有的或者老的数据
-
* @param currentValues 当前的
-
* @param preValues 老的
-
* @return
-
*/
-
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
-
val current = currentValues.sum
-
val pre = preValues.getOrElse(0)
-
Some(current + pre)
-
}
-
}
二、实战:计算到目前为止累积出现的单词的个数写入到mysql中
-
/**
-
* 使用 spark streaming 完成 词频统计,并输出到 mysql 数据库
-
* 创建 数据库
-
*
-
* 创建数据表
-
* create table wordcount (
-
* word varchar(50) default null,
-
* wordcount int(10) default null
-
* )
-
*/
-
object ForeachRDDApp {
-
-
def main(args: Array[String]): Unit = {
-
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
-
val ssc = new StreamingContext(sparkConf, Seconds(5))
-
-
-
val lines = ssc.socketTextStream("localhost", 6789)
-
val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
-
//result.print()
-
-
//将结果写入到mysql
-
//1、错误的方式
-
// result.foreachRDD(rdd =>{
-
// val connection = createConnection()
-
// rdd.foreach {
-
// record =>
-
// val sql = "insert into wordcount (word,wordcount)" +
-
// "values ('"+record._1+"','"+record._2+"')"
-
// connection.createStatement().execute(sql)
-
// }
-
// })
-
-
//2、正确的方式
-
result.foreachRDD(rdd => {
-
rdd.foreachPartition(partitionOfRecords => {
-
if (partitionOfRecords.size > 0) {
-
val connection = createConnection()
-
partitionOfRecords.foreach(pair => {
-
val sql = "insert into wordcount (word,wordcount)" +
-
"values ('" + pair._1 + "','" + pair._2 + "')"
-
connection.createStatement().execute(sql)
-
})
-
connection.close()
-
}
-
})
-
})
-
-
//3、更好的方式,查阅官方文档,使用 连接池的方式
-
-
//存在的问题,这样每次都会插入新的数据,同样的单词频次字段不会去累加更新
-
//解决方案 :每次 insert 之前,判断一下,该单词是否已经存在数据库中,如果已经存在则update
-
//或者 存放在 hbase /redis 中,调用相应的api ,直接 插入和更新。
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
-
def createConnection() = {
-
Class.forName("com.mysql.jdbc.Driver")
-
DriverManager.getConnection("jdbc://mysql://localhost:3306/dzx_spark", "root", "1234")
-
}
-
}
三、基于window 的统计
window :定时的进行一个时间段内的数据处理
window length : 窗口的长度
sliding interval : 窗口的间隔
这2个参数和我们的batch size 成倍数关系。如果不是倍数关系运行直接报错
每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc ==>每隔 sliding interval 统计 window length 的值
pair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))
四、黑名单过滤
-
/**
-
* 黑名单过滤
-
*
-
* 访问日志 ==> DStream
-
*
-
* 20180808,zs
-
* 20180808,ls
-
* 20180808,ww
-
*
-
* ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)
-
*
-
* 黑名单列表 ==》 RDD
-
* zs ls
-
* ==>(zs:true) (ls:true)
-
*
-
*
-
* leftjoin
-
* (zs:[<20180808,zs>,<true>]) pass ...
-
* (ls:[<20180808,ls>,<true>]) pass ...
-
* (ww:[<20180808,ww>,<false>]) ==> tuple1 ok...
-
*
-
*
-
*/
-
object BlackNameListApp {
-
-
def main(args: Array[String]): Unit = {
-
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
-
val ssc = new StreamingContext(sparkConf, Seconds(5))
-
-
-
/**
-
* 构建黑名单
-
*/
-
val blacks = List("zs", "ls")
-
val blacksRDD = ssc.sparkContext.parallelize(blacks)
-
.map(x => (x, true))
-
val lines = ssc.socketTextStream("localhost", 6789)
-
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
-
rdd.leftOuterJoin(blacksRDD).filter(x => x._2._2.getOrElse(false) != true).map(x => x._2._1)
-
})
-
clicklog.print()
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
}
五、 spark streaming 整合 spark sql 实战
-
object SqlNetworkWordCount {
-
-
def main(args: Array[String]): Unit = {
-
-
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
-
val ssc = new StreamingContext(sparkConf, Seconds(5))
-
val lines = ssc.socketTextStream("192.168.42.85", 6789)
-
val words = lines.flatMap(_.split(" "))
-
-
// Convert RDDs of the words DStream to DataFrame and run SQL query
-
words.foreachRDD { (rdd: RDD[String], time: Time) =>
-
// Get the singleton instance of SparkSession
-
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
-
import spark.implicits._
-
// Convert RDD[String] to RDD[case class] to DataFrame
-
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
-
// Creates a temporary view using the DataFrame
-
wordsDataFrame.createOrReplaceTempView("words")
-
// Do word count on table using SQL and print it
-
val wordCountsDataFrame =
-
spark.sql("select word, count(*) as total from words group by word")
-
println(s"========= $time =========")
-
wordCountsDataFrame.show()
-
}
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
}
-
-
/** Case class for converting RDD to DataFrame */
-
-
case class Record(word: String)
-
-
/** Lazily instantiated singleton instance of SparkSession */
-
-
object SparkSessionSingleton {
-
@transient private var instance: SparkSession = _
-
def getInstance(sparkConf: SparkConf): SparkSession = {
-
if (instance == null) {
-
instance = SparkSession
-
.builder
-
.config(sparkConf)
-
.getOrCreate()
-
}
-
instance
-
}
-
}
-
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/85676609
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)