Spark Streaming 教程 – 使用 Apache Spark 进行情感分析

Donglian Lin 发表于 2021/12/13 21:16:02 2021/12/13
【摘要】 正如我们从情绪分析演示中看到的那样,我们可以像对“特朗普”所做的那样提取特定主题的情绪。同样,情绪分析可以被世界各地的公司用于危机管理、服务调整和目标营销。  使用 Spark Streaming 进行情感分析的公司已应用相同的方法来实现以下目标: 提升客户体验 获得竞争优势 获得商业智能 振兴一个失败的品牌

Spark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。Spark Streaming 可用于流式传输实时数据,并且可以实时进行处理。Spark Streaming 不断增长的用户群由 Uber、Netflix 和 Pinterest 等家喻户晓的名字组成。

在实时数据分析方面,Spark Streaming 提供了一个单一平台来摄取数据以进行快速实时处理,Spark 认证同样证明了您的技能。通过这个博客,我将向您介绍 Spark Streaming 这个令人兴奋的新领域,我们将通过一个完整的用例, 使用 Spark Streaming 进行Twitter 情绪分析

以下是本博客将涵盖的主题:

  1. 什么是流媒体?
  2. 为什么是 Spark Streaming?
  3. Spark Streaming 概述
  4. Spark 流功能
  5. Spark Streaming Fundamentals
    5.1  Streaming Context
    5.2  DStream
    5.3 缓存/持久化
    5.4 累加器、广播变量和检查点
  6. 用例 – Twitter 情绪分析

什么是流媒体?

数据流是一种传输数据的技术,以便它可以作为稳定和连续的流进行处理。随着互联网的发展,流媒体技术变得越来越重要。

什么是流 - Spark Streaming - Edureka

图:什么是流媒体?

为什么是 Spark Streaming?

我们可以使用 Spark Streaming 从 Twitter、股票市场和地理系统等各种来源流式传输实时数据,并执行强大的分析来帮助企业。 

Spark Streaming - Spark Streaming - Edureka

图:为什么是 Spark Streaming?

Spark Streaming 概述

Spark Streaming用于处理实时流数据。它是对核心 Spark API 的有用补充。Spark Streaming 支持实时数据流的高吞吐量和容错流处理。

Spark Streaming 概述 - Spark Streaming - Edureka

图: Spark Streaming 中的流 

Spark 流功能

  1. 扩展: Spark Streaming 可以轻松扩展到数百个节点。
  2. 速度:它实现了低延迟。
  3. 容错: Spark 能够有效地从故障中恢复。
  4. 集成:  Spark 集成了批处理和实时处理。
  5. 业务分析: Spark Streaming用于跟踪客户的行为,可用于业务分析。

Spark Streaming 工作流

Spark Streaming 工作流有四个高级阶段。第一个是从各种来源流式传输数据。这些源可以是用于实时流式传输的流数据源,例如 Akka、Kafka、Flume、AWS 或 Parquet。第二种来源包括 HBase、MySQL、PostgreSQL、Elastic Search、Mongo DB 和用于静态/批处理流的 Cassandra。一旦发生这种情况,Spark 可用于通过其 MLlib API 对数据执行机器学习。此外,Spark SQL 用于对这些数据执行进一步的操作。最后,流输出可以存储到各种数据存储系统中,如 HBase、Cassandra、MemSQL、Kafka、Elastic Search、HDFS 和本地文件系统。

概述 - Spark Streaming - Edureka

图: Spark Streaming 概述

Spark Streaming 基础

  1. 流上下文
  2. 数据流
  3. 缓存
  4. 累加器、广播变量和检查点

流上下文

Streaming Context使用 Spark 中的数据流。它注册一个Input DStream以产生一个Receiver对象。它是 Spark 功能的主要入口点。Spark 提供了许多可从上下文访问的源的默认实现,例如 Twitter、Akka Actor 和 ZeroMQ。

