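Spark Streaming Quick Start Series (3) | How to Create Data Sources for a DStream
Spark Streaming natively supports several different data sources.
I. RDD queue (for testing)
- 1. Usage and description
- 2. Hands-on example
Requirement: create several RDDs in a loop and put them into a queue, then create a DStream from that queue with Spark Streaming and compute WordCount (the listing below uses a simplified variant that sums the integers in each batch).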
package com.buwenbuhuo.spark.streaming.day01

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @author 不温卜火
 * @create 2020-08-07 13:08
 * MyCSDN :
 */
object WordCount1 {
  def main(args: Array[String]): Unit = {
    // Read data from a queue of RDDs; intended only for stress testing
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount1")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val rdds: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()
    // oneAtATime = false: each batch consumes every RDD currently in the queue
    val sourceStream: InputDStream[Int] = ssc.queueStream(rdds, false)
    val result: DStream[Int] = sourceStream.reduce(_ + _)
    result.print()
    ssc.start()

    val sc: SparkContext = ssc.sparkContext
    // Keep feeding the queue; this loop never exits
    while (true) {
      // The streaming thread reads the same queue, so guard the write
      rdds.synchronized {
        rdds.enqueue(sc.parallelize(1 to 100))
      }
      Thread.sleep(10)
    }
    ssc.awaitTermination() // not reached while the loop above runs forever
  }
}
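The second argument of `queueStream` (`oneAtATime`) decides how many queued RDDs go into one batch: with `false`, as in the listing above, every RDD waiting in the queue is consumed together, while with `true` only one RDD is taken per batch interval. The following plain-Scala sketch (no Spark involved) imitates that behaviour with ordinary collections. `QueueBatchSketch` and `drainBatch` are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable

// Plain-Scala imitation of how a queue-backed stream forms a batch.
// QueueBatchSketch and drainBatch are illustrative names, not Spark APIs.
object QueueBatchSketch {
  // Take the data for one batch out of the queue and add it up,
  // mirroring sourceStream.reduce(_ + _) in the listing above.
  def drainBatch(queue: mutable.Queue[Seq[Int]], oneAtATime: Boolean): Option[Int] = {
    val batch: List[Seq[Int]] =
      if (queue.isEmpty) Nil
      else if (oneAtATime) List(queue.dequeue())
      else queue.dequeueAll(_ => true).toList
    val values = batch.flatten
    if (values.isEmpty) None else Some(values.sum)
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue[Seq[Int]]()
    (1 to 3).foreach(_ => queue.enqueue(1 to 100))
    // All three queued "RDDs" end up in the same batch
    println(drainBatch(queue, oneAtATime = false))
  }
}
```

In the listing an RDD is enqueued about every 10 ms and the batch interval is 3 s, so with `oneAtATime = false` each printed sum can cover up to roughly 300 RDDs; the exact number depends on scheduling.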
- 3. Output
II. Custom data source
- 1. Usage and description
- 2. Requirement:
- 3. Source code
package com.buwenbuhuo.spark.streaming.day01

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * @author 不温卜火
 * @create 2020-08-07 17:03
 * MyCSDN :
 */
object MyReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyReceiverDemo")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // Plug in the custom receiver defined below rather than the built-in socketTextStream
    val sourceStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop002", 9999))
    sourceStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}

// Receiver that reads lines of text from a socket
class MyReceiver(host: String, port: Int) extends Receiver[String](storageLevel = StorageLevel.MEMORY_ONLY) {
  var socket: Socket = _
  var reader: BufferedReader = _

  override def onStart(): Unit = {
    runInThread {
      try {
        socket = new Socket(host, port)
        reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "utf-8"))
        var line: String = reader.readLine()
        // readLine returns null once the peer signals end of stream
        while (line != null && socket.isConnected) {
          store(line)
          line = reader.readLine() // blocks while the stream has no data
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        restart("Restarting receiver") // stops the receiver (onStop), then starts it again (onStart)
      }
    }
  }

  // Run the given block of code in a separate thread
  def runInThread(op: => Unit): Unit = {
    new Thread() {
      override def run(): Unit = op
    }.start()
  }

  // Release resources
  override def onStop(): Unit = {
    if (socket != null) socket.close()
    if (reader != null) reader.close()
  }
}
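The heart of the receiver is its read loop: read a line, hand it to `store`, and stop once `readLine` returns `null`. As a minimal sketch of just that loop, the following plain-Scala example replaces the socket with an in-memory `StringReader` and replaces `Receiver.store` with a callback. `ReadLoopSketch` and `readAll` are hypothetical names used only here.

```scala
import java.io.{BufferedReader, StringReader}
import scala.collection.mutable.ListBuffer

// Stand-alone imitation of the receiver's read loop.
// ReadLoopSketch and readAll are illustrative names; `store` stands in for Receiver.store.
object ReadLoopSketch {
  def readAll(reader: BufferedReader)(store: String => Unit): Unit = {
    var line = reader.readLine()
    // readLine returns null at end of stream, which ends the loop
    while (line != null) {
      store(line)
      line = reader.readLine()
    }
  }

  def main(args: Array[String]): Unit = {
    val received = ListBuffer[String]()
    val reader = new BufferedReader(new StringReader("hello spark\nhello streaming\n"))
    try readAll(reader)(line => received += line) finally reader.close()
    received.foreach(println)
  }
}
```

Keeping the loop free of socket details like this also makes it easy to check without starting `nc` or a StreamingContext.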
- 4. Test result
nc -lk 9999
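III. Kafka data source
1. Preparation
- 1. Usage and description
To use it, add the Maven dependency spark-streaming-kafka-0-8_2.11 to the project.
The KafkaUtils object provided by this package can create a DStream from your Kafka messages on either a StreamingContext or a JavaStreamingContext.
- 2. Add the dependency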
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
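For a project built with sbt instead of Maven, the same artifact could be declared roughly as follows. This is only a sketch: the `scalaVersion` value is an assumption (any 2.11.x release makes `%%` resolve to the `_2.11` artifact), and a matching `spark-streaming` dependency is assumed to be declared elsewhere in the build.

```scala
// build.sbt (sketch). scalaVersion is an assumed value; adjust it to your project.
scalaVersion := "2.11.8"

// %% appends the Scala binary version, giving spark-streaming-kafka-0-8_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.1"
```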
- 3. Start the ZooKeeper and Kafka clusters
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/
[bigdata@hadoop002 kafka]$
- 4. Start a producer and a consumer
// Producer
[bigdata@hadoop002 kafka]$ bin/ --broker-list hadoop002:9092 --topic first0810
// Consumer
[bigdata@hadoop002 kafka]$ bin/ --bootstrap-server hadoop002:9092 --topic first0810
- 5. Receive the data from IDEA
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author 不温卜火
 * @create 2020-08-10 9:34
 * MyCSDN :
 */
object WorldCount1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount1")
    val ssc = new StreamingContext(conf, Seconds(3))
    val params: Map[String, String] = Map[String, String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810" // consumer group id
    )
    // Each record arrives as a (key, value) pair
    val sourceStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        params,
        Set("first0810")
      )
    sourceStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
2. Running it
2.1 Direct consumption
- 1. Source code
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author 不温卜火
 * @create 2020-08-10 9:34
 * MyCSDN :
 */
object WorldCount1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount1")
    val ssc = new StreamingContext(conf, Seconds(3))
    val params: Map[String, String] = Map[String, String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810" // consumer group id
    )
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        params,
        Set("first0810")
      )
      // Keep only the message value and split it into words
      .flatMap { case (_, v) => v.split("\\W+") }
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
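One detail of `split("\\W+")` is worth knowing when reading the counts: for an empty message, or a message that starts with a separator, `split` returns an empty string as a token, which then gets counted like a word. The following plain-Scala sketch (no Spark or Kafka needed) shows the effect and one possible way to filter it out. `TokenizeSketch`, `tokenize` and `wordCount` are hypothetical names, and the filter is a suggestion rather than part of the original listing.

```scala
// Plain-Scala sketch of the tokenising step; TokenizeSketch is an illustrative name.
object TokenizeSketch {
  // Split on runs of non-word characters and drop the empty tokens that
  // split can return for empty lines or lines starting with a separator.
  def tokenize(line: String): List[String] =
    line.split("\\W+").filter(_.nonEmpty).toList

  // Count words across a batch of messages, like map((_, 1)).reduceByKey(_ + _).
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => w -> ws.size }

  def main(args: Array[String]): Unit = {
    // The raw split keeps a leading empty token here
    println(" hello world".split("\\W+").toList)
    println(wordCount(Seq(" hello world", "", "hello")))
  }
}
```

In the streaming job the equivalent change would be a `.filter(_.nonEmpty)` between `flatMap` and `map`.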
- 2. Output
2.2 With checkpoint (addresses data loss)
Drawback: checkpointing produces a large number of small files.
- 1. Source code
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author 不温卜火
 * @create 2020-08-10 11:24
 * MyCSDN :
 */
object WordCount2 {
  def createSSC(): StreamingContext = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount2")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Track the offsets in the checkpoint
    ssc.checkpoint("ck1")
    val params: Map[String, String] = Map[String, String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810" // consumer group id
    )
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        params,
        Set("first0810")
      )
      .flatMap { case (_, v) => v.split("\\W+") }
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // Return the fully configured ssc
    ssc
  }

  def main(args: Array[String]): Unit = {
    /*
     * Recover a StreamingContext from the checkpoint;
     * if no checkpoint exists, call the function passed as the
     * second argument to create a new StreamingContext.
     */
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("ck1", createSSC)
    ssc.start()
    ssc.awaitTermination()
  }
}
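The listing relies on a "recover, otherwise create" pattern: the factory function passed as the second argument only runs when no checkpoint can be restored, which is why the whole pipeline is defined inside that function. As a rough analogy in plain Scala (no Spark involved), the pattern can be sketched like this. `RecoverOrCreateSketch` and `recoverOrCreate` are hypothetical names, and the strings merely stand in for a StreamingContext.

```scala
// Plain-Scala analogy of "recover from checkpoint, otherwise create".
// RecoverOrCreateSketch and its members are illustrative names, not Spark APIs.
object RecoverOrCreateSketch {
  // `load` plays the role of reading a checkpoint, `create` the role of the factory function.
  def recoverOrCreate[A](load: () => Option[A])(create: () => A): A =
    load() match {
      case Some(recovered) => recovered // checkpoint found: the factory is not called
      case None            => create()  // nothing to recover: build from scratch
    }

  def main(args: Array[String]): Unit = {
    var factoryCalls = 0
    val fresh = recoverOrCreate(() => Option.empty[String]) { () =>
      factoryCalls += 1
      "new context"
    }
    val restored = recoverOrCreate(() => Option("restored context")) { () =>
      factoryCalls += 1
      "new context"
    }
    println(s"$fresh, $restored, factory calls: $factoryCalls")
  }
}
```

One consequence of this pattern is that edits to the pipeline code may not take effect while an old checkpoint directory is still being restored.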
2.3 Improved version of the checkpoint approach (commonly used)
- 1. Source code
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author 不温卜火
 * @create 2020-08-10 12:29
 * MyCSDN :
 */
object WorldCount3 {
  val groupId = "0810"
  val params: Map[String, String] = Map[String, String](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "group.id" -> groupId)
  val topics: Set[String] = Set("first0810")
  // KafkaUtils KafkaCluster
  val cluster: KafkaCluster = new KafkaCluster(params)

  /** Read the offsets to start consuming from */
  def readOffsets(): Map[TopicAndPartition, Long] = {
    var resultMap = Map[TopicAndPartition, Long]()
    // 1. Get all partitions of these topics
    val topicAndPartitionSetEither: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(topics)
    topicAndPartitionSetEither match {
      // 2. The topics and their partitions were found
      case Right(topicAndPartitionSet) =>
        // 3. Get the stored offset of each partition
        val topicAndPartitionToLongEither: Either[Err, Map[TopicAndPartition, Long]] =
          cluster.getConsumerOffsets(groupId, topicAndPartitionSet)
        topicAndPartitionToLongEither match {
          // Every partition of every topic already has a stored offset:
          // the topic was consumed before and its offsets were maintained
          case Right(map) =>
            resultMap ++= map
          // The partitions are being consumed for the first time
          case _ =>
            topicAndPartitionSet.foreach(topicAndPartition => {
              resultMap += topicAndPartition -> 0L
            })
        }
      case _ => // none of the topics exist
    }
    resultMap
  }

  def saveOffsets(stream: InputDStream[String]): Unit = {
    // Offsets must be saved from the stream obtained directly from Kafka.
    // The function passed to foreachRDD runs once per batch.
    stream.foreachRDD(rdd => {
      var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
      // An RDD that comes straight from Kafka can be cast to HasOffsetRanges
      val hasOffsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
      // Offset ranges of all partitions
      val ranges: Array[OffsetRange] = hasOffsetRanges.offsetRanges
      ranges.foreach(offsetRange => {
        val key: TopicAndPartition = offsetRange.topicAndPartition()
        val value: Long = offsetRange.untilOffset
        map += key -> value
      })
      cluster.setConsumerOffsets(groupId, map)
    })
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount3")
    val ssc = new StreamingContext(conf, Seconds(3))
    val sourceStream: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc,
        params,
        readOffsets(),
        (handler: MessageAndMetadata[String, String]) => handler.message()
      )
    sourceStream
      .flatMap(_.split("\\W+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(1000) // without an argument, print shows the first 10 elements
    saveOffsets(sourceStream)
    ssc.start()
    ssc.awaitTermination()
  }
}
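The bookkeeping in `readOffsets` and `saveOffsets` comes down to two small map operations: pick a starting offset per partition, and after each batch remember the `untilOffset` of every partition. As written, the listing resets all partitions to 0 whenever `getConsumerOffsets` does not return a `Right`. The plain-Scala sketch below shows a per-partition variant instead, where only partitions without a stored offset start at 0. It is an alternative for illustration, not the author's method; `OffsetSketch`, `Tp` and `BatchRange` are hypothetical stand-ins for `TopicAndPartition` and `OffsetRange`.

```scala
// Plain-Scala sketch of offset bookkeeping; all names here are illustrative.
object OffsetSketch {
  // (topic, partition) stands in for kafka.common.TopicAndPartition
  type Tp = (String, Int)

  // Simplified stand-in for OffsetRange: one partition's offsets [fromOffset, untilOffset)
  final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Start each partition at its stored offset, or at 0 if none was stored for it.
  def startingOffsets(partitions: Set[Tp], stored: Map[Tp, Long]): Map[Tp, Long] =
    partitions.map(tp => tp -> stored.getOrElse(tp, 0L)).toMap

  // After a batch, the next offset to read for each partition is its untilOffset.
  def offsetsToSave(ranges: Seq[BatchRange]): Map[Tp, Long] =
    ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap

  def main(args: Array[String]): Unit = {
    val partitions: Set[Tp] = Set(("first0810", 0), ("first0810", 1))
    val stored: Map[Tp, Long] = Map(("first0810", 0) -> 42L)
    println(startingOffsets(partitions, stored))
    println(offsetsToSave(Seq(BatchRange("first0810", 0, 42L, 50L))))
  }
}
```

Starting a never-consumed partition at 0 assumes offset 0 is still available on the broker; if old log segments have been deleted, the earliest available offset would have to be looked up instead.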
- Output
