spark streaming 整合 kafka 两种 Receiver-based 和 Direct Approach

举报
小米粒-biubiubiu 发表于 2020/12/03 00:57:24 2020/12/03
【摘要】 一、Receiver 方式整合 spark streaming  和 kafka 启动zookeeper zkServer.sh start 启动kafka : kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 创建topic: kafka-to...

一、Receiver 方式整合 spark streaming  和 kafka

启动zookeeper

zkServer.sh start

启动kafka :

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

创建topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming-topic

查看topic 是否创建成功:

kafka-topics.sh --list --zookeeper localhost:2181

通过控制台测试本topic 是否能够正常生产和消费信息:

kafka-console-producer.sh  --broker-list localhost:9092 --topic kafka-streaming-topic

kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-streaming-topic

项目添加依赖


      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.2.0</version>
      </dependency>
  
 

应用程序代码编写 :


      package com.dzx.scala.dzxbootscala.spark
      import kafka.serializer.{StringDecoder, StringEncoder}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      /**
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月03日 16:29:19
       */
      class KafkaReceiverWordCount {
      //args : hadoop000:2181 test kafka-streaming-topic 1
        def main(args: Array[String]): Unit = {
      if(args.length!=4){
      System.err.print("必须为四个参数")
      System.exit(1)
       }
       val Array(zkQuorum,group,topics,numThreads) =args
       val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
       val ssc = new StreamingContext(sparkConf, Seconds(5))
      //todo spark streaming 对接kafka
       val topicMap  = topics.split(",").map((_,numThreads.toInt)).toMap
       val messages =   KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
      // todo 测试为什么要取第二个
       messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
       ssc.start()
       ssc.awaitTermination()
        }
      }
  
 

运行代码,并生产一些消息,控制台可以看到词频统计结果

在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),


      mvn clean package -DskipTests
      spark-submit  \
      --class com.dzx.scala.dzxbootscala.spark.KafkaReceiverWordCount \
      --master local[2] \
      --name KafkaReceiverWordCount \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
      /home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka-streaming-topic 1
  
 

      生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问
      192.168.42.85:4040 ui界面可以看到详细信息
  
 

二、Direct  方式 整合spark streaming  和 kafka

前置步骤 同 方式一 。

应用程序代码编写 :


      object KafkaDirectWordCount{
       //args : 192.168.42.85:9092 kafka-streaming-topic
        def main(args: Array[String]): Unit = {
      if(args.length!=2){
       System.err.print("必须为四个参数")
       System.exit(1)
       }
       val Array(brokers,topics) =args
       val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
       val ssc = new StreamingContext(sparkConf, Seconds(5))
      //todo spark streaming 对接kafka
       val topicSet  = topics.split(",").toSet
       val kafkaParams = Map[String,String]("metadata.broker.list"->brokers)
       val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringEncoder](ssc,kafkaParams,topicSet)
      // todo 测试为什么要取第二个
       messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
       ssc.start()
       ssc.awaitTermination()
        }
      }
  
 

运行代码,并生产一些消息,控制台可以看到词频统计结果

在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),


      mvn clean package -DskipTests
      spark-submit  \
      --class com.dzx.scala.dzxbootscala.spark.KafkaDirectWordCount \
      --master local[2] \
      --name KafkaDirectWordCount \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
      /home/hadoop/lib/sparktrain-1.0.jar 192.168.42.85:9092 kafka-streaming-topic
  
 

生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问 
192.168.42.85:4040 ui界面可以看到详细信息

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/85701662

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。