流上下文 - Spark Streaming - Edureka

StreamingContext 对象可以从 SparkContext 对象创建。SparkContext 表示与 Spark 集群的连接,可用于在该集群上创建 RDD、累加器和广播变量。

import org.apache.spark._
import org.apache.spark.streaming._
var ssc = new StreamingContext(sc,Seconds(1))

数据流

Discretized Stream (DStream) 是 Spark Streaming 提供的基本抽象。它是一个连续的数据流。它是从数据源或通过转换输入流生成的已处理数据流接收的。

DStream 操作 - Spark Streaming - Edureka

图: 从输入 DStream 中提取单词

在内部,一个 DStream 由一系列连续的 RDD 表示,每个 RDD 包含来自某个间隔的数据。

输入 DStreams: 输入 DStreams是表示从流源接收的输入数据流的 DStreams。 

输入 DStream - Spark Streaming - Edureka

图: Receiver 将数据发送到 Input DStream,其中每个 Batch 包含 RDD

每个输入 DStream 都与一个 Receiver 对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以供处理。

DStreams 上的转换:

任何应用在 DStream 上的操作都会转化为对底层 RDD 的操作。转换允许修改来自输入 DStream 的数据,类似于 RDD。DStreams 支持许多普通 Spark RDD 上可用的转换。 

DStream 转换 - Spark Streaming - Edureka

图:DStream 转换

 以下是 DStreams 上的一些流行转换:

map(func) map( func ) 通过将源 DStream 的每个元素传递给函数func 来返回一个新的 DStream  
flatMap(func) flatMap( func ) 与 map( func )类似,但每个输入项都可以映射到 0 个或多个输出项,并通过将每个源元素传递给函数func 来返回一个新的 DStream  
filter(func) filter( func ) 通过仅选择func返回 true的源 DStream 的记录来返回一个新的 DStream 。
reduce(func) reduce( func ) 通过使用函数func聚合源 DStream 的每个 RDD 中的元素,返回单元素 RDD 的新 DStream 。
groupBy(func) groupBy( func ) 返回新的 RDD,它基本上由一个键和该组的相应项目列表组成。

输出 DStreams: 

输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。输出操作触发所有 DStream 转换的实际执行。

输出操作 - Spark Streaming - Edureka

图:DStreams 上的输出操作

缓存

DStreams允许开发人员在内存中缓存/持久化流的数据。如果 DStream 中的数据将被多次计算,这很有用。这可以使用 DStream 上的persist()方法来完成。

缓存 - Spark Streaming - Edureka

图: 缓存到 2 个节点

对于通过网络接收数据的输入流(如Kafka、Flume、Sockets等), 默认持久性级别设置为将数据复制到两个节点以实现容错。

累加器、广播变量和检查点

累加器: 累加器是仅通过关联和交换操作添加的变量。它们用于实现计数器或总和。在 UI 中跟踪累加器对于了解运行阶段的进度很有用。Spark 本身支持数字累加器。我们可以创建命名或未命名的累加器。

广播变量: 广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起传送它的副本。它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用有效的广播算法来分发广播变量以降低通信成本。

检查点: 检查点类似于游戏中的检查点。它们使其 24/7 全天候运行,并使其能够适应与应用程序逻辑无关的故障。

检查点 - Spark Streaming - Edureka

图: Checkpoints的特点

用例 – Twitter 情绪分析

现在我们已经了解了 Spark Streaming 的核心概念,让我们使用 Spark Streaming 解决一个现实生活中的问题。

问题陈述: 设计一个 Twitter 情绪分析系统,在其中我们为危机管理、服务调整和目标营销填充实时情绪。

情感分析的应用:

  • 预测一部电影的成功
  • 预测政治竞选成功
  • 决定是否投资某家公司
  • 定向广告
  • 查看产品和服务

Spark Streaming 实现:

找到下面的伪代码:

//Import the necessary packages into the Spark Program
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
...
import java.io.File
 
