Two Ways to Integrate Spark Streaming with Flume: Push and Pull

小米粒-biubiubiu, posted 2020/12/03 01:09:39
[Abstract] How to integrate Spark Streaming with Flume in two ways, push and pull: the Flume agent configuration, the Maven dependencies, the Scala application code, and how to run it both locally and via spark-submit.

1. Push mode: integrating Spark Streaming with Flume

cd $FLUME_HOME
cd conf

Create the Flume configuration file: vim flume_push_streaming.conf


  
    simple-agent.sources = netcat-source
    simple-agent.sinks = avro-sink
    simple-agent.channels = memory-channel

    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444

    simple-agent.sinks.avro-sink.type = avro
    simple-agent.sinks.avro-sink.hostname = hadoop000
    simple-agent.sinks.avro-sink.port = 41414

    simple-agent.channels.memory-channel.type = memory
    simple-agent.channels.memory-channel.capacity = 1000
    simple-agent.channels.memory-channel.transactionCapacity = 100

    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.avro-sink.channel = memory-channel
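In this push setup, the netcat source listens on hadoop000:44444, events are buffered in the memory channel, and the avro sink pushes them to hadoop000:41414, where the Spark Streaming receiver will be listening.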

Add the Spark Streaming + Flume integration dependency to the project's pom.xml (the _2.11 suffix and the version should match your Scala and Spark builds):


  
    <!-- Spark Streaming + Flume integration dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

Write the Spark Streaming application that reads from Flume, using FlumeUtils:


  
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePushWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: FlumePushWordCount <hostname> <port>")
          System.exit(1)
        }
        val Array(hostname, port) = args
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // Push mode: Spark Streaming starts an Avro receiver that Flume's avro sink pushes to
        val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
        // Each Flume event body is a line of text; split it into words and count them
        flumeStream.map(x => new String(x.event.getBody.array()))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
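FlumeUtils.createStream also has an overload that takes an explicit receiver storage level; a minimal sketch, assuming the same ssc, hostname, and port as above (MEMORY_AND_DISK_SER_2 is the default, so this is equivalent to the call used in the program):

    import org.apache.spark.storage.StorageLevel

    // Same stream as above, with the receiver's storage level spelled out
    val flumeStream = FlumeUtils.createStream(
      ssc, hostname, port.toInt, StorageLevel.MEMORY_AND_DISK_SER_2)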

Start the Flume agent:


  
    flume-ng agent --name simple-agent \
      --conf $FLUME_HOME/conf \
      --conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
      -Dflume.root.logger=INFO,console

Run the code in IDEA, passing the two arguments hostname and port (hadoop000 41414, matching the avro sink). Note that in push mode the Spark Streaming application is the Avro server, so it should be running before events start flowing; the avro sink keeps retrying until it can connect.

Connect to the netcat source, which is already listening on port 44444: telnet hadoop000 44444 (or nc hadoop000 44444)

Type some words and press Enter; if the word-frequency counts appear on the Spark console, the job is running correctly.

To package the code and run it on a server instead:


  
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePushWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: FlumePushWordCount <hostname> <port>")
          System.exit(1)
        }
        val Array(hostname, port) = args
        // master and appName are left unset here because, on the server,
        // they are supplied by the spark-submit command:
        //   .setMaster("local[2]").setAppName("FlumePushWordCount")
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
        flumeStream.map(x => new String(x.event.getBody.array()))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

Run mvn clean package -DskipTests to build the jar.

Then run the application with the following command (--packages pulls the spark-streaming-flume_2.11 dependency from a Maven repository at submit time; on a machine without network access, pass locally downloaded jars via --jars instead):


  
    spark-submit --class com.dzx.scala.dzxbootscala.spark.FlumePushWordCount \
      --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
      /home/hadoop/lib/sparktrain-1.0.jar hadoop000 41414

 

2. Pull mode: integrating Spark Streaming with Flume (recommended: more stable, fault-tolerant, and reliable)

Add the dependencies. In pull mode the custom SparkSink runs inside the Flume agent, so the spark-streaming-flume-sink_2.11 jar (together with the scala-library and commons-lang3 jars) must also be placed on Flume's classpath, e.g. in $FLUME_HOME/lib:


  
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

 

Create the Flume configuration file flume_pull_streaming.conf:


  
    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel

    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444

    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = hadoop000
    simple-agent.sinks.spark-sink.port = 41414

    simple-agent.channels.memory-channel.type = memory
    simple-agent.channels.memory-channel.capacity = 1000
    simple-agent.channels.memory-channel.transactionCapacity = 100

    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel = memory-channel
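Compared with the push configuration, only the sink changes: the SparkSink buffers events from the channel and serves them on hadoop000:41414, and Spark Streaming pulls them with transactional semantics, which is what makes this approach more reliable and fault-tolerant.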

Write the application code:


  
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePullWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: FlumePullWordCount <hostname> <port>")
          System.exit(1)
        }
        val Array(hostname, port) = args
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // The only line that differs from the push version: poll the SparkSink
        // instead of receiving pushed events
        val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
        flumeStream.map(x => new String(x.event.getBody.array()))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
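createPollingStream also has an overload that polls several SparkSink instances at once and takes an explicit storage level; a minimal sketch, where the second agent address (hadoop001) is a hypothetical example:

    import java.net.InetSocketAddress
    import org.apache.spark.storage.StorageLevel

    // Pull from multiple Flume agents, each running a SparkSink on port 41414
    // (hadoop001 is a hypothetical second host)
    val addresses = Seq(
      new InetSocketAddress("hadoop000", 41414),
      new InetSocketAddress("hadoop001", 41414))
    val flumeStream = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)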

Start Flume first (in pull mode the SparkSink is the server, so the agent must be running before the Spark application starts polling):


  
    flume-ng agent --name simple-agent \
      --conf $FLUME_HOME/conf \
      --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
      -Dflume.root.logger=INFO,console

Run the application in IDEA as in the push approach, or package it and run it on the server:


  
    spark-submit --class com.dzx.scala.dzxbootscala.spark.FlumePullWordCount \
      --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
      /home/hadoop/lib/sparktrain-1.0.jar hadoop000 41414


Source: blog.csdn.net, author: 血煞风雨城2018; all rights belong to the original author.

Original link: blog.csdn.net/qq_31905135/article/details/85693902
