开始编写第一个Spark程序
为了从低信息密度的大批量文字中提取关键信息,或者为了展示文章的主题信息,我们往往会提取文档的关键词,并制作关键词词云。提取关键词的关键步骤就是对给定预料集中出现的各单词进行词频统计。本文将利用Spark提供的并行计算API,完成对大批量文档的词频统计。在正式编码之前,我们先看看Spark提供的大规模分布式运算API抽象:
RDD/SparkContext与Datasets/SparkSession
1. RDD与SparkContext
Spark以弹性分布式数据集(Resilient Distributed Dataset, RDD)这一核心概念,提供了对物理上分散的大规模数据集的统一抽象,并基于这一统一数据抽象,实现了map、filter、groupByKey、join等一系列数据分布式运算算子。而Spark应用,就是基于这一系列运算算子及其组合实现业务逻辑。
而对于RDD这一抽象,Spark又是以SparkContext为主要入口对外呈现的。因而,如下图1所示,用户Spark应用的main方法逻辑一般会先引入SparkContext,然后通过它生成输入数据的RDD抽象,然后进一步基于RDD算子,实现到集群的分布式运行。SparkContext作为Spark计算引擎的主要功能入口,除了生成RDD之外,还提供了包括集群配置、作业提交及注册监听器等大量接口,用于实现用户应用程序与集群的交互。更为详细的API介绍可参考: RDD Programming Guide。
图1 Spark架构图
2. Datasets与SparkSession
Spark 1.6版本SQL模块在RDD基础之上,引入额外的结构化信息,提供了Dataset/DataFrames这一新的结构化数据抽象,并基于此提供了一套分布式数据运算算子。由于包含额外的结构化信息,Spark引擎得以对运算进行进一步的优化,因而具有更好的运算性能。与SparkContext相对应地,Spark在2.0版本中,提供了SparkSession这一Datasets API的编程入口。更为详细的API介绍可参考: Spark SQL, DataFrames and Datasets Guide。
对于上述两套接口,本文以RDD/SparkContext为例,采用Scala语言和sbt构建工具,介绍词频统计应用的实现。而词频统计Datasets/SparkSession版本的实现可参见: Spark Session API和Dataset API。
引入Spark依赖
我们的WordCount应用依赖于RDD/SparkContext提供的API,而RDD/SparkContext是在core模块提供的,所以在 build.sbt 中引入spark-core依赖:
// build.sbt name := "Spark Word Count Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
编写代码
Spark应用通常包括以下步骤:
1. 创建SparkContext 2. 调用SparkContext接口,生成输入数据集的RDD 3. 对输入数据集RDD,调用RDD转换算子,构造数据处理的DAG执行图 4. 调用RDD action算子,启动数据处理,完成数据集的分布式运算并输出结果 5. Stop SparkContext
基于此,词频统计应用可实现如下:
/* WordCount.scala */ import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]) { // 1. Create [[SparkContext]] instance val conf: SparkConf = new SparkConf().setAppName("Word Count") val sparkContext: SparkContext = new SparkContext(conf) val inputFilePath = args(0) val outputFilePath = args(1) // 2. Generate RDD val lines = sparkContext.textFile(inputFilePath) // 3. RDD transformations val wordCounts = lines.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _) // 4. RDD action, start to process wordCounts.saveAsTextFile(outputFilePath) // 5. Stop sparkContext.stop() } }
应用构建及本地验证
应用构建
1. 生成符合sbt构建工具要求的项目目录:
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/WordCount.scala
2. 调用 sbt package 命令,构建应用:
# Package a jar containing your application $ sbt package ... [info] Packaging ./target/scala-2.11/spark-word-count-project_2.11-1.0.jar ...
本地验证
在部署应用到生产集群之前,可先进行本地验证,本地验证可调用 spark-submit 脚本,以local模式运行:
$ YOUR_SPARK_HOME/bin/spark-submit \ --class "WordCount" \ --master local[*] \ target/scala-2.11/spark-word-count-project_2.11-1.0.jar doucments_test # Input path argument words_count_results_test # Output path argument
生产部署
本地验证通过后,便可以将应用正式部署到生产集群。不过,Spark生产集群的部署运维又是一个让人头疼的事情。华为云数据湖探索(Data Lake Insight, DLI)基于Apache Spark/Flink生态,提供了完全托管的大数据处理分析服务。借助DLI服务,你只需要关注应用的处理逻辑,提供构建好的应用程序包,就可以轻松完成你的大规模数据处理分析任务,即开即用,按需计费。具体部署操作可参考:数据湖探索服务 用户指南。
- 点赞
- 收藏
- 关注作者
评论(0)