object twitterSentiment {
 
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + "<access token> <access token secret> [<filters>]")
System.exit(1)
}
 
StreamingExamples.setStreamingLogLevels()
//Passing our Twitter keys and tokens as arguments for authorization
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
 
// Set the system properties so that Twitter4j library used by twitter stream
// Use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
...
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
 
val sparkConf = new SparkConf().setAppName("twitterSentiment").setMaster("local[2]")
val ssc = new Streaming Context
val stream = TwitterUtils.createStream(ssc, None, filters)
 
//Input DStream transformation using flatMap
val tags = stream.flatMap { status => Get Text From The Hashtags }
 
//RDD transformation using sortBy and then map function
tags.countByValue()
.foreachRDD { rdd =>
val now = Get current time of each Tweet
rdd
.sortBy(_._2)
.map(x => (x, now))
//Saving our output at ~/twitter/ directory
.saveAsTextFile(s"~/twitter/$now")
}
 
//DStream transformation using filter and map functions
val tweets = stream.filter {t =>
val tags = t. Split On Spaces .filter(_.startsWith("#")). Convert To Lower Case
tags.exists { x => true }
}
 
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tagss = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tagss.toString())
}
 
data.print()
//Saving our output at ~/ with filenames starting like twitters
data.saveAsTextFiles("~/twitters","20000")
 
ssc.start()
ssc.awaitTermination()
 }
}

结果:

以下是运行 Twitter Sentiment Streaming 程序时在 Eclipse IDE 中显示的结果。

Eclipse 输出 - Spark Streaming - Edureka

图: Eclipse IDE 中的情绪分析输出

正如我们在屏幕截图中看到的,所有推文根据推文内容的情绪分为正面、中立和负面。

推文情绪的输出根据它们的创建时间存储到文件夹和文件中。此输出可以根据需要存储在本地文件系统或 HDFS 上。输出目录如下所示:

输出目录 - Spark Streaming - Edureka

图: “twitter”项目文件夹中的输出文件夹

在这里,在 twitter 目录中,我们可以找到 Twitter 用户的用户名以及每条推文的时间戳,如下所示:

输出用户名 - Spark Streaming - Edureka

图: 包含带有时间戳的 Twitter 用户名的输出文件

现在我们已经获得了 Twitter 用户名和时间戳,让我们看看存储在主目录中的情绪和推文。在这里,每条推文都跟随着情感情绪。存储的这种情绪进一步用于分析公司的大量见解。

输出推文和情绪 - Spark Streaming - Edureka

图:包含带有情绪的推文的输出文件

调整代码:

现在,让我们稍微修改我们的代码以获得特定主题标签(主题)的情绪。目前,美国总统唐纳德特朗普在新闻频道和在线社交媒体上流行。让我们看看与关键字“特朗普”相关的情绪。 

唐纳德特朗普的情绪 - Spark Streaming - Edureka

图: 对带有“特朗普”关键字的推文进行情绪分析

前进:

正如我们从情绪分析演示中看到的那样,我们可以像对“特朗普”所做的那样提取特定主题的情绪。同样,情绪分析可以被世界各地的公司用于危机管理、服务调整和目标营销。 

使用 Spark Streaming 进行情感分析的公司已应用相同的方法来实现以下目标:

  1. 提升客户体验
  2. 获得竞争优势
  3. 获得商业智能
  4. 振兴一个失败的品牌

至此,Spark Streaming 教程博客到此结束。到目前为止,您一定已经对什么是 Spark Streaming 有了充分的了解。Twitter 情绪分析用例将使您有必要的信心来处理您在 Spark Streaming 和 Apache Spark 中遇到的任何未来项目。实践是掌握任何主题的关键,我希望这篇博客能引起您对 Apache Spark 进一步探索的兴趣。

我们建议从 Edureka 的以下 Spark Streaming YouTube 教程开始:

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。