大数据技术:Apache Spark快速入门指南

举报
Jet Ding 发表于 2020/09/28 16:19:50 2020/09/28
【摘要】 这篇文章我们来学习一下如何使用Spark, 我们首先通过Spark的交互式shell(在Python或Scala中)学习如何使用它的API,然后展示如何在Java、Scala和Python中编写应用程序。

1 引言

这篇文章我们来学习一下如何使用Spark, 我们首先通过Spark的交互式shell(在Python或Scala中)学习如何使用它的API,然后展示如何在Java、Scala和Python中编写应用程序。

 

需要注意的是,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。在Spark 2.0之后,RDD被Dataset所取代,Dataset和RDD一样是强类型的,但其底层有更丰富的优化。RDD接口仍然支持,你可以在RDD编程指南中获得更详细的参考。不过,我们强烈建议你改用Dataset,它的性能比RDD更好。

2 使用

2.1  安全配置

值得注意的是,缺省情况下,Spark的安全设置是不设限制的。这可能意味着你在默认情况下容易受到攻击。

2.2   用Spark Shell进行互动分析

2.2.1   基本操作

Spark的shell提供了一种简单的学习和使用API的方法,同时也是一个强大的交互式分析数据的工具。可以用Scala(在Java虚拟机上运行,因此是使用现有Java库的好方法)或Python。可以在Spark目录下启动:

 

Scala:

./bin/spark-shell

 

Python:

./bin/pyspark

 

或者如果在你当前的环境中是用pip安装的PySpark, 你可以直接调用:

pyspark

 

Spark的主要抽象是一个分布式的项目集合,称为Dataset。Dataset可以从Hadoop InputFormats(如HDFS文件)或通过转换其他Dataset来创建。由于Python的动态特性,我们不需要Dataset在Python中是强类型的。因此,Python中所有的Dataset都是Dataset[Row],为了与Pandas和R中的数据框架概念保持一致,我们将Spark源码目录下的README文件中的文本制作一个新的DataFrame:

 

>>> textFile = spark.read.text("README.md")

 

你可以直接从DataFrame中取值,通过调用一些操作,或者转换DataFrame来获取一个新的值。

 

>>> textFile.count() # DataFrame中的行数

126

 

>>> textFile.first() # DataFrame中的第一行

Row(value=u'# Apache Spark')

 

现在让我们将这个DataFrame转换为一个新的DataFrame。我们调用filter来返回一个新的DataFrame,其中包含文件中行的子集。

 

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

 

我们可以把转化和操作链在一起:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # 有多少行包含 "Spark"

15

 

2.2.2    更多关于数据集操作

数据集操作和转换可以用于更复杂的计算。比方说,我们想找到字数最多的那一行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

res4: Long = 15

 

python:

>>> from pyspark.sql.functions import *

>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()

[Row(max(numWords)=15)]

 

首先将一行映射到一个整数值,创建一个新的Dataset,然后在该Dataset上调用reduce来寻找最大的数值。map和reduce的参数是Scala函数的字元(闭包),可以使用任何语言特性或Scala/Java库。例如,我们可以轻松地调用其他地方声明的函数。使用Math.max()函数来使这段代码更容易理解:

scala> import java.lang.Math

import java.lang.Math

 

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

res5: Int = 15

 

一个常见的数据流模式就是Hadoop流行的MapReduce。Spark可以轻松实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

 

 

python:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

 

在这里,我们调用flatMap将行的Dataset转化为单词的Dataset,然后结合groupByKeycount来计算文件中每个词的计数,作为一个(StringLong)对的Dataset。要在我们的shell中收集单词计数,我们可以调用collect

scala> wordCounts.collect()

