spark streaming 实时流处理项目实战

举报
小米粒-biubiubiu 发表于 2020/12/03 00:37:36 2020/12/03
6.7k+ 0 0
【摘要】                                              spark streaming  实时流处理项目实战 一、 编写python 日志产生脚本,并使用linux 的crontab 命令 一分钟执行一次 tool.lu/crontab 网站 可以测试 cron表达式 cd  /home/hadoop/data/project 将以下...

                                             spark streaming  实时流处理项目实战

一、 编写python 日志产生脚本,并使用linux 的crontab 命令 一分钟执行一次

tool.lu/crontab 网站 可以测试 cron表达式

cd  /home/hadoop/data/project

将以下python 脚本放到 /home/hadoop/data/project 下


      #coding=UTF-8
      import random
      import time
      url_paths = [
      "class/112.html",
      "class/128.html",
      "class/145.html",
      "class/146.html",
      "class/131.html",
      "class/130.html",
      "learn/821",
      "course/list"
      ]
      ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98]
      http_referers = [
        "http://www.baidu.com/?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "http://cn.bing.com/search?q={query}",
        "https://search.yahoo.com/search?p={query}",
      ]
      search_keyword = [
      "Spark sql实战",
      "Hadoop基础",
      "Strom实战",
      "Spark streaming实战",
      "大数据面试"]
      status_code = ["200","404","500"]
      def sample_url():
      return random.sample(url_paths,1)[0]
      def sample_ip():
       slice = random.sample(ip_slices,4)
      return ".".join([str(item) for item in slice])
      def sample_referer():
      if random.uniform(0,1)>0.2:
      return "-"
       refer_str =random.sample(http_referers,1)
       query_str =random.sample(search_keyword,1)
      return refer_str[0].format(query=query_str[0])
      def sample_status_code():
      return  random.sample(status_code,1)[0]
      def generate_log(count=10):
       time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
       f = open("/home/hadoop/data/project/logs/access.log","w+")
      #f = open("D:\\access.log","w+")
      while count >=1:
       query_log ="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(local_time=time_str,url=sample_url(),ip=sample_ip(),referer=sample_referer(),status_code=sample_status_code())
       print(query_log)
       f.write(query_log+"\n")
       count = count -1
      if __name__ == '__main__':
       generate_log(50)
      # python命令执行py脚本
      #python generate_log.py
  
 

cd  /home/hadoop/data/project

编写shell脚本

vim  log_generator.sh 

python /home/hadoop/data/project/generate_log.py
 

chmod u+x log_generator.sh

./log_generator.sh

crontab   -e

*/1 * * * *  /home/hadoop/data/project/log_generator.sh
 

tail -300f /home/hadoop/data/project/logs/access.log 查看日志

执行 date 命令,查看是否过了一分钟,当过去了1分钟之后,再次查看 access.log日志文件,发现有日志生成,

并且1分钟生成一次。

二、对接python 日志产生器输出的日志到flume

cd  $Flume_HOME/conf

编写 配置文件  streaming_project.config


      #执行source,sink,channel名称
      exec-memory-logger.sources = exec-source
      exec-memory-logger.sinks = logger-sink
      exec-memory-logger.channels =memory-channel
      #配置source 类型为exec ,从文件读取
      exec-memory-logger.sources.exec-source.type = exec
      exec-memory-logger.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
      exec-memory-logger.sources.exec-source.shell = /bin/sh -c
      #配置channel类型为memory
      exec-memory-loggers.channels.memory-channel.type = memory
      #配置sink的类型为logger
      exec-memory-logger.sinks.logger-sink.type = logger
      #将 source ,channel ,sink 串起来
      exec-memory-logger.sources.exec-source.channels =memory-channel
      exec-memory-logger.sinks.logger-sink.channel = memory-channel
  
 

flume-ng agent \ 

--name exec-memory-logger \ 

--conf  $FLUME_HOME/conf \ 
--conf-file  $FLUME_HOME/streaming_project.conf \

-Dflume.root.logger = INFO,console

等待一分钟,之后看到控制台获取到了 access.log 文件新产生的 日志信息。

三、将flume 采集的日志信息 输出到 kafka 

启动 zk

./zkServer.sh  start

启动 kafka

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

jps 查看 zk和kafka 是否启动成功

