spark streaming 实时流处理项目实战
spark streaming 实时流处理项目实战
一、 编写python 日志产生脚本,并使用linux 的crontab 命令 一分钟执行一次
tool.lu/crontab 网站 可以测试 cron表达式
cd /home/hadoop/data/project
将以下python 脚本放到 /home/hadoop/data/project 下
-
#coding=UTF-8
-
import random
-
import time
-
-
url_paths = [
-
"class/112.html",
-
"class/128.html",
-
"class/145.html",
-
"class/146.html",
-
"class/131.html",
-
"class/130.html",
-
"learn/821",
-
"course/list"
-
]
-
-
-
ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98]
-
-
http_referers = [
-
"http://www.baidu.com/?wd={query}",
-
"https://www.sogou.com/web?query={query}",
-
"http://cn.bing.com/search?q={query}",
-
"https://search.yahoo.com/search?p={query}",
-
]
-
-
search_keyword = [
-
"Spark sql实战",
-
"Hadoop基础",
-
"Strom实战",
-
"Spark streaming实战",
-
"大数据面试"]
-
-
status_code = ["200","404","500"]
-
-
def sample_url():
-
return random.sample(url_paths,1)[0]
-
-
def sample_ip():
-
slice = random.sample(ip_slices,4)
-
return ".".join([str(item) for item in slice])
-
-
def sample_referer():
-
if random.uniform(0,1)>0.2:
-
return "-"
-
refer_str =random.sample(http_referers,1)
-
query_str =random.sample(search_keyword,1)
-
return refer_str[0].format(query=query_str[0])
-
-
def sample_status_code():
-
return random.sample(status_code,1)[0]
-
-
-
def generate_log(count=10):
-
time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
-
f = open("/home/hadoop/data/project/logs/access.log","w+")
-
#f = open("D:\\access.log","w+")
-
while count >=1:
-
query_log ="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(local_time=time_str,url=sample_url(),ip=sample_ip(),referer=sample_referer(),status_code=sample_status_code())
-
print(query_log)
-
f.write(query_log+"\n")
-
count = count -1
-
-
if __name__ == '__main__':
-
generate_log(50)
-
-
# python命令执行py脚本
-
#python generate_log.py
cd /home/hadoop/data/project
编写shell脚本
vim log_generator.sh
python /home/hadoop/data/project/generate_log.py
chmod u+x log_generator.sh
./log_generator.sh
crontab -e
*/1 * * * * /home/hadoop/data/project/log_generator.sh
tail -300f /home/hadoop/data/project/logs/access.log 查看日志
执行 date 命令,查看是否过了一分钟,当过去了1分钟之后,再次查看 access.log日志文件,发现有日志生成,
并且1分钟生成一次。
二、对接python 日志产生器输出的日志到flume
cd $Flume_HOME/conf
编写 配置文件 streaming_project.config
-
#执行source,sink,channel名称
-
exec-memory-logger.sources = exec-source
-
exec-memory-logger.sinks = logger-sink
-
exec-memory-logger.channels =memory-channel
-
#配置source 类型为exec ,从文件读取
-
exec-memory-logger.sources.exec-source.type = exec
-
exec-memory-logger.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
-
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
-
-
#配置channel类型为memory
-
exec-memory-loggers.channels.memory-channel.type = memory
-
#配置sink的类型为logger
-
exec-memory-logger.sinks.logger-sink.type = logger
-
-
#将 source ,channel ,sink 串起来
-
exec-memory-logger.sources.exec-source.channels =memory-channel
-
exec-memory-logger.sinks.logger-sink.channel = memory-channel
-
flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/streaming_project.conf \
-Dflume.root.logger = INFO,console
等待一分钟,之后看到控制台获取到了 access.log 文件新产生的 日志信息。
三、将flume 采集的日志信息 输出到 kafka
启动 zk
./zkServer.sh start
启动 kafka
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
jps 查看 zk和kafka 是否启动成功
修改flume 的配置文件输出到kafka
vim streaming_project2.conf
-
#执行source,sink,channel名称
-
exec-memory-kafka.sources = exec-source
-
exec-memory-kafka.sinks = kafka-sink
-
exec-memory-kafka.channels =memory-channel
-
#配置source 类型为exec ,从文件读取
-
exec-memory-kafka.sources.exec-source.type = exec
-
exec-memory-kafka.sources.exec-source.command=tail-F /home/hadoop/data/project/logs/access.lg
-
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
-
-
#配置channel类型为memory
-
exec-memory-kafka.channels.memory-channel.type = memory
-
-
#配置sink的类型为kafka
-
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafak.KafkaSink
-
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
-
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
-
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
-
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
-
-
#将 source ,channel ,sink 串起来
-
exec-memory-kafka.sources.exec-source.channels =memory-channel
-
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
启动kafka消费者进行监听:
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic
启动flume
flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/data/project/streaming_project2.conf \
-Dflume.root.logger = INFO,console
等待一分钟后看到 kafka 消费者,输出了flume 采集到的日志信息
四、Spark streaming 对接 kafka 进行消费
功能1:今天到现在为止,实战课程 的访问量
-
/**
-
* 清洗后的日志信息
-
* @param ip 日志访问的ip
-
* @param time 日志访问的时间
-
* @param courseId 日志访问的课程编号
-
* @param statusCode 状态码
-
* @param referer 日志访问的referer信息,途径(日志从哪里过来的)
-
*/
-
case class ClickLog (ip:String,time:String,courseId:Int,statusCode:Int,referer:String){
-
-
}
-
/**
-
* 实战课程点击数
-
* @param day_course 对应的就是hbase 中的rowkey,20171111 1
-
* @param click_count 对应的20171111 1 的访问总数
-
*/
-
case class CourseClickCount(day_course:String,click_count:Long) {
-
-
-
}
-
/**
-
* hbase操作工具类
-
*/
-
public class HBaseUtils {
-
-
-
HBaseAdmin admin = null;
-
Configuration configuration = null;
-
-
private HBaseUtils() {
-
configuration = new Configuration();
-
configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
-
configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
-
-
try {
-
admin = new HBaseAdmin(configuration);
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
public static HBaseUtils instance = null;
-
-
public static synchronized HBaseUtils getInstance() {
-
if (instance == null) {
-
instance = new HBaseUtils();
-
}
-
return instance;
-
}
-
-
-
/**
-
* 根据表名获取table实例
-
*
-
* @param tableName 表名
-
* @return
-
*/
-
public HTable getTable(String tableName) {
-
HTable hTable = null;
-
try {
-
hTable = new HTable(configuration, tableName);
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
return hTable;
-
}
-
-
/**
-
* 添加一条记录到Hbase表中
-
*
-
* @param tableName 表名
-
* @param rowKey 表的rowkey
-
* @param cf columnfamily名
-
* @param column 列名
-
* @param value 写入Habse表的值
-
*/
-
public void put(String tableName, String rowKey, String cf, String column, String value) {
-
HTable table = getTable(tableName);
-
Put put = new Put(Bytes.toBytes(rowKey));
-
put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
-
try {
-
table.put(put);
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
-
}
-
-
-
public static void main(String[] args) {
-
// HTable hTable = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
-
// System.out.println(hTable.getName().getNameAsString());
-
-
//测试,插入一条数据
-
String tableName = "imooc_course_clickcount";
-
String rowKey = "20171111_88";
-
String cf = "info";
-
String column = "click_count";
-
String value = "2";
-
HBaseUtils.getInstance().put(tableName, rowKey, cf, column, value);
-
//scan 'imooc_course_clickcount' 查看表的数据
-
}
-
}
-
package com.dzx.scala.dzxbootscala.spark
-
-
import com.dzx.scala.dzxbootscala.spark.dao.CourseClickCountDao
-
import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount}
-
import org.apache.spark.SparkConf
-
import org.apache.spark.streaming.kafka.KafkaUtils
-
import org.apache.spark.streaming.{Seconds, StreamingContext}
-
-
import scala.collection.mutable.ListBuffer
-
-
object ImoocStatStreamingApp {
-
-
def main(args: Array[String]): Unit = {
-
//args : hadoop000:2181 test streamingtopic 1
-
def main(args: Array[String]): Unit = {
-
if(args.length!=4){
-
System.err.print("必须为四个参数")
-
System.exit(1)
-
}
-
val Array(zkQuorum,group,topics,numThreads) =args
-
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
-
val ssc = new StreamingContext(sparkConf, Seconds(60))
-
//todo spark streaming 对接kafka
-
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
-
val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
-
// todo 测试为什么要取第二个
-
// messages.map(_._2).count().print()
-
-
-
//messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
-
val logs = messages.map(_._2)
-
-
val cleanData = logs.map(line =>{
-
//infos(2) = "GET /class/130.html HTTP/1.1"
-
val infos= line.split("\t")
-
val url =infos(2).split(" ")(1)
-
var courseId = 0
-
if(url.startsWith("/class")){
-
val courseIdHtml = url.split("/")(2)
-
//获取到 课程编号
-
courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
-
}
-
-
ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
-
}).filter(clicklog=>clicklog.courseId!=0)
-
-
// cleanData.print()
-
// /**
-
// * 数据清洗结果类似如下:
-
// * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
-
// * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
-
// * 到数据清洗完为止,日志中只包含了实战课程的日志
-
// * /
-
-
//功能1:今天到现在为止,实战课程 的访问量
-
//yyyyMMdd courseId
-
-
//使用 数据库来进行 存储我们的统计结果
-
// spark streaming 把统计结果写入到数据库里面
-
// 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
-
// 选择什么数据库作为统计结果的存储呢
-
// RDBMS:MySQL,oracle ...
-
// day course_id click_count
-
// 20171111 1 10
-
// 20171111 2 10
-
// 下一个批次数据进来之后
-
// click_count + 下一个批次统计结果
-
-
// NoSql : HBase、Redis
-
// Hbase: 一个api就能搞定,非常方便
-
// 这就是 我们为什么要选择Hbase的一个原因所在
-
-
// 前提:启动 hdfs ,zk,hbase
-
-
//hbase表设计
-
// create 'imooc_course_clickcount', 'info'
-
// 查看表结构
-
//desc 'imooc_course_clickcount'
-
// RowKey设计
-
// day_courseid
-
-
//如何 使用 scala 操作 hbase
-
cleanData.map(x=>{
-
(x.time.substring(0,8)+"_"+x.courseId,1)
-
}).reduceByKey(_+_).foreachRDD(rdd=>{
-
rdd.foreachPartition(partitionRecords =>{
-
val list = new ListBuffer[CourseClickCount]
-
-
partitionRecords.foreach(pair=>{
-
list.append(CourseClickCount(pair._1,pair._2))
-
})
-
//将数据存入到hbase
-
CourseClickCountDao.save(list)
-
})
-
})
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
}
-
}
使用步骤三中的flume启动命令和 kafka消费者的启动 命令,使用idea 将 需要传入的四个 参数 ,传入main方法之后执行ImoocStatStreamingApp ,执行 hbase shell ,等待一分钟,执行 scan 'imooc_course_clickcount' ,查看当前表中是否有数据进来,再次等待一分钟,发现每个 rowkey对应的课程点击数量自动进行了累加更新。
功能2:统计今天到现在为止从搜索引擎引流过来的实战课程的访问量
Hbase 表设计:创建表
create 'imooc_course_search_clickcount','info'
设计rowkey:根据我们的业务需求进行设计
-
/**
-
* 从搜索引擎过来的实战课程点击数实体类
-
* @author DuanZhaoXu
-
* @ClassName:
-
* @Description:
-
* @date 2019年01月23日 17:06:16
-
*/
-
case class CourseSearchClickCount(day_search_course:String,click_count:Long) {
-
-
}
-
/**
-
* @author DuanZhaoXu
-
* @ClassName:
-
* @Description:
-
* @date 2019年01月22日 15:57:02
-
*/
-
object CourseSearchClickCountDao {
-
-
val tableName = "imooc_course_search_clickcount"
-
val cf = "info"
-
-
val qualifer = "click_count"
-
-
/**
-
* 保存数据到hbase中
-
* @param list
-
*/
-
def save(list:ListBuffer[CourseSearchClickCount]): Unit ={
-
val table = HBaseUtils.getInstance().getTable(tableName)
-
for(ele <- list){
-
//保存的基础上自动累加值大小 比如第一次访问量为8 ,第二次访问量为10,再次保存时访问量就
-
//自动累加为18 了。
-
table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),Bytes.toBytes(cf),Bytes.toBytes(qualifer),ele.click_count)
-
}
-
-
}
-
-
/**
-
* 根据rowkey查询值
-
* @param day_course
-
* @return
-
*/
-
def count(day_search_course:String): Long ={
-
val table = HBaseUtils.getInstance().getTable(tableName)
-
val get = new Get(Bytes.toBytes(day_search_course))
-
-
val value = table.get(get).getValue(cf.getBytes(),qualifer.getBytes())
-
if(value == null ){
-
0L
-
}else{
-
Bytes.toLong(value)
-
}
-
}
-
-
def main(args: Array[String]): Unit = {
-
val list =new ListBuffer[CourseSearchClickCount]
-
list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
-
list.append(CourseSearchClickCount("20171111_www.sougou.com_9",9))
-
list.append(CourseSearchClickCount("20171111_www.google.com_1",100))
-
-
save(list)
-
println(count("20171111_www.baidu.com_8")+","+count("20171111_www.sougou.com_9")+","+count("20171111_www.google.com_1"))
-
}
-
-
}
-
package com.dzx.scala.dzxbootscala.spark
-
-
import com.dzx.scala.dzxbootscala.spark.dao.{CourseClickCountDao, CourseSearchClickCountDao}
-
import com.dzx.scala.dzxbootscala.spark.entity.{ClickLog, CourseClickCount, CourseSearchClickCount}
-
import org.apache.spark.SparkConf
-
import org.apache.spark.streaming.kafka.KafkaUtils
-
import org.apache.spark.streaming.{Seconds, StreamingContext}
-
-
import scala.collection.mutable.ListBuffer
-
-
object ImoocStatStreamingApp {
-
-
def main(args: Array[String]): Unit = {
-
//args : hadoop000:2181 test streamingtopic 1
-
def main(args: Array[String]): Unit = {
-
if(args.length!=4){
-
System.err.print("必须为四个参数")
-
System.exit(1)
-
}
-
val Array(zkQuorum,group,topics,numThreads) =args
-
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[5]")
-
val ssc = new StreamingContext(sparkConf, Seconds(60))
-
//todo spark streaming 对接kafka
-
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
-
val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
-
// todo 测试为什么要取第二个
-
// messages.map(_._2).count().print()
-
-
-
//messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
-
val logs = messages.map(_._2)
-
-
val cleanData = logs.map(line =>{
-
//infos(2) = "GET /class/130.html HTTP/1.1"
-
val infos= line.split("\t")
-
val url =infos(2).split(" ")(1)
-
var courseId = 0
-
if(url.startsWith("/class")){
-
val courseIdHtml = url.split("/")(2)
-
//获取到 课程编号
-
courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
-
}
-
-
ClickLog(infos(0),DateUtil.parseToMinute(infos(1)),courseId,infos(3)toInt,infos(4))
-
}).filter(clicklog=>clicklog.courseId!=0)
-
-
cleanData.print()
-
// /**
-
// * 数据清洗结果类似如下:
-
// * ClickLog(46.30.10.167,20171022151701,128,200,http://www.baidu.com?q=imooc)
-
// * ClickLog(192.168.19.200,20171022151701,138,200,http://www.baidu.com?q=imooc)
-
// * 到数据清洗完为止,日志中只包含了实战课程的日志
-
// * /
-
-
//功能1:今天到现在为止,实战课程 的访问量
-
//yyyyMMdd courseId
-
-
//使用 数据库来进行 存储我们的统计结果
-
// spark streaming 把统计结果写入到数据库里面
-
// 可视化前端根据:yyyyMMdd coureseId 把数据库里面的统计结果展示出来
-
// 选择什么数据库作为统计结果的存储呢
-
// RDBMS:MySQL,oracle ...
-
// day course_id click_count
-
// 20171111 1 10
-
// 20171111 2 10
-
// 下一个批次数据进来之后
-
// click_count + 下一个批次统计结果
-
-
// NoSql : HBase、Redis
-
// Hbase: 一个api就能搞定,非常方便
-
// 这就是 我们为什么要选择Hbase的一个原因所在
-
-
// 前提:启动 hdfs ,zk,hbase
-
-
//hbase表设计
-
// create 'imooc_course_clickcount', 'info'
-
// 查看表结构
-
//desc 'imooc_course_clickcount'
-
// RowKey设计
-
// day_courseid
-
-
//如何 使用 scala 操作 hbase
-
//功能1:今天到现在为止,实战课程 的访问量
-
cleanData.map(x=>{
-
(x.time.substring(0,8)+"_"+x.courseId,1)
-
}).reduceByKey(_+_).foreachRDD(rdd=>{
-
rdd.foreachPartition(partitionRecords =>{
-
val list = new ListBuffer[CourseClickCount]
-
-
partitionRecords.foreach(pair=>{
-
list.append(CourseClickCount(pair._1,pair._2))
-
})
-
CourseClickCountDao.save(list)
-
})
-
})
-
-
-
cleanData.map(x=>{
-
/**
-
* https://www.sougou.com/web?query=Spark sql实战
-
*/
-
val referer = x.referer.replace("//","/")
-
val splits = referer.split("/")
-
-
var host = ""
-
if(splits.length>2){
-
host=splits(1)
-
}
-
(host,x.courseId,x.time)
-
}).filter(_._1 !="").map(x=>{
-
(x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
-
}).reduceByKey(_+_).foreachRDD(rdd=>{
-
rdd.foreachPartition(partitionRecords =>{
-
val list = new ListBuffer[CourseSearchClickCount]
-
-
partitionRecords.foreach(pair=>{
-
list.append(CourseSearchClickCount(pair._1,pair._2))
-
})
-
CourseSearchClickCountDao.save(list)
-
})
-
})
-
-
ssc.start()
-
ssc.awaitTermination()
-
}
-
}
-
}
执行 hbase shell 命令, truncate ‘imooc_course_search_clickcount’ 先清空 imooc_course_search_clickcount表中的数据
,执行 date 命令,查看时间是否过去了一分钟,查看kafka 消费者控制台是否 接收到 了数据,之后执行 scan ‘imooc_course_search_clickcount’ 命令 查看 hbase 中是否 插入 这一批次的数据。
五、将项目打包运行到服务器环境中
将 ImoocStatStreamingApp.java文件中的
.setMaster("local[5]") 注释掉,因为这个参数一般再提交spark 任务的时候去指定
mvn clean package -DskipTests 打包项目中可能会报错误,原因是因为 scala文件中import 了java 中的类。
解决方法 :将 pom.xml中的 <build>标签中的
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
将打包后的sparktrain-1.0.jar上传到linux服务器,
启动spark 任务
-
./spark-submit --master local[5] \
-
--jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
-
--class com.imooc.spark.project.spark.ImoocStatStreamingApp \
-
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
-
/home/hadoop/lib/sparktrain-1.0.jar \
-
hadoop000:2181 test streamingtopic 1
等待一分钟之后,spark streaming 就会接收到 kafka 传来的数据,scan 'imooc_course_search_clickcount'
可看到搜索引擎引流过来的课程 点击 次数 有了变化(次数增加)。
六、spring boot 集成 hbase ,Echarts 可视化项目实战
项目链接地址:https://gitee.com/dzxmy/imooc_web_visualization
推荐一个大数据可视化神器阿里云DataV https://data.aliyun.com/visual/datav?spm=5176.8142029.dataTechnology.6.3dbd6d3evCLMCZ
七、使用java 开发 spark 应用程序
1、使用java 读取 文件中的单词,完成词频统计
-
package com.dzx.scala.dzxbootscala.java_spark;
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.sql.SparkSession;
-
import org.apache.spark.streaming.Duration;
-
import org.apache.spark.streaming.Durations;
-
import org.apache.spark.streaming.api.java.JavaPairDStream;
-
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
import scala.Tuple2;
-
-
import java.util.Arrays;
-
import java.util.List;
-
-
/**
-
* @author DuanZhaoXu
-
* @ClassName:
-
* @Description:
-
* @date 2019年01月25日 11:32:06
-
*/
-
public class WordCountApp {
-
-
-
public static void main(String[] args) throws InterruptedException {
-
SparkSession spark = SparkSession.builder().appName("WordCountApp")
-
.master("local[2]").getOrCreate();
-
/**
-
* hello.txt
-
* hello world
-
* world
-
* welcome hello
-
*/
-
JavaRDD<String> lines = spark.read().textFile("/opt/data/hello.txt").javaRDD();
-
-
JavaRDD<String> words = lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator());
-
-
JavaPairRDD<String,Integer> counts = words.mapToPair(word->new Tuple2<String,Integer>(word,1))
-
.reduceByKey((x,y)->x+y);
-
-
List<Tuple2<String, Integer>> output = counts.collect();
-
-
for(Tuple2<String,Integer> tupln :output){
-
System.out.println(tupln._1()+":"+tupln._2());
-
}
-
spark.stop();
-
}
-
}
2、使用java 监听 socket 网络 数据 ,完成词频统计
-
package com.dzx.scala.dzxbootscala.java_spark;
-
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.sql.SparkSession;
-
import org.apache.spark.streaming.Duration;
-
import org.apache.spark.streaming.Durations;
-
import org.apache.spark.streaming.api.java.JavaPairDStream;
-
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
import scala.Tuple2;
-
-
import java.util.Arrays;
-
import java.util.List;
-
-
/**
-
* @author DuanZhaoXu
-
* @ClassName:
-
* @Description:
-
* @date 2019年01月25日 11:32:06
-
*/
-
public class WordCountApp {
-
-
-
public static void main(String[] args) throws InterruptedException {
-
-
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp");
-
-
JavaStreamingContext jssc =new JavaStreamingContext(conf, Durations.seconds(5));
-
// nc -lk 9999
-
//创建一个Dstream
-
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
-
-
JavaPairDStream<String,Integer> counts = lines.flatMap(line-> Arrays.asList(line.split("\t")).iterator()).mapToPair(word->new Tuple2<String,Integer>(word,1))
-
.reduceByKey((x,y)->x+y);
-
-
counts.print();
-
-
jssc.start();
-
jssc.awaitTermination();
-
}
-
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/86593028
- 点赞
- 收藏
- 关注作者
评论(0)