spark streaming 整合 kafka 两种 Receiver-based 和 Direct Approach
一、Receiver 方式整合 spark streaming 和 kafka
启动zookeeper
zkServer.sh start
启动kafka :
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
创建topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming-topic
查看topic 是否创建成功:
kafka-topics.sh --list --zookeeper localhost:2181
通过控制台测试本topic 是否能够正常生产和消费信息:
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streaming-topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-streaming-topic
项目添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
应用程序代码编写 :
package com.dzx.scala.dzxbootscala.spark
import kafka.serializer.{StringDecoder, StringEncoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author DuanZhaoXu
* @ClassName:
* @Description:
* @date 2019年01月03日 16:29:19
*/
class KafkaReceiverWordCount {
//args : hadoop000:2181 test kafka-streaming-topic 1
def main(args: Array[String]): Unit = {
if(args.length!=4){
System.err.print("必须为四个参数")
System.exit(1)
}
val Array(zkQuorum,group,topics,numThreads) =args
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//todo spark streaming 对接kafka
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
// todo 测试为什么要取第二个
messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
运行代码,并生产一些消息,控制台可以看到词频统计结果
在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),
mvn clean package -DskipTests
spark-submit \
--class com.dzx.scala.dzxbootscala.spark.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka-streaming-topic 1
生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问
192.168.42.85:4040 ui界面可以看到详细信息
二、Direct 方式 整合spark streaming 和 kafka
前置步骤 同 方式一 。
应用程序代码编写 :
object KafkaDirectWordCount{
//args : 192.168.42.85:9092 kafka-streaming-topic
def main(args: Array[String]): Unit = {
if(args.length!=2){
System.err.print("必须为四个参数")
System.exit(1)
}
val Array(brokers,topics) =args
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//todo spark streaming 对接kafka
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("metadata.broker.list"->brokers)
val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringEncoder](ssc,kafkaParams,topicSet)
// todo 测试为什么要取第二个
messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
运行代码,并生产一些消息,控制台可以看到词频统计结果
在生产环境中,代码中去掉setAppName("KafkaReceiverWordCount").setMaster("local[2]"),
mvn clean package -DskipTests
spark-submit \
--class com.dzx.scala.dzxbootscala.spark.KafkaDirectWordCount \
--master local[2] \
--name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
/home/hadoop/lib/sparktrain-1.0.jar 192.168.42.85:9092 kafka-streaming-topic
生产一些消息,就可以在centos7 控制台看到 词频统计结果,或者访问
192.168.42.85:4040 ui界面可以看到详细信息
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/85701662
- 点赞
- 收藏
- 关注作者
评论(0)