Spark 与 Python 简介 – PySpark 初学者

举报
Donglian Lin 发表于 2021/12/06 16:53:07 2021/12/06
【摘要】 我希望你喜欢这个 Spark with Python 博客。如果您正在阅读本文,恭喜您!您不再是 PySpark 的新手。现在就在您的系统上试试这个简单的例子。

在处理和使用大数据方面, Apache Spark是使用最广泛的框架之一,而Python是用于数据分析、机器学习等的最广泛使用的编程语言之一。那么,为什么不一起使用它们呢?这就是Spark with Python也称为PySpark出现的地方

Apache Spark 简介

Apache Spark 是由 Apache 软件基金会开发的用于实时处理的开源集群计算框架。Spark 提供了一个接口,用于对具有隐式数据并行性容错性的整个集群进行编程

以下是 Apache Spark 的一些特性,这些特性使其比其他框架更具优势:

Spark 特性 - Spark with Python - Edureka

  • 速度:比传统的大规模数据处理框架快 100 倍。
  • 强大的缓存:简单的编程层提供了强大的缓存和磁盘持久化能力。
  • 部署: 可以通过 Mesos、Hadoop 通过 Yarn 或 Spark 自己的集群管理器进行部署。
  • 实时: 由于内存计算,实时计算和低延迟。
  • Polyglot:这是该框架最重要的特性之一,因为它可以用 Scala、Java、Python 和 R 进行编程。

为什么选择 Python?

虽然 Spark 是用 Scala 设计的,这使它比 Python 快了近 10 倍,但 Scala 只有在使用的内核数量较少时才能更快。由于现在大部分的分析和处理都需要大量的内核,所以Scala的性能优势并没有那么大。

对于程序员来说,Python相对容易 学习,因为它的语法和标准库。此外,它是一种动态类型语言,这意味着 RDD 可以保存多种类型的对象。

尽管 Scala 有SparkMLlib,但它没有足够的库和工具用于机器学习和 NLP目的。此外,Scala 缺乏数据可视化。

使用 Python (PySpark) 设置 Spark

我希望你们知道如何下载火花并安装它所以,一旦你解压了 spark 文件,安装了它并将它的路径添加到.bashrc文件中,你需要输入source .bashrc

export SPARK_HOME = /usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7/bin

要打开 pyspark shell,您需要输入命令  ./bin/pyspark

Pyspark shell - Spark with python - Edureka

Spark in Industry

Apache Spark 由于其惊人的功能,如内存处理多语言快速处理,正被全球许多公司用于各种行业的各种目的:

使用 Spark 的公司 - Spark with Python - Edureka

雅虎使用 Apache Spark 的机器学习功能来个性化其新闻、网页以及目标广告。他们使用 Spark 和 python 来找出什么样的新闻——用户有兴趣阅读和分类新闻故事,以找出什么样的用户有兴趣阅读每一类新闻。

TripAdvisor使用 apache spark 通过比较数百个网站来为客户找到最优惠的酒店价格,从而为数百万旅客提供建议。以可读格式阅读和处理酒店评论所需的时间是在 Apache Spark 的帮助下完成的。

作为全球最大的电子商务平台之一,阿里巴巴 运行着一些世界上最大的 Apache Spark 作业,以分析其电子商务平台上数百 PB 的数据。

PySpark SparkContext 和数据流

用 Python 谈论 Spark,使用 RDD 是由库 Py4j 实现的。PySpark Shell 将 Python API 链接到 Spark 核心并初始化 Spark 上下文。Spark Context是任何Spark 应用程序的核心。

  1. Spark 上下文设置内部服务并建立与 Spark 执行环境的连接。
  2. 驱动程序中的 sparkcontext 对象协调所有分布式进程并允许资源分配。
  3. Cluster Managers 提供Executors,它是带有逻辑的JVM 进程。
  4. SparkContext 对象将应用程序发送给执行程序。
  5. SparkContext 在每个执行器中执行任务。

Pyspark Sparkcontext - Spark 与 Python - Edureka

PySpark KDD 用例

现在让我们来看看KDD'99 Cup(国际知识发现和数据挖掘工具大赛)的一个用例。这里我们取一小部分数据集,因为原始数据集太大了

import urllib
f = urllib.urlretrieve ("<a href="http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz">http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz</a>", "kddcup.data_10_percent.gz")

创建 RDD:
现在我们可以使用这个文件来创建我们的 RDD

data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

过滤:

假设我们要计算有多少正常。我们在数据集中的交互。我们可以按如下方式过滤我们的 raw_data RDD。

normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

数数:

现在我们可以计算新 RDD 中有多少元素。

from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))

输出:

有 97278 次“正常”交互
计数在 5.951 秒内完成

映射:

在这种情况下,我们希望将数据文件读取为 CSV 格式的文件。我们可以通过将 lambda 函数应用于 RDD 中的每个元素来做到这一点,如下所示。这里我们将使用map() 和 take() 转换。

from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])

输出:

解析在 1.715 秒内完成
[u'0', 
 u'tcp', 
 u'http', 
 u'SF', 
 u'181', 
 u'5450', 
 u'0', 
 u'0', 
. 
. 
 你很正常。”]

拆分:

现在我们希望 RDD 中的每个元素都作为一个键值对,其中键是标签(例如 normal),值是代表 CSV 格式文件中行的整个元素列表。我们可以进行如下操作。这里我们使用line.split() 和 map()。

def parse_interaction(line):
elems = line.split(",")
tag = elems[41]
return (tag, elems)
 
key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])
输出: 
(u'normal.', 
 [u'0', 
  u'tcp', 
  u'http', 
  u'SF', 
  u'181', 
  u'5450', 
  u'0', 
  u'0', 
  u'0.00' ,
  u'1.00' ,
。
。
。
。
  u'normal。'])

收集行动:

在这里,我们将使用 collect() 操作。它将把 RDD 的所有元素都放入内存中。因此,在处理大型 RDD 时必须小心使用。

t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))

输出:

17.927 秒内收集的数据

当然,这比我们之前使用的任何其他操作花费的时间更长。每个拥有 RDD 片段的 Spark 工作节点都必须进行协调,以便检索其部分,然后将所有内容减少到一起。

作为结合前面所有内容的最后一个示例,我们希望将所有normal 交互收集 为键值对。

# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
 
# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)
 
# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")
 
# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)

输出:

12.485秒采集数据
正常互动97278次

就是这样,伙计们!

我希望你喜欢这个 Spark with Python 博客。如果您正在阅读本文,恭喜您!您不再是 PySpark 的新手。现在就在您的系统上试试这个简单的例子

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。