修改flume 的配置文件输出到kafka

vim streaming_project2.conf


      #执行source,sink,channel名称
      exec-memory-kafka.sources = exec-source
      exec-memory-kafka.sinks = kafka-sink
      exec-memory-kafka.channels =memory-channel
      #配置source 类型为exec ,从文件读取
      exec-memory-kafka.sources.exec-source.type = exec
      exec-memory-kafka.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
      exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
      #配置channel类型为memory
      exec-memory-kafka.channels.memory-channel.type = memory
      #配置sink的类型为kafka
      exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafak.KafkaSink
      exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
      exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
      exec-memory-kafka.sinks.kafka-sink.batchSize = 5
      exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
      #将 source ,channel ,sink 串起来
      exec-memory-kafka.sources.exec-source.channels =memory-channel
      exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
  
 

启动kafka消费者进行监听:

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic  streamingtopic

启动flume

flume-ng agent \

--name exec-memory-kafka \

--conf $FLUME_HOME/conf  \

--conf-file /home/hadoop/data/project/streaming_project2.conf \

-Dflume.root.logger = INFO,console

等待一分钟后看到  kafka 消费者,输出了flume 采集到的日志信息

四、Spark streaming 对接 kafka 进行消费

功能1:今天到现在为止,实战课程 的访问量


      /**
       * 清洗后的日志信息
       * @param ip 日志访问的ip
       * @param time 日志访问的时间
       * @param courseId 日志访问的课程编号
       * @param statusCode 状态码
       * @param referer 日志访问的referer信息,途径(日志从哪里过来的)
       */
      case  class ClickLog (ip:String,time:String,courseId:Int,statusCode:Int,referer:String){
      }
  
 

      /**
       * 实战课程点击数
       * @param day_course 对应的就是hbase 中的rowkey,20171111 1
       * @param click_count 对应的20171111 1 的访问总数
       */
      case  class CourseClickCount(day_course:String,click_count:Long) {
      }
  
 

      /**
       * hbase操作工具类
       */
      public class HBaseUtils {
       HBaseAdmin admin = null;
       Configuration configuration = null;
      private HBaseUtils() {
       configuration = new Configuration();
       configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
       configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
      try {
       admin = new HBaseAdmin(configuration);
       } catch (IOException e) {
       e.printStackTrace();
       }
       }
      public static HBaseUtils instance = null;
      public static synchronized HBaseUtils getInstance() {
      if (instance == null) {
       instance = new HBaseUtils();
       }
      return instance;
       }
      /**
       * 根据表名获取table实例
       *
       * @param tableName 表名
       * @return
       */
      public HTable getTable(String tableName) {
       HTable hTable = null;
      try {
       hTable = new HTable(configuration, tableName);
       } catch (IOException e) {
       e.printStackTrace();
       }
      return hTable;
       }
      /**
       * 添加一条记录到Hbase表中
       *
       * @param tableName 表名
       * @param rowKey 表的rowkey
       * @param cf columnfamily名
       * @param column 列名
       * @param value 写入Habse表的值
       */
      public void put(String tableName, String rowKey, String cf, String column, String value) {
       HTable table = getTable(tableName);
       Put put = new Put(Bytes.toBytes(rowKey));
       put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
      try {
       table.put(put);
       } catch (IOException e) {
       e.printStackTrace();
       }
       }
      public static void main(String[] args) {
      // HTable hTable = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
      // System.out.println(hTable.getName().getNameAsString());
      //测试,插入一条数据
      String tableName = "imooc_course_clickcount";
      String rowKey = "20171111_88";
      String cf = "info";
      String column = "click_count";
      String value = "2";
       HBaseUtils.getInstance().put(tableName, rowKey, cf, column, value);
      //scan 'imooc_course_clickcount' 查看表的数据
       }
      }
  
 

      package com.dzx.scala.dzxbootscala.spark
      import com.dzx.scala.dzxbootscala.spark.dao.CourseClickCountDao
      import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import scala.collection.mutable.ListBuffer
      object ImoocStatStreamingApp {
        def main(args: Array[String]): Unit = {
      //args : hadoop000:2181 test streamingtopic 1
       def main(args: Array[String]): Unit = {
      if(args.length!=4){
       System.err.print("必须为四个参数")
       System.exit(1)
       }
       val Array(zkQuorum,group,topics,numThreads) =args
       val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
       val ssc = new StreamingContext(sparkConf, Seconds(60))
      //todo spark streaming 对接kafka
       val topicMap  = topics.split(",").map((_,numThreads.toInt)).toMap
       val messages =   KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
      // todo 测试为什么要取第二个
      // messages.map(_._2).count().print()
      //messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
       val logs = messages.map(_._2)
       val cleanData =  logs.map(line =>{
      //infos(2) = "GET /class/130.html HTTP/1.1"
       val infos= line.split("\t")
       val url =infos(2).split(" ")(1)
      var courseId = 0
      if(url.startsWith("/class")){
       val courseIdHtml  =   url.split("/")(2)
      //获取到 课程编号
       courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
       }
       ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
       }).filter(clicklog=>clicklog.courseId!=0)
      // cleanData.print()
       // /**
      // * 数据清洗结果类似如下:
      // * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
      // * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
      // * 到数据清洗完为止,日志中只包含了实战课程的日志
      // * /
      //功能1:今天到现在为止,实战课程 的访问量
      //yyyyMMdd courseId
      //使用 数据库来进行 存储我们的统计结果
      // spark streaming 把统计结果写入到数据库里面
      // 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
      // 选择什么数据库作为统计结果的存储呢
      // RDBMS:MySQL,oracle ...
      // day course_id click_count
      // 20171111 1 10
      // 20171111 2 10
      // 下一个批次数据进来之后
      // click_count + 下一个批次统计结果
      // NoSql : HBase、Redis
      // Hbase: 一个api就能搞定,非常方便
      // 这就是 我们为什么要选择Hbase的一个原因所在
      // 前提:启动 hdfs ,zk,hbase
      //hbase表设计
      // create 'imooc_course_clickcount', 'info'
      // 查看表结构
      //desc 'imooc_course_clickcount'
      // RowKey设计
      // day_courseid
      //如何 使用 scala 操作 hbase
       cleanData.map(x=>{
       (x.time.substring(0,8)+"_"+x.courseId,1)
       }).reduceByKey(_+_).foreachRDD(rdd=>{
       rdd.foreachPartition(partitionRecords =>{
       val list  = new ListBuffer[CourseClickCount]
       partitionRecords.foreach(pair=>{
       list.append(CourseClickCount(pair._1,pair._2))
       })
      //将数据存入到hbase
       CourseClickCountDao.save(list)
       })
       })
       ssc.start()
       ssc.awaitTermination()
       }
        }
      }
  
 

