spark streaming 实时流处理项目实战

举报
小米粒-biubiubiu 发表于 2020/12/03 00:37:36 2020/12/03
【摘要】                                              spark streaming  实时流处理项目实战 一、 编写python 日志产生脚本,并使用linux 的crontab 命令 一分钟执行一次 tool.lu/crontab 网站 可以测试 cron表达式 cd  /home/hadoop/data/project 将以下...

                                             spark streaming  实时流处理项目实战

一、 编写python 日志产生脚本,并使用linux 的crontab 命令 一分钟执行一次

tool.lu/crontab 网站 可以测试 cron表达式

cd  /home/hadoop/data/project

将以下python 脚本放到 /home/hadoop/data/project 下


  
  1. #coding=UTF-8
  2. import random
  3. import time
  4. url_paths = [
  5. "class/112.html",
  6. "class/128.html",
  7. "class/145.html",
  8. "class/146.html",
  9. "class/131.html",
  10. "class/130.html",
  11. "learn/821",
  12. "course/list"
  13. ]
  14. ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98]
  15. http_referers = [
  16. "http://www.baidu.com/?wd={query}",
  17. "https://www.sogou.com/web?query={query}",
  18. "http://cn.bing.com/search?q={query}",
  19. "https://search.yahoo.com/search?p={query}",
  20. ]
  21. search_keyword = [
  22. "Spark sql实战",
  23. "Hadoop基础",
  24. "Strom实战",
  25. "Spark streaming实战",
  26. "大数据面试"]
  27. status_code = ["200","404","500"]
  28. def sample_url():
  29. return random.sample(url_paths,1)[0]
  30. def sample_ip():
  31. slice = random.sample(ip_slices,4)
  32. return ".".join([str(item) for item in slice])
  33. def sample_referer():
  34. if random.uniform(0,1)>0.2:
  35. return "-"
  36. refer_str =random.sample(http_referers,1)
  37. query_str =random.sample(search_keyword,1)
  38. return refer_str[0].format(query=query_str[0])
  39. def sample_status_code():
  40. return random.sample(status_code,1)[0]
  41. def generate_log(count=10):
  42. time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
  43. f = open("/home/hadoop/data/project/logs/access.log","w+")
  44. #f = open("D:\\access.log","w+")
  45. while count >=1:
  46. query_log ="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(local_time=time_str,url=sample_url(),ip=sample_ip(),referer=sample_referer(),status_code=sample_status_code())
  47. print(query_log)
  48. f.write(query_log+"\n")
  49. count = count -1
  50. if __name__ == '__main__':
  51. generate_log(50)
  52. # python命令执行py脚本
  53. #python generate_log.py

cd  /home/hadoop/data/project

编写shell脚本

vim  log_generator.sh 

python /home/hadoop/data/project/generate_log.py
 

chmod u+x log_generator.sh

./log_generator.sh

crontab   -e

*/1 * * * *  /home/hadoop/data/project/log_generator.sh
 

tail -300f /home/hadoop/data/project/logs/access.log 查看日志

执行 date 命令,查看是否过了一分钟,当过去了1分钟之后,再次查看 access.log日志文件,发现有日志生成,

并且1分钟生成一次。

二、对接python 日志产生器输出的日志到flume

cd  $Flume_HOME/conf

编写 配置文件  streaming_project.config


  
  1. #执行source,sink,channel名称
  2. exec-memory-logger.sources = exec-source
  3. exec-memory-logger.sinks = logger-sink
  4. exec-memory-logger.channels =memory-channel
  5. #配置source 类型为exec ,从文件读取
  6. exec-memory-logger.sources.exec-source.type = exec
  7. exec-memory-logger.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
  8. exec-memory-logger.sources.exec-source.shell = /bin/sh -c
  9. #配置channel类型为memory
  10. exec-memory-loggers.channels.memory-channel.type = memory
  11. #配置sink的类型为logger
  12. exec-memory-logger.sinks.logger-sink.type = logger
  13. #将 source ,channel ,sink 串起来
  14. exec-memory-logger.sources.exec-source.channels =memory-channel
  15. exec-memory-logger.sinks.logger-sink.channel = memory-channel

 

