Spark Streaming 项目实战 (2) | 从 Kafka中消费数据

举报
不温卜火 发表于 2020/12/03 00:27:22 2020/12/03
【摘要】   大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客...

  大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/

  本片博文为大家带来的是Spark Streaming 项目实战 (2) | 从 Kafka中消费数据。
1


2
  编写App, 从 kafka 读取数据

  新建一个Maven项目:spark-streaming-project

  在依赖选择上spark-streaming-kafka此次选用0-10_2.11而非0-08_2.11

 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.1</version> </dependency>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

3

一. 测试是否能够从Kafka消费到数据

  • 1. 新建APP(Trait)
package com.buwenbuhuo.streaming.project.app


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
trait App {
  def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("App").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "bigdata0814", "auto.offset.reset" -> "latest", // 自动提交管理 "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("ads_log0814") val sourceStream: DStream[String] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, // 标配 Subscribe[String, String](topics, kafkaParams) ).map(_.value()) sourceStream.print(1000) ssc.start() ssc.awaitTermination()
  }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 2. 新建AreaTopAPP
package com.buwenbuhuo.streaming.project.app

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AreaTopAPP extends App {

}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 3. 运行AreaTopAPP

4
  上述即为测试 ,但是其实在这个app内,有一部分可以专门封装成一个新的样例类

  测试能够成功得到所想要的结果,下面给出完善最终的程序源码

二. 完整程序源码

编写App, 从 kafka 读取数据

  • bean 类 AdsInfo
package com.buwenbuhuo.streaming.project.bean
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 17:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
case class AdsInfo(ts: Long, area: String, city: String, userId: String, adsId: String, var timestamp: Timestamp = null, var dayString: String = null, // 2012-8-14 var hmString: String = null) { // 11:20 timestamp = new Timestamp(ts) val date = new Date(ts)
  dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)
  hmString = new SimpleDateFormat("HH:mm").format(date)
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 2. 工具类类 MyKafkaUtils
package com.buwenbuhuo.streaming.project.util

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 15:20
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object MyKafkaUtils { val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "bigdata0814", "auto.offset.reset" -> "latest", // 自动提交管理 "enable.auto.commit" -> (true: java.lang.Boolean)
  ) /*
  * 根据传入的参数,返回从kafka得到的流
  * @param ssc
  * @param topic
  * @return
  */
  def getKafkaSteam(ssc:StreamingContext,topics:String*): DStream[String] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, // 标配 Subscribe[String, String](topics.toIterable, kafkaParams) ).map(_.value())
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 3. 从kafka消费数据(APP)
package com.buwenbuhuo.streaming.project.app


import com.buwenbuhuo.streaming.project.bean.AdsInfo
import com.buwenbuhuo.streaming.project.util.MyKafkaUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
trait App {
  def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("App").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) // 得到最原始的流 val sourceStream: DStream[String] = MyKafkaUtils.getKafkaSteam(ssc,"ads_log0814") val adsInfoStream: DStream[AdsInfo] = sourceStream.map(s => { val spilt: Array[String] = s.split(",") AdsInfo(spilt(0).toLong, spilt(1), spilt(2), spilt(3), spilt(4)) }) adsInfoStream.print(1000) ssc.start() ssc.awaitTermination()
  }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 4. AreaTopAPP
package com.buwenbuhuo.streaming.project.app

/**
 *
 * @author 不温卜火
 * @create 2020-08-14 13:41
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AreaTopAPP extends App {

}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. 运行结果

同时运行MockRealtimeData(数据生产者)和AreaTopAPP(数据消费者)
5
6
  本次的分享就到这里了,


14

  好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
  如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
  码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!

15
16

文章来源: buwenbuhuo.blog.csdn.net,作者:不温卜火,版权归原作者所有,如需转载,请联系作者。

原文链接:buwenbuhuo.blog.csdn.net/article/details/108002293

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。