使用步骤三中的flume启动命令和 kafka消费者的启动 命令,使用idea 将 需要传入的四个 参数 ,传入main方法之后执行ImoocStatStreamingApp ,执行 hbase  shell ,等待一分钟,执行 scan 'imooc_course_clickcount' ,查看当前表中是否有数据进来,再次等待一分钟,发现每个 rowkey对应的课程点击数量自动进行了累加更新。

功能2:统计今天到现在为止从搜索引擎引流过来的实战课程的访问量

Hbase 表设计:创建表

create 'imooc_course_search_clickcount','info'

设计rowkey:根据我们的业务需求进行设计


      /**
       * 从搜索引擎过来的实战课程点击数实体类
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月23日 17:06:16
       */
      case  class CourseSearchClickCount(day_search_course:String,click_count:Long) {
      }
  
 

      /**
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月22日 15:57:02
       */
      object CourseSearchClickCountDao {
        val tableName =  "imooc_course_search_clickcount"
        val cf = "info"
        val qualifer  = "click_count"
       /**
       * 保存数据到hbase中
       * @param list
       */
        def  save(list:ListBuffer[CourseSearchClickCount]): Unit ={
       val table = HBaseUtils.getInstance().getTable(tableName)
      for(ele <- list){
      //保存的基础上自动累加值大小 比如第一次访问量为8 ,第二次访问量为10,再次保存时访问量就
      //自动累加为18 了。
       table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),Bytes.toBytes(cf),Bytes.toBytes(qualifer),ele.click_count)
       }
        }
       /**
       * 根据rowkey查询值
       * @param day_course
       * @return
       */
        def  count(day_search_course:String): Long ={
       val table = HBaseUtils.getInstance().getTable(tableName)
       val get  = new Get(Bytes.toBytes(day_search_course))
       val value = table.get(get).getValue(cf.getBytes(),qualifer.getBytes())
      if(value == null ){
      0L
       }else{
       Bytes.toLong(value)
       }
        }
        def main(args: Array[String]): Unit = {
       val list  =new ListBuffer[CourseSearchClickCount]
      list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
      list.append(CourseSearchClickCount("20171111_www.sougou.com_9",9))
      list.append(CourseSearchClickCount("20171111_www.google.com_1",100))
       save(list)
       println(count("20171111_www.baidu.com_8")+","+count("20171111_www.sougou.com_9")+","+count("20171111_www.google.com_1"))
        }
      }
  
 

      package com.dzx.scala.dzxbootscala.spark
      import com.dzx.scala.dzxbootscala.spark.dao.{CourseClickCountDao, CourseSearchClickCountDao}
      import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount, CourseSearchClickCount}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import scala.collection.mutable.ListBuffer
      object ImoocStatStreamingApp {
        def main(args: Array[String]): Unit = {
      //args : hadoop000:2181 test streamingtopic 1
       def main(args: Array[String]): Unit = {
      if(args.length!=4){
       System.err.print("必须为四个参数")
       System.exit(1)
       }
       val Array(zkQuorum,group,topics,numThreads) =args
       val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[5]")
       val ssc = new StreamingContext(sparkConf, Seconds(60))
      //todo spark streaming 对接kafka
       val topicMap  = topics.split(",").map((_,numThreads.toInt)).toMap
       val messages =   KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
      // todo 测试为什么要取第二个
      // messages.map(_._2).count().print()
      //messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
       val logs = messages.map(_._2)
       val cleanData =  logs.map(line =>{
      //infos(2) = "GET /class/130.html HTTP/1.1"
       val infos= line.split("\t")
       val url =infos(2).split(" ")(1)
      var courseId = 0
      if(url.startsWith("/class")){
       val courseIdHtml  =   url.split("/")(2)
      //获取到 课程编号
       courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
       }
       ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
       }).filter(clicklog=>clicklog.courseId!=0)
       cleanData.print()
      // /**
      // * 数据清洗结果类似如下:
      // * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
      // * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
      // * 到数据清洗完为止,日志中只包含了实战课程的日志
      // * /
      //功能1:今天到现在为止,实战课程 的访问量
      //yyyyMMdd courseId
      //使用 数据库来进行 存储我们的统计结果
      // spark streaming 把统计结果写入到数据库里面
      // 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
      // 选择什么数据库作为统计结果的存储呢
      // RDBMS:MySQL,oracle ...
      // day course_id click_count
      // 20171111 1 10
      // 20171111 2 10
      // 下一个批次数据进来之后
      // click_count + 下一个批次统计结果
      // NoSql : HBase、Redis
      // Hbase: 一个api就能搞定,非常方便
      // 这就是 我们为什么要选择Hbase的一个原因所在
      // 前提:启动 hdfs ,zk,hbase
      //hbase表设计
      // create 'imooc_course_clickcount', 'info'
      // 查看表结构
      //desc 'imooc_course_clickcount'
      // RowKey设计
      // day_courseid
      //如何 使用 scala 操作 hbase
      //功能1:今天到现在为止,实战课程 的访问量
       cleanData.map(x=>{
       (x.time.substring(0,8)+"_"+x.courseId,1)
       }).reduceByKey(_+_).foreachRDD(rdd=>{
       rdd.foreachPartition(partitionRecords =>{
       val list  = new ListBuffer[CourseClickCount]
       partitionRecords.foreach(pair=>{
       list.append(CourseClickCount(pair._1,pair._2))
       })
       CourseClickCountDao.save(list)
       })
       })
       cleanData.map(x=>{
      /**
       * https://www.sougou.com/web?query=Spark sql实战
       */
       val referer = x.referer.replace("//","/")
       val splits = referer.split("/")
      var host = ""
      if(splits.length>2){
       host=splits(1)
       }
       (host,x.courseId,x.time)
       }).filter(_._1 !="").map(x=>{
       (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
       }).reduceByKey(_+_).foreachRDD(rdd=>{
       rdd.foreachPartition(partitionRecords =>{
       val list  = new ListBuffer[CourseSearchClickCount]
       partitionRecords.foreach(pair=>{
       list.append(CourseSearchClickCount(pair._1,pair._2))
       })
       CourseSearchClickCountDao.save(list)
       })
       })
       ssc.start()
       ssc.awaitTermination()
       }
        }
      }
  
 

执行 hbase  shell  命令, truncate ‘imooc_course_search_clickcount’ 先清空 imooc_course_search_clickcount表中的数据

,执行 date 命令,查看时间是否过去了一分钟,查看kafka 消费者控制台是否 接收到 了数据,之后执行 scan ‘imooc_course_search_clickcount’  命令 查看 hbase 中是否 插入 这一批次的数据。

五、将项目打包运行到服务器环境中
 

将 ImoocStatStreamingApp.java文件中的

.setMaster("local[5]") 注释掉,因为这个参数一般再提交spark 任务的时候去指定


mvn  clean package -DskipTests 打包项目中可能会报错误,原因是因为 scala文件中import 了java 中的类。

解决方法 :将 pom.xml中的 <build>标签中的 

<sourceDirectory>src/main/scala</sourceDirectory>

<testSourceDirectory>src/test/scala</testSourceDirectory>

将打包后的sparktrain-1.0.jar上传到linux服务器,

启动spark 任务


      ./spark-submit --master local[5] \
      --jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
      --class com.imooc.spark.project.spark.ImoocStatStreamingApp \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
      /home/hadoop/lib/sparktrain-1.0.jar \
      hadoop000:2181 test streamingtopic 1
  
 

等待一分钟之后,spark streaming 就会接收到 kafka 传来的数据,scan 'imooc_course_search_clickcount'

可看到搜索引擎引流过来的课程 点击 次数 有了变化(次数增加)。

六、spring boot 集成 hbase ,Echarts 可视化项目实战 

项目链接地址:https://gitee.com/dzxmy/imooc_web_visualization

推荐一个大数据可视化神器阿里云DataV https://data.aliyun.com/visual/datav?spm=5176.8142029.dataTechnology.6.3dbd6d3evCLMCZ

七、使用java 开发 spark 应用程序

1、使用java 读取 文件中的单词,完成词频统计


      package com.dzx.scala.dzxbootscala.java_spark;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.streaming.Duration;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import scala.Tuple2;
      import java.util.Arrays;
      import java.util.List;
      /**
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月25日 11:32:06
       */
      public class WordCountApp {
      public static void main(String[] args) throws InterruptedException {
      SparkSession spark = SparkSession.builder().appName("WordCountApp")
       .master("local[2]").getOrCreate();
      /**
       * hello.txt
       * hello world
       * world
       * welcome hello
       */
      JavaRDD<String> lines =  spark.read().textFile("/opt/data/hello.txt").javaRDD();
      JavaRDD<String> words = lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator());
      JavaPairRDD<String,Integer> counts = words.mapToPair(word->new Tuple2<String,Integer>(word,1))
       .reduceByKey((x,y)->x+y);
      List<Tuple2<String, Integer>> output = counts.collect();
      for(Tuple2<String,Integer>  tupln :output){
      System.out.println(tupln._1()+":"+tupln._2());
       }
       spark.stop();
      }
      }
  
 

2、使用java 监听 socket 网络 数据 ,完成词频统计


      package com.dzx.scala.dzxbootscala.java_spark;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.streaming.Duration;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import scala.Tuple2;
      import java.util.Arrays;
      import java.util.List;
      /**
       * @author DuanZhaoXu
       * @ClassName:
       * @Description:
       * @date 2019年01月25日 11:32:06
       */
      public class WordCountApp {
      public static void main(String[] args) throws InterruptedException {
      SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp");
      JavaStreamingContext jssc =new JavaStreamingContext(conf, Durations.seconds(5));
      // nc -lk 9999
      //创建一个Dstream
      JavaReceiverInputDStream<String> lines  = jssc.socketTextStream("localhost", 9999);
      JavaPairDStream<String,Integer> counts =  lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator()).mapToPair(word->new Tuple2<String,Integer>(word,1))
       .reduceByKey((x,y)->x+y);
       counts.print();
       jssc.start();
       jssc.awaitTermination();
       }
      }
  
 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/86593028

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。