spark streaming 整合flume ,kafka 打造通用流处理

举报
小米粒-biubiubiu 发表于 2020/12/02 23:53:24 2020/12/02
【摘要】 一、编写LoggerGenerator.java文件用于生成日志信息 添加依赖 <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> &l...

一、编写LoggerGenerator.java文件用于生成日志信息

添加依赖


      <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
      </dependency>
  
 

编写  日志产生类 LoggerGenerator.java 


      public class LoggerGenerator {
      private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
      public static void main(String[] args) throws InterruptedException {
      int index = 0;
      while (true) {
       Thread.sleep(1000);
       logger.info("current value is " + index++);
       }
       }
      }
  
 

编写 log4j.properties 配置文件  


      log4j.rootLogger=INFO,stdout,flume
      log4j.appender.stdout = org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.Target = System.out
      log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
      #日志输出到flume
      log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
      log4j.appender.flume.Hostname=192.168.42.85
      log4j.appender.flume.Port=41414
      log4j.appender.flume.UnsafeMode=true
  
 

二、使用flume 采集 log4j产生的日志

cd  $FLUME_HOME/conf  ,编写 streaming.conf


      agent1.sources=avro-source
      agent1.channels=logger-channel
      agent1.sinks=log-sink
      agent1.sources.avro-source.type=avro
      agent1.sources.avro-source.bind=0.0.0.0
      agent1.sources.avro-source.port=41414
      agent1.channels.logger-channel.type=memory
      agent1.sinks.log-sink.type=logger
      agent1.sources.avro-source.channels=logger-channel
      agent1.sinks.log-sink.channel=logger-channel
  
 

 

启动 flume ,运行 日志产生的java 文件,可以看到 控制台上面有 采集到的log4j 日志输出

flume-ng agent \

--conf $FLUME_HOME/conf \

--conf-file $FLUME_HOME/conf/streaming.conf \

--name agent1 -Dflume.root.logger=INFO,console

三、将flume 收集到的数据输出到kafka 

启动 zookeeper

./kafka-server-start.sh -daemon  $KAFKA_HOME/config/server.properties

jps

查看 topic

./kafka-topics.sh --list --zookeeper hadoop000:2181

创建一个 streamingtopic 主题

./kafka-topics.sh  --create --zookeeper hadoop000:2181  --replication-factor 1 --partitions  1  --topic  "streamingtopic"

cd   $FLUME_HOME/conf   ,编写  streaming2.conf  


      agent1.sources=avro-source
      agent1.channels=logger-channel
      agent1.sinks=kafka-sink
      agent1.sources.avro-source.type=avro
      agent1.sources.avro-source.bind=0.0.0.0
      agent1.sources.avro-source.port=41414
      agent1.channels.logger-channel.type=memory
      agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
      agent1.sinks.kafka-sink.topic= streamingtopic
      agent1.sinks.kafka-sink.brokerList = hadoop000:9092
      agent1.sinks.kafka-sink.requiredAcks= 1
      agent1.sinks.kafka-sink.batchSize = 20
      agent1.sources.avro-source.channels=logger-channel
      agent1.sinks.kafka-sink.channel=logger-channel
  
 

启动 flume 

flume-ng  agent  --name agent1  \ 

--conf  $FLUME_HOME/conf  \

--conf-file $FLUME_HOME/conf/streaming2.conf  \

-Dflume.root.logger= INFO,console

启动 kafka

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic  streamingtopic

 编写  KafkaReceiverWordCount.java  文件


      package com.dzx.scala.dzxbootscala.spark
      import kafka.serializer.{StringDecoder, StringEncoder}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      /**
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月03日 16:29:19
       */
      class KafkaReceiverWordCount {
      //args : hadoop000:2181 test streamingtopic 1
        def main(args: Array[String]): Unit = {
      if(args.length!=4){
       System.err.print("必须为四个参数")
       System.exit(1)
       }
      val Array(zkQuorum,group,topics,numThreads) =args
      val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      //todo spark streaming 对接kafka
      val topicMap  = topics.split(",").map((_,numThreads.toInt)).toMap
      val messages =   KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
      // todo 测试为什么要取第二个
       messages.map(_._2).count.print()
       ssc.start()
       ssc.awaitTermination()
        }
      }
  
 

 运行LoggerGenerator.java ,确保 flume ,kafka 都启动好了之后,运行  KafkaReceiverWordCount .java  文件 , flume 每采集到 20条 日志, KafkaReceiverWordCount.java 文件的控制台就会 输出 一个 数字20. 

现在是在本地 用 idea 运行 日志产生类,然后使用flume ,kafka ,spark streaming 进行 处理操作的,在生产上面我们需要打成一个jar 包

  • 将 日志产生类 打成 一个jar 包
  • 启动 flume 和 kafka、
  • spark  streaming 也是需要打成 jar包的,然后使用spark-submit的 方式进行 提交到环境上 执行

使用哪种方式,可以根据实际情况进行选择模式 : local/yarn/ standlone /mesos

 在生产上 整个 流处理的流程 是一样的,不同的是我们的业务逻辑的复杂性。

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/86158082

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。