开始编写第一个Spark程序

举报
皮皮猫 发表于 2020/05/26 22:08:57 2020/05/26
【摘要】 作为当前最主流的大数据计算引擎之一,Spark提供了Scala、Java、Python、R及SQL等多种语言的高级API,用户可以通过这些API,快速高效地实现自己的大规模数据分析应用。本文将基于Scala语言,介绍如何编写你的第一个Spark程序,并运行到DLI之上。下面,跟着本文,开启你的Spark之旅吧。

为了从低信息密度的大批量文字中提取关键信息,或者为了展示文章的主题信息,我们往往会提取文档的关键词,并制作关键词词云。提取关键词的关键步骤就是对给定预料集中出现的各单词进行词频统计。本文将利用Spark提供的并行计算API,完成对大批量文档的词频统计。在正式编码之前,我们先看看Spark提供的大规模分布式运算API抽象:

RDD/SparkContext与Datasets/SparkSession

1. RDD与SparkContext

Spark以弹性分布式数据集(Resilient Distributed Dataset, RDD)这一核心概念,提供了对物理上分散的大规模数据集的统一抽象,并基于这一统一数据抽象,实现了map、filter、groupByKey、join等一系列数据分布式运算算子。而Spark应用,就是基于这一系列运算算子及其组合实现业务逻辑。

而对于RDD这一抽象,Spark又是以SparkContext为主要入口对外呈现的。因而,如下图1所示,用户Spark应用的main方法逻辑一般会先引入SparkContext,然后通过它生成输入数据的RDD抽象,然后进一步基于RDD算子,实现到集群的分布式运行。SparkContext作为Spark计算引擎的主要功能入口,除了生成RDD之外,还提供了包括集群配置、作业提交及注册监听器等大量接口,用于实现用户应用程序与集群的交互。更为详细的API介绍可参考: RDD Programming Guide

图1 Spark架构图

2. Datasets与SparkSession

Spark 1.6版本SQL模块在RDD基础之上,引入额外的结构化信息,提供了Dataset/DataFrames这一新的结构化数据抽象,并基于此提供了一套分布式数据运算算子。由于包含额外的结构化信息,Spark引擎得以对运算进行进一步的优化,因而具有更好的运算性能。与SparkContext相对应地,Spark在2.0版本中,提供了SparkSession这一Datasets API的编程入口。更为详细的API介绍可参考: Spark SQL, DataFrames and Datasets Guide

对于上述两套接口,本文以RDD/SparkContext为例,采用Scala语言和sbt构建工具,介绍词频统计应用的实现。而词频统计Datasets/SparkSession版本的实现可参见: Spark Session API和Dataset API

引入Spark依赖

我们的WordCount应用依赖于RDD/SparkContext提供的API,而RDD/SparkContext是在core模块提供的,所以在 build.sbt 中引入spark-core依赖:

// build.sbt
name := "Spark Word Count Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"

编写代码

Spark应用通常包括以下步骤:

  1. 创建SparkContext
  2. 调用SparkContext接口,生成输入数据集的RDD
  3. 对输入数据集RDD,调用RDD转换算子,构造数据处理的DAG执行图
  4. 调用RDD action算子,启动数据处理,完成数据集的分布式运算并输出结果
  5. Stop SparkContext

基于此,词频统计应用可实现如下:

/* WordCount.scala */
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]) {
    // 1. Create [[SparkContext]] instance
    val conf: SparkConf = new SparkConf().setAppName("Word Count")
    val sparkContext: SparkContext = new SparkContext(conf)
       
    val inputFilePath = args(0)
    val outputFilePath = args(1)
    
    // 2. Generate RDD
    val lines = sparkContext.textFile(inputFilePath)
    
    // 3. RDD transformations
    val wordCounts = lines.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
    
    // 4. RDD action, start to process
    wordCounts.saveAsTextFile(outputFilePath)
    
    // 5. Stop
    sparkContext.stop()
  }
}

应用构建及本地验证

  • 应用构建

1. 生成符合sbt构建工具要求的项目目录:

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/WordCount.scala

2. 调用 sbt package 命令,构建应用:

# Package a jar containing your application
$ sbt package
...
[info] Packaging ./target/scala-2.11/spark-word-count-project_2.11-1.0.jar
...
  • 本地验证

在部署应用到生产集群之前,可先进行本地验证,本地验证可调用 spark-submit 脚本,以local模式运行:

$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "WordCount" \
  --master local[*] \
  target/scala-2.11/spark-word-count-project_2.11-1.0.jar
  doucments_test  # Input path argument
  words_count_results_test  # Output path argument

生产部署

本地验证通过后,便可以将应用正式部署到生产集群。不过,Spark生产集群的部署运维又是一个让人头疼的事情。华为云数据湖探索(Data Lake Insight, DLI)基于Apache Spark/Flink生态,提供了完全托管的大数据处理分析服务。借助DLI服务,你只需要关注应用的处理逻辑,提供构建好的应用程序包,就可以轻松完成你的大规模数据处理分析任务,即开即用,按需计费。具体部署操作可参考:数据湖探索服务 用户指南

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。