spark streaming 整合 kafka 两种 Receiver-based 和 Direct Approach

举报
小米粒-biubiubiu 发表于 2020/12/03 00:57:24 2020/12/03
【摘要】 一、Receiver 方式整合 spark streaming  和 kafka 启动zookeeper zkServer.sh start 启动kafka : kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 创建topic: kafka-to...

一、Receiver 方式整合 spark streaming  和 kafka

启动zookeeper

zkServer.sh start

启动kafka :

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

创建topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming-topic

查看topic 是否创建成功:

kafka-topics.sh --list --zookeeper localhost:2181

通过控制台测试本topic 是否能够正常生产和消费信息:

kafka-console-producer.sh  --broker-list localhost:9092 --topic kafka-streaming-topic

kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-streaming-topic

项目添加依赖


  
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  4. <version>2.2.0</version>
  5. </dependency>

应用程序代码编写 :


  
  1. package com.dzx.scala.dzxbootscala.spark
  2. import kafka.serializer.{StringDecoder, StringEncoder}
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.streaming.kafka.KafkaUtils
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. /**
  7. * @author DuanZhaoXu
  8. * @ClassName:
  9. * @Description:
  10. * @date 2019年01月03日 16:29:19
  11. */
  12. class KafkaReceiverWordCount {
  13. //args : hadoop000:2181 test kafka-streaming-topic 1
  14. def main(args: Array[String]): Unit = {
  15. if(args.length!=4){
  16. System.err.print("必须为四个参数")
  17. System.exit(1)
  18. }
  19. val Array(zkQuorum,group,topics,numThreads) =args
  20. val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
  21. val ssc = new StreamingContext(sparkConf, Seconds(5))
  22. //todo spark streaming 对接kafka
  23. val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
  24. val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
  25. // todo 测试为什么要取第二个
  26. messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  27. ssc.start()
  28. ssc.awaitTermination()
  29. }
  30. }

运行代码,并生产一些消息,控制台可以看到词频统计结果

在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),


  
  1. mvn clean package -DskipTests
  2. spark-submit \
  3. --class com.dzx.scala.dzxbootscala.spark.KafkaReceiverWordCount \
  4. --master local[2] \
  5. --name KafkaReceiverWordCount \
  6. --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
  7. /home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka-streaming-topic 1

  
  1. 生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问 
  2. 192.168.42.85:4040 ui界面可以看到详细信息

二、Direct  方式 整合spark streaming  和 kafka

前置步骤 同 方式一 。

应用程序代码编写 :


  
  1. object KafkaDirectWordCount{
  2. //args : 192.168.42.85:9092 kafka-streaming-topic
  3. def main(args: Array[String]): Unit = {
  4. if(args.length!=2){
  5. System.err.print("必须为四个参数")
  6. System.exit(1)
  7. }
  8. val Array(brokers,topics) =args
  9. val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
  10. val ssc = new StreamingContext(sparkConf, Seconds(5))
  11. //todo spark streaming 对接kafka
  12. val topicSet = topics.split(",").toSet
  13. val kafkaParams = Map[String,String]("metadata.broker.list"->brokers)
  14. val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringEncoder](ssc,kafkaParams,topicSet)
  15. // todo 测试为什么要取第二个
  16. messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  17. ssc.start()
  18. ssc.awaitTermination()
  19. }
  20. }

运行代码,并生产一些消息,控制台可以看到词频统计结果

在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),


  
  1. mvn clean package -DskipTests
  2. spark-submit \
  3. --class com.dzx.scala.dzxbootscala.spark.KafkaDirectWordCount \
  4. --master local[2] \
  5. --name KafkaDirectWordCount \
  6. --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
  7. /home/hadoop/lib/sparktrain-1.0.jar 192.168.42.85:9092 kafka-streaming-topic

生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问 
192.168.42.85:4040 ui界面可以看到详细信息

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/85701662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。