Spark Streaming 教程 – 使用 Apache Spark 进行情感分析
Spark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。Spark Streaming 可用于流式传输实时数据,并且可以实时进行处理。Spark Streaming 不断增长的用户群由 Uber、Netflix 和 Pinterest 等家喻户晓的名字组成。
在实时数据分析方面,Spark Streaming 提供了一个单一平台来摄取数据以进行快速实时处理,Spark 认证同样证明了您的技能。通过这个博客,我将向您介绍 Spark Streaming 这个令人兴奋的新领域,我们将通过一个完整的用例, 使用 Spark Streaming 进行Twitter 情绪分析。
以下是本博客将涵盖的主题:
- 什么是流媒体?
- 为什么是 Spark Streaming?
- Spark Streaming 概述
- Spark 流功能
- Spark Streaming Fundamentals
5.1 Streaming Context
5.2 DStream
5.3 缓存/持久化
5.4 累加器、广播变量和检查点 - 用例 – Twitter 情绪分析
什么是流媒体?
数据流是一种传输数据的技术,以便它可以作为稳定和连续的流进行处理。随着互联网的发展,流媒体技术变得越来越重要。
图:什么是流媒体?
为什么是 Spark Streaming?
我们可以使用 Spark Streaming 从 Twitter、股票市场和地理系统等各种来源流式传输实时数据,并执行强大的分析来帮助企业。
图:为什么是 Spark Streaming?
Spark Streaming 概述
Spark Streaming用于处理实时流数据。它是对核心 Spark API 的有用补充。Spark Streaming 支持实时数据流的高吞吐量和容错流处理。
图: Spark Streaming 中的流
Spark 流功能
- 扩展: Spark Streaming 可以轻松扩展到数百个节点。
- 速度:它实现了低延迟。
- 容错: Spark 能够有效地从故障中恢复。
- 集成: Spark 集成了批处理和实时处理。
- 业务分析: Spark Streaming用于跟踪客户的行为,可用于业务分析。
Spark Streaming 工作流
Spark Streaming 工作流有四个高级阶段。第一个是从各种来源流式传输数据。这些源可以是用于实时流式传输的流数据源,例如 Akka、Kafka、Flume、AWS 或 Parquet。第二种来源包括 HBase、MySQL、PostgreSQL、Elastic Search、Mongo DB 和用于静态/批处理流的 Cassandra。一旦发生这种情况,Spark 可用于通过其 MLlib API 对数据执行机器学习。此外,Spark SQL 用于对这些数据执行进一步的操作。最后,流输出可以存储到各种数据存储系统中,如 HBase、Cassandra、MemSQL、Kafka、Elastic Search、HDFS 和本地文件系统。
图: Spark Streaming 概述
Spark Streaming 基础
流上下文
Streaming Context使用 Spark 中的数据流。它注册一个Input DStream以产生一个Receiver对象。它是 Spark 功能的主要入口点。Spark 提供了许多可从上下文访问的源的默认实现,例如 Twitter、Akka Actor 和 ZeroMQ。
StreamingContext 对象可以从 SparkContext 对象创建。SparkContext 表示与 Spark 集群的连接,可用于在该集群上创建 RDD、累加器和广播变量。
import org.apache.spark._
import org.apache.spark.streaming._
var ssc = new StreamingContext(sc,Seconds(1))
数据流
Discretized Stream (DStream) 是 Spark Streaming 提供的基本抽象。它是一个连续的数据流。它是从数据源或通过转换输入流生成的已处理数据流接收的。
图: 从输入 DStream 中提取单词
在内部,一个 DStream 由一系列连续的 RDD 表示,每个 RDD 包含来自某个间隔的数据。
输入 DStreams: 输入 DStreams是表示从流源接收的输入数据流的 DStreams。
图: Receiver 将数据发送到 Input DStream,其中每个 Batch 包含 RDD
每个输入 DStream 都与一个 Receiver 对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以供处理。
DStreams 上的转换:
任何应用在 DStream 上的操作都会转化为对底层 RDD 的操作。转换允许修改来自输入 DStream 的数据,类似于 RDD。DStreams 支持许多普通 Spark RDD 上可用的转换。
图:DStream 转换
以下是 DStreams 上的一些流行转换:
map(func) | map( func ) 通过将源 DStream 的每个元素传递给函数func 来返回一个新的 DStream 。 |
flatMap(func) | flatMap( func ) 与 map( func )类似,但每个输入项都可以映射到 0 个或多个输出项,并通过将每个源元素传递给函数func 来返回一个新的 DStream 。 |
filter(func) | filter( func ) 通过仅选择func返回 true的源 DStream 的记录来返回一个新的 DStream 。 |
reduce(func) | reduce( func ) 通过使用函数func聚合源 DStream 的每个 RDD 中的元素,返回单元素 RDD 的新 DStream 。 |
groupBy(func) | groupBy( func ) 返回新的 RDD,它基本上由一个键和该组的相应项目列表组成。 |
输出 DStreams:
输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。输出操作触发所有 DStream 转换的实际执行。
图:DStreams 上的输出操作
缓存
DStreams允许开发人员在内存中缓存/持久化流的数据。如果 DStream 中的数据将被多次计算,这很有用。这可以使用 DStream 上的persist()方法来完成。
图: 缓存到 2 个节点
对于通过网络接收数据的输入流(如Kafka、Flume、Sockets等), 默认持久性级别设置为将数据复制到两个节点以实现容错。
累加器、广播变量和检查点
累加器: 累加器是仅通过关联和交换操作添加的变量。它们用于实现计数器或总和。在 UI 中跟踪累加器对于了解运行阶段的进度很有用。Spark 本身支持数字累加器。我们可以创建命名或未命名的累加器。
广播变量: 广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起传送它的副本。它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用有效的广播算法来分发广播变量以降低通信成本。
检查点: 检查点类似于游戏中的检查点。它们使其 24/7 全天候运行,并使其能够适应与应用程序逻辑无关的故障。
图: Checkpoints的特点
用例 – Twitter 情绪分析
现在我们已经了解了 Spark Streaming 的核心概念,让我们使用 Spark Streaming 解决一个现实生活中的问题。
问题陈述: 设计一个 Twitter 情绪分析系统,在其中我们为危机管理、服务调整和目标营销填充实时情绪。
情感分析的应用:
- 预测一部电影的成功
- 预测政治竞选成功
- 决定是否投资某家公司
- 定向广告
- 查看产品和服务
Spark Streaming 实现:
找到下面的伪代码:
//Import the necessary packages into the Spark Program
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
...
import java.io.File
object twitterSentiment {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + "<access token> <access token secret> [<filters>]")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
//Passing our Twitter keys and tokens as arguments for authorization
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that Twitter4j library used by twitter stream
// Use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
...
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val sparkConf = new SparkConf().setAppName("twitterSentiment").setMaster("local[2]")
val ssc = new Streaming Context
val stream = TwitterUtils.createStream(ssc, None, filters)
//Input DStream transformation using flatMap
val tags = stream.flatMap { status => Get Text From The Hashtags }
//RDD transformation using sortBy and then map function
tags.countByValue()
.foreachRDD { rdd =>
val now = Get current time of each Tweet
rdd
.sortBy(_._2)
.map(x => (x, now))
//Saving our output at ~/twitter/ directory
.saveAsTextFile(s"~/twitter/$now")
}
//DStream transformation using filter and map functions
val tweets = stream.filter {t =>
val tags = t. Split On Spaces .filter(_.startsWith("#")). Convert To Lower Case
tags.exists { x => true }
}
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tagss = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tagss.toString())
}
data.print()
//Saving our output at ~/ with filenames starting like twitters
data.saveAsTextFiles("~/twitters","20000")
ssc.start()
ssc.awaitTermination()
}
}
结果:
以下是运行 Twitter Sentiment Streaming 程序时在 Eclipse IDE 中显示的结果。
图: Eclipse IDE 中的情绪分析输出
正如我们在屏幕截图中看到的,所有推文根据推文内容的情绪分为正面、中立和负面。
推文情绪的输出根据它们的创建时间存储到文件夹和文件中。此输出可以根据需要存储在本地文件系统或 HDFS 上。输出目录如下所示:
图: “twitter”项目文件夹中的输出文件夹
在这里,在 twitter 目录中,我们可以找到 Twitter 用户的用户名以及每条推文的时间戳,如下所示:
图: 包含带有时间戳的 Twitter 用户名的输出文件
现在我们已经获得了 Twitter 用户名和时间戳,让我们看看存储在主目录中的情绪和推文。在这里,每条推文都跟随着情感情绪。存储的这种情绪进一步用于分析公司的大量见解。
图:包含带有情绪的推文的输出文件
调整代码:
现在,让我们稍微修改我们的代码以获得特定主题标签(主题)的情绪。目前,美国总统唐纳德特朗普在新闻频道和在线社交媒体上流行。让我们看看与关键字“特朗普”相关的情绪。
图: 对带有“特朗普”关键字的推文进行情绪分析
前进:
正如我们从情绪分析演示中看到的那样,我们可以像对“特朗普”所做的那样提取特定主题的情绪。同样,情绪分析可以被世界各地的公司用于危机管理、服务调整和目标营销。
使用 Spark Streaming 进行情感分析的公司已应用相同的方法来实现以下目标:
- 提升客户体验
- 获得竞争优势
- 获得商业智能
- 振兴一个失败的品牌
至此,Spark Streaming 教程博客到此结束。到目前为止,您一定已经对什么是 Spark Streaming 有了充分的了解。Twitter 情绪分析用例将使您有必要的信心来处理您在 Spark Streaming 和 Apache Spark 中遇到的任何未来项目。实践是掌握任何主题的关键,我希望这篇博客能引起您对 Apache Spark 进一步探索的兴趣。
我们建议从 Edureka 的以下 Spark Streaming YouTube 教程开始:
- 点赞
- 收藏
- 关注作者
评论(0)