res6: Array[(StringInt)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

 

python:

>>> wordCounts.collect()

[Row(word=u'online'count=1), Row(word=u'graphs'count=1), ...]

 

2.2.3    缓存功能

Spark还支持将数据集拉入集群范围内的内存缓存。当数据被反复访问时,这一点非常有用,比如在查询一个小的 "热"数据集或运行PageRank这样的迭代算法时。作为一个简单的例子,让我们将我们的 linesWithSpark 数据集标记为缓存:

scala> linesWithSpark.cache()

res7: linesWithSpark.type = [value: string]

 

scala> linesWithSpark.count()

res8: Long = 15

 

scala> linesWithSpark.count()

res9: Long = 15

 

Python:

>>> linesWithSpark.cache()

 

>>> linesWithSpark.count()

15

 

>>> linesWithSpark.count()

15

 

使用Spark来探索和缓存一个100行的文本文件可能看起来很傻。有趣的是,这些相同的功能可以用于非常大的数据集,甚至当它们在几十个或几百个节点上呈条状排列时。你也可以通过将bin/spark-shell或者bin/pyspark连接到集群来交互式地实现这一点,正如RDD编程指南中所描述的那样。

2.2.4    自成一体的应用

假设我们使用Spark API来编写一个自足的应用程序,我们通过Scala(使用sbt)、Java(使用Maven)和Python(pip)来讲解一个简单的应用程序。 

2.2.4.1   Scala

我们将在Scala中创建一个非常简单的Spark应用程序, 比如叫SimpleApp.scala。

/* SimpleApp.scala */

import org.apache.spark.sql.SparkSession

 

object SimpleApp {

  def main(argsArray[String]) {

    val logFile = "YOUR_SPARK_HOME/README.md" // 指定一个你自己系统上的文件

    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()

    val logData = spark.read.textFile(logFile).cache()

    val numAs = logData.filter(line => line.contains("a")).count()

    val numBs = logData.filter(line => line.contains("b")).count()

    println(s"Lines with a: $numAs, Lines with b: $numBs")

    spark.stop()

  }

}

 

请注意,应用程序应该定义一个main()方法,而不是扩展scala.App。否则scala.App的子类可能无法正常工作。

这个程序只是统计Spark README中包含 "a "的行数和包含 "b "的行数。需要注意的是,你需要用Spark的安装位置替换YOUR_SPARK_HOME。与前面的例子不同,Spark shell初始化自己的SparkSession,我们初始化一个SparkSession作为程序的一部分。

我们调用SparkSession.builder来构造一个SparkSession,然后设置应用程序名称,最后调用getOrCreate来获取SparkSession实例。

我们的应用程序依赖于Spark API,所以我们还会包含一个sbt配置文件,即build.sbt,它解释了Spark是一个依赖关系。这个文件还添加了一个Spark依赖的仓库:

name := "Simple Project"

 

version := "1.0"

 

scalaVersion := "2.12.10"

 

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

 

为了让sbt正常工作,我们需要按照典型的目录结构来布局SimpleApp.scala和build.sbt。一旦这一切就绪,我们就可以创建一个包含应用程序代码的JAR包,然后使用spark-submit脚本来运行我们的程序。

你的目录布局大体是这样的

$ find .

.

./build.sbt

./src

./src/main

./src/main/scala

./src/main/scala/SimpleApp.scala

 

包装一个包含应用程序的jar

$ sbt package

...

[info] Packaging {.}/{.}/target/scala-2.12/simple-project_2.12-1.0.jar

 

使用spark-submit来运行你的应用程序。

YOUR_SPARK_HOME/bin/spark-submit \

  --class "SimpleApp" \

  -master local[4] \

  target/scala-2.12/simple-project_2.12-1.0.jar

...

Lines with a: 46Lines with b: 23

2.2.4.2   Java

这个例子将使用Maven来编译应用程序JAR,但任何类似的构建系统都可以使用。

我们创建一个非常简单的Spark应用,SimpleApp.java:

/* SimpleApp.java */

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.Dataset;

 

public class SimpleApp {

  public static void main(String[] args) {

    String logFile = "YOUR_SPARK_HOME/README.md"// 指定一个你自己系统上的文件

    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();

    Dataset<StringlogData = spark.read().textFile(logFile).cache();

 

    long numAs = logData.filter(s -> s.contains("a")).count();

    long numBs = logData.filter(s -> s.contains("b")).count();

 

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

 

    spark.stop();

  }

}

 

这个程序只是计算Spark README中包含'a'的行数和包含'b'的行数。需要注意的是,你需要用Spark的安装位置替换YOUR_SPARK_HOME。与前面的Spark shell的例子不同,Spark shell会初始化自己的SparkSession,我们将SparkSession初始化为程序的一部分。

 

为了构建程序,我们还编写了一个Maven pom.xml文件,将Spark列为依赖关系。请注意,Spark构件是有Scala版本标签的。

 

    <dependency> <!-- Spark dependency -->

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-sql_2.12</artifactId>

      <version>3.0.0</version>

      <scope>provided</scope>

    </dependency>

 

按照规范的Maven目录结构来布置这些文件:

 

$ find .

./pom.xml

./src

./src/main

./src/main/java

./src/main/java/SimpleApp.java

 

现在,我们可以使用Maven打包应用程序,并用./bin/spark-submit执行。

将包含应用程序的JAR打包

$ mvn package

...

[INFO构建jar{.}/{.}/target/simple-project-1.0.jar

 

使用spark-submit来运行你的应用程序。

YOUR_SPARK_HOME/bin/spark-submit \

  --class "SimpleApp"\

  -master local[4]\

  target/simple-project-1.0.jar

...

Lines with a: 46Lines with b: 23

 

2.2.4.3   Python

现在我们将展示如何使用Python API (PySpark)编写一个应用程序。

如果你正在构建一个打包的PySpark应用程序或库,你可以将它添加到你的setup.py文件中:

 

    install_requires=[

        'pyspark=={site.SPARK_VERSION}'

    ]

 

作为一个例子,我们将创建一个简单的Spark应用程序,SimpleApp.py。

 

"""SimpleApp.py"""

from pyspark.sql import SparkSession

 

logFile = "YOUR_SPARK_HOME/README.md"  指定一个你自己系统上的文件

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

logData = spark.read.text(logFile).cache()

 

numAs = logData.filter(logData.value.contains('a')).count()

numBs = logData.filter(logData.value.contains('b')).count()

 

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

 

spark.stop()

 

这个程序只是计算一个文本文件中包含'a'的行数和包含'b'的行数。注意,你需要将YOUR_SPARK_HOME替换为Spark的安装位置。与Scala和Java的例子一样,我们使用SparkSession来创建Datasets。对于使用自定义类或第三方库的应用程序,我们也可以通过其-py-files参数将代码依赖关系打包成.zip文件添加到spark-submit中。SimpleApp非常简单,我们不需要指定任何代码依赖。

我们可以使用bin/spark-submit脚本来运行这个应用程序。

使用spark-submit来运行你的应用程序

YOUR_SPARK_HOME/bin/spark-submit \

  --master local[4] \

  SimpleApp.py

...

Lines with a: 46Lines with b: 23

 

如果你的环境中安装了PySpark pip (例如,pip install pyspark),你可以用常规的Python解释器来运行你的应用程序,或者使用提供的'spark-submit'。

 

使用Python解释器来运行你的应用程序

$ python SimpleApp.py

...

Lines with a: 46Lines with b: 23

 

3 参考

https://spark.apache.org/downloads.html

https://spark.apache.org/docs/latest/security.html

https://spark.apache.org/docs/latest/quick-start.html

https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://spark.apache.org/docs/latest/cluster-overview.html

https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples

https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples

https://github.com/apache/spark/tree/master/examples/src/main/python

https://github.com/apache/spark/tree/master/examples/src/main/r

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。