flume-ng agent \ 

--name exec-memory-logger \ 

--conf  $FLUME_HOME/conf \ 
--conf-file  $FLUME_HOME/streaming_project.conf \

-Dflume.root.logger = INFO,console

等待一分钟,之后看到控制台获取到了 access.log 文件新产生的 日志信息。

 

三、将flume 采集的日志信息 输出到 kafka 

启动 zk

./zkServer.sh  start

启动 kafka

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

jps 查看 zk和kafka 是否启动成功

修改flume 的配置文件输出到kafka

vim streaming_project2.conf


  
  1. #执行source,sink,channel名称
  2. exec-memory-kafka.sources = exec-source
  3. exec-memory-kafka.sinks = kafka-sink
  4. exec-memory-kafka.channels =memory-channel
  5. #配置source 类型为exec ,从文件读取
  6. exec-memory-kafka.sources.exec-source.type = exec
  7. exec-memory-kafka.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
  8. exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
  9. #配置channel类型为memory
  10. exec-memory-kafka.channels.memory-channel.type = memory
  11. #配置sink的类型为kafka
  12. exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafak.KafkaSink
  13. exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
  14. exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
  15. exec-memory-kafka.sinks.kafka-sink.batchSize = 5
  16. exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
  17. #将 source ,channel ,sink 串起来
  18. exec-memory-kafka.sources.exec-source.channels =memory-channel
  19. exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

启动kafka消费者进行监听:

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic  streamingtopic

 

启动flume

flume-ng agent \

--name exec-memory-kafka \

--conf $FLUME_HOME/conf  \

--conf-file /home/hadoop/data/project/streaming_project2.conf \

-Dflume.root.logger = INFO,console

等待一分钟后看到  kafka 消费者,输出了flume 采集到的日志信息

四、Spark streaming 对接 kafka 进行消费

功能1:今天到现在为止,实战课程 的访问量


  
  1. /**
  2. * 清洗后的日志信息
  3. * @param ip 日志访问的ip
  4. * @param time 日志访问的时间
  5. * @param courseId 日志访问的课程编号
  6. * @param statusCode 状态码
  7. * @param referer 日志访问的referer信息,途径(日志从哪里过来的)
  8. */
  9. case class ClickLog (ip:String,time:String,courseId:Int,statusCode:Int,referer:String){
  10. }

  
  1. /**
  2. * 实战课程点击数
  3. * @param day_course 对应的就是hbase 中的rowkey,20171111 1
  4. * @param click_count 对应的20171111 1 的访问总数
  5. */
  6. case class CourseClickCount(day_course:String,click_count:Long) {
  7. }

  
  1. /**
  2. * hbase操作工具类
  3. */
  4. public class HBaseUtils {
  5. HBaseAdmin admin = null;
  6. Configuration configuration = null;
  7. private HBaseUtils() {
  8. configuration = new Configuration();
  9. configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
  10. configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
  11. try {
  12. admin = new HBaseAdmin(configuration);
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. public static HBaseUtils instance = null;
  18. public static synchronized HBaseUtils getInstance() {
  19. if (instance == null) {
  20. instance = new HBaseUtils();
  21. }
  22. return instance;
  23. }
  24. /**
  25. * 根据表名获取table实例
  26. *
  27. * @param tableName 表名
  28. * @return
  29. */
  30. public HTable getTable(String tableName) {
  31. HTable hTable = null;
  32. try {
  33. hTable = new HTable(configuration, tableName);
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. return hTable;
  38. }
  39. /**
  40. * 添加一条记录到Hbase表中
  41. *
  42. * @param tableName 表名
  43. * @param rowKey 表的rowkey
  44. * @param cf columnfamily名
  45. * @param column 列名
  46. * @param value 写入Habse表的值
  47. */
  48. public void put(String tableName, String rowKey, String cf, String column, String value) {
  49. HTable table = getTable(tableName);
  50. Put put = new Put(Bytes.toBytes(rowKey));
  51. put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
  52. try {
  53. table.put(put);
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. public static void main(String[] args) {
  59. // HTable hTable = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
  60. // System.out.println(hTable.getName().getNameAsString());
  61. //测试,插入一条数据
  62. String tableName = "imooc_course_clickcount";
  63. String rowKey = "20171111_88";
  64. String cf = "info";
  65. String column = "click_count";
  66. String value = "2";
  67. HBaseUtils.getInstance().put(tableName, rowKey, cf, column, value);
  68. //scan 'imooc_course_clickcount' 查看表的数据
  69. }
  70. }

  
  1. package com.dzx.scala.dzxbootscala.spark
  2. import com.dzx.scala.dzxbootscala.spark.dao.CourseClickCountDao
  3. import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount}
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.streaming.kafka.KafkaUtils
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. import scala.collection.mutable.ListBuffer
  8. object ImoocStatStreamingApp {
  9. def main(args: Array[String]): Unit = {
  10. //args : hadoop000:2181 test streamingtopic 1
  11. def main(args: Array[String]): Unit = {
  12. if(args.length!=4){
  13. System.err.print("必须为四个参数")
  14. System.exit(1)
  15. }
  16. val Array(zkQuorum,group,topics,numThreads) =args
  17. val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
  18. val ssc = new StreamingContext(sparkConf, Seconds(60))
  19. //todo spark streaming 对接kafka
  20. val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
  21. val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
  22. // todo 测试为什么要取第二个
  23. // messages.map(_._2).count().print()
  24. //messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  25. val logs = messages.map(_._2)
  26. val cleanData = logs.map(line =>{
  27. //infos(2) = "GET /class/130.html HTTP/1.1"
  28. val infos= line.split("\t")
  29. val url =infos(2).split(" ")(1)
  30. var courseId = 0
  31. if(url.startsWith("/class")){
  32. val courseIdHtml = url.split("/")(2)
  33. //获取到 课程编号
  34. courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
  35. }
  36. ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
  37. }).filter(clicklog=>clicklog.courseId!=0)
  38. // cleanData.print()
  39. // /**
  40. // * 数据清洗结果类似如下:
  41. // * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
  42. // * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
  43. // * 到数据清洗完为止,日志中只包含了实战课程的日志
  44. // * /
  45. //功能1:今天到现在为止,实战课程 的访问量
  46. //yyyyMMdd courseId
  47. //使用 数据库来进行 存储我们的统计结果
  48. // spark streaming 把统计结果写入到数据库里面
  49. // 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
  50. // 选择什么数据库作为统计结果的存储呢
  51. // RDBMS:MySQL,oracle ...
  52. // day course_id click_count
  53. // 20171111 1 10
  54. // 20171111 2 10
  55. // 下一个批次数据进来之后
  56. // click_count + 下一个批次统计结果
  57. // NoSql : HBase、Redis
  58. // Hbase: 一个api就能搞定,非常方便
  59. // 这就是 我们为什么要选择Hbase的一个原因所在
  60. // 前提:启动 hdfs ,zk,hbase
  61. //hbase表设计
  62. // create 'imooc_course_clickcount', 'info'
  63. // 查看表结构
  64. //desc 'imooc_course_clickcount'
  65. // RowKey设计
  66. // day_courseid
  67. //如何 使用 scala 操作 hbase
  68. cleanData.map(x=>{
  69. (x.time.substring(0,8)+"_"+x.courseId,1)
  70. }).reduceByKey(_+_).foreachRDD(rdd=>{
  71. rdd.foreachPartition(partitionRecords =>{
  72. val list = new ListBuffer[CourseClickCount]
  73. partitionRecords.foreach(pair=>{
  74. list.append(CourseClickCount(pair._1,pair._2))
  75. })
  76. //将数据存入到hbase
  77. CourseClickCountDao.save(list)
  78. })
  79. })
  80. ssc.start()
  81. ssc.awaitTermination()
  82. }
  83. }
  84. }

使用步骤三中的flume启动命令和 kafka消费者的启动 命令,使用idea 将 需要传入的四个 参数 ,传入main方法之后执行ImoocStatStreamingApp ,执行 hbase  shell ,等待一分钟,执行 scan 'imooc_course_clickcount' ,查看当前表中是否有数据进来,再次等待一分钟,发现每个 rowkey对应的课程点击数量自动进行了累加更新。

功能2:统计今天到现在为止从搜索引擎引流过来的实战课程的访问量

Hbase 表设计:创建表

create 'imooc_course_search_clickcount','info'

设计rowkey:根据我们的业务需求进行设计


  
  1. /**
  2. * 从搜索引擎过来的实战课程点击数实体类
  3. * @author DuanZhaoXu
  4. * @ClassName:
  5. * @Description:
  6. * @date 2019年01月23日 17:06:16
  7. */
  8. case class CourseSearchClickCount(day_search_course:String,click_count:Long) {
  9. }

 


  
  1. /**
  2. * @author DuanZhaoXu
  3. * @ClassName:
  4. * @Description:
  5. * @date 2019年01月22日 15:57:02
  6. */
  7. object CourseSearchClickCountDao {
  8. val tableName = "imooc_course_search_clickcount"
  9. val cf = "info"
  10. val qualifer = "click_count"
  11. /**
  12. * 保存数据到hbase中
  13. * @param list
  14. */
  15. def save(list:ListBuffer[CourseSearchClickCount]): Unit ={
  16. val table = HBaseUtils.getInstance().getTable(tableName)
  17. for(ele <- list){
  18. //保存的基础上自动累加值大小 比如第一次访问量为8 ,第二次访问量为10,再次保存时访问量就
  19. //自动累加为18 了。
  20. table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),Bytes.toBytes(cf),Bytes.toBytes(qualifer),ele.click_count)
  21. }
  22. }
  23. /**
  24. * 根据rowkey查询值
  25. * @param day_course
  26. * @return
  27. */
  28. def count(day_search_course:String): Long ={
  29. val table = HBaseUtils.getInstance().getTable(tableName)
  30. val get = new Get(Bytes.toBytes(day_search_course))
  31. val value = table.get(get).getValue(cf.getBytes(),qualifer.getBytes())
  32. if(value == null ){
  33. 0L
  34. }else{
  35. Bytes.toLong(value)
  36. }
  37. }
  38. def main(args: Array[String]): Unit = {
  39. val list =new ListBuffer[CourseSearchClickCount]
  40. list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
  41. list.append(CourseSearchClickCount("20171111_www.sougou.com_9",9))
  42. list.append(CourseSearchClickCount("20171111_www.google.com_1",100))
  43. save(list)
  44. println(count("20171111_www.baidu.com_8")+","+count("20171111_www.sougou.com_9")+","+count("20171111_www.google.com_1"))
  45. }
  46. }

  
  1. package com.dzx.scala.dzxbootscala.spark
  2. import com.dzx.scala.dzxbootscala.spark.dao.{CourseClickCountDao, CourseSearchClickCountDao}
  3. import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount, CourseSearchClickCount}
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.streaming.kafka.KafkaUtils
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. import scala.collection.mutable.ListBuffer
  8. object ImoocStatStreamingApp {
  9. def main(args: Array[String]): Unit = {
  10. //args : hadoop000:2181 test streamingtopic 1
  11. def main(args: Array[String]): Unit = {
  12. if(args.length!=4){
  13. System.err.print("必须为四个参数")
  14. System.exit(1)
  15. }
  16. val Array(zkQuorum,group,topics,numThreads) =args
  17. val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[5]")
  18. val ssc = new StreamingContext(sparkConf, Seconds(60))
  19. //todo spark streaming 对接kafka
  20. val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
  21. val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
  22. // todo 测试为什么要取第二个
  23. // messages.map(_._2).count().print()
  24. //messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  25. val logs = messages.map(_._2)
  26. val cleanData = logs.map(line =>{
  27. //infos(2) = "GET /class/130.html HTTP/1.1"
  28. val infos= line.split("\t")
  29. val url =infos(2).split(" ")(1)
  30. var courseId = 0
  31. if(url.startsWith("/class")){
  32. val courseIdHtml = url.split("/")(2)
  33. //获取到 课程编号
  34. courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
  35. }
  36. ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
  37. }).filter(clicklog=>clicklog.courseId!=0)
  38. cleanData.print()
  39. // /**
  40. // * 数据清洗结果类似如下:
  41. // * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
  42. // * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
  43. // * 到数据清洗完为止,日志中只包含了实战课程的日志
  44. // * /
  45. //功能1:今天到现在为止,实战课程 的访问量
  46. //yyyyMMdd courseId
  47. //使用 数据库来进行 存储我们的统计结果
  48. // spark streaming 把统计结果写入到数据库里面
  49. // 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
  50. // 选择什么数据库作为统计结果的存储呢
  51. // RDBMS:MySQL,oracle ...
  52. // day course_id click_count
  53. // 20171111 1 10
  54. // 20171111 2 10
  55. // 下一个批次数据进来之后
  56. // click_count + 下一个批次统计结果
  57. // NoSql : HBase、Redis
  58. // Hbase: 一个api就能搞定,非常方便
  59. // 这就是 我们为什么要选择Hbase的一个原因所在
  60. // 前提:启动 hdfs ,zk,hbase
  61. //hbase表设计
  62. // create 'imooc_course_clickcount', 'info'
  63. // 查看表结构
  64. //desc 'imooc_course_clickcount'
  65. // RowKey设计
  66. // day_courseid
  67. //如何 使用 scala 操作 hbase
  68. //功能1:今天到现在为止,实战课程 的访问量
  69. cleanData.map(x=>{
  70. (x.time.substring(0,8)+"_"+x.courseId,1)
  71. }).reduceByKey(_+_).foreachRDD(rdd=>{
  72. rdd.foreachPartition(partitionRecords =>{
  73. val list = new ListBuffer[CourseClickCount]
  74. partitionRecords.foreach(pair=>{
  75. list.append(CourseClickCount(pair._1,pair._2))
  76. })
  77. CourseClickCountDao.save(list)
  78. })
  79. })
  80. cleanData.map(x=>{
  81. /**
  82. * https://www.sougou.com/web?query=Spark sql实战
  83. */
  84. val referer = x.referer.replace("//","/")
  85. val splits = referer.split("/")
  86. var host = ""
  87. if(splits.length>2){
  88. host=splits(1)
  89. }
  90. (host,x.courseId,x.time)
  91. }).filter(_._1 !="").map(x=>{
  92. (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
  93. }).reduceByKey(_+_).foreachRDD(rdd=>{
  94. rdd.foreachPartition(partitionRecords =>{
  95. val list = new ListBuffer[CourseSearchClickCount]
  96. partitionRecords.foreach(pair=>{
  97. list.append(CourseSearchClickCount(pair._1,pair._2))
  98. })
  99. CourseSearchClickCountDao.save(list)
  100. })
  101. })
  102. ssc.start()
  103. ssc.awaitTermination()
  104. }
  105. }
  106. }

执行 hbase  shell  命令, truncate ‘imooc_course_search_clickcount’ 先清空 imooc_course_search_clickcount表中的数据

,执行 date 命令,查看时间是否过去了一分钟,查看kafka 消费者控制台是否 接收到 了数据,之后执行 scan ‘imooc_course_search_clickcount’  命令 查看 hbase 中是否 插入 这一批次的数据。

五、将项目打包运行到服务器环境中
 

将 ImoocStatStreamingApp.java文件中的

.setMaster("local[5]") 注释掉,因为这个参数一般再提交spark 任务的时候去指定


mvn  clean package -DskipTests 打包项目中可能会报错误,原因是因为 scala文件中import 了java 中的类。

解决方法 :将 pom.xml中的 <build>标签中的 

<sourceDirectory>src/main/scala</sourceDirectory>

<testSourceDirectory>src/test/scala</testSourceDirectory>

将打包后的sparktrain-1.0.jar上传到linux服务器,

启动spark 任务


  
  1. ./spark-submit --master local[5] \
  2. --jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
  3. --class com.imooc.spark.project.spark.ImoocStatStreamingApp \
  4. --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  5. /home/hadoop/lib/sparktrain-1.0.jar \
  6. hadoop000:2181 test streamingtopic 1

等待一分钟之后,spark streaming 就会接收到 kafka 传来的数据,scan 'imooc_course_search_clickcount'

可看到搜索引擎引流过来的课程 点击 次数 有了变化(次数增加)。

六、spring boot 集成 hbase ,Echarts 可视化项目实战 

项目链接地址:https://gitee.com/dzxmy/imooc_web_visualization

推荐一个大数据可视化神器阿里云DataV https://data.aliyun.com/visual/datav?spm=5176.8142029.dataTechnology.6.3dbd6d3evCLMCZ

七、使用java 开发 spark 应用程序

1、使用java 读取 文件中的单词,完成词频统计


  
  1. package com.dzx.scala.dzxbootscala.java_spark;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.streaming.Duration;
  7. import org.apache.spark.streaming.Durations;
  8. import org.apache.spark.streaming.api.java.JavaPairDStream;
  9. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  10. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  11. import scala.Tuple2;
  12. import java.util.Arrays;
  13. import java.util.List;
  14. /**
  15. * @author DuanZhaoXu
  16. * @ClassName:
  17. * @Description:
  18. * @date 2019年01月25日 11:32:06
  19. */
  20. public class WordCountApp {
  21. public static void main(String[] args) throws InterruptedException {
  22. SparkSession spark = SparkSession.builder().appName("WordCountApp")
  23. .master("local[2]").getOrCreate();
  24. /**
  25. * hello.txt
  26. * hello world
  27. * world
  28. * welcome hello
  29. */
  30. JavaRDD<String> lines = spark.read().textFile("/opt/data/hello.txt").javaRDD();
  31. JavaRDD<String> words = lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator());
  32. JavaPairRDD<String,Integer> counts = words.mapToPair(word->new Tuple2<String,Integer>(word,1))
  33. .reduceByKey((x,y)->x+y);
  34. List<Tuple2<String, Integer>> output = counts.collect();
  35. for(Tuple2<String,Integer> tupln :output){
  36. System.out.println(tupln._1()+":"+tupln._2());
  37. }
  38. spark.stop();
  39. }
  40. }

2、使用java 监听 socket 网络 数据 ,完成词频统计

 


  
  1. package com.dzx.scala.dzxbootscala.java_spark;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.streaming.Duration;
  7. import org.apache.spark.streaming.Durations;
  8. import org.apache.spark.streaming.api.java.JavaPairDStream;
  9. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  10. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  11. import scala.Tuple2;
  12. import java.util.Arrays;
  13. import java.util.List;
  14. /**
  15. * @author DuanZhaoXu
  16. * @ClassName:
  17. * @Description:
  18. * @date 2019年01月25日 11:32:06
  19. */
  20. public class WordCountApp {
  21. public static void main(String[] args) throws InterruptedException {
  22. SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp");
  23. JavaStreamingContext jssc =new JavaStreamingContext(conf, Durations.seconds(5));
  24. // nc -lk 9999
  25. //创建一个Dstream
  26. JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  27. JavaPairDStream<String,Integer> counts = lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator()).mapToPair(word->new Tuple2<String,Integer>(word,1))
  28. .reduceByKey((x,y)->x+y);
  29. counts.print();
  30. jssc.start();
  31. jssc.awaitTermination();
  32. }
  33. }

 

 

 

 

 

 

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/86593028

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。