Flink Scala项目快速入门
1 Scala概述
根据百度百科的定义,Scala是一门多范式(Multi-paradigm)的编程语言,一种类似Java 的编程语言,设计初衷是实现可伸缩的语言 、并集成面向对象编程和函数式编程的各种特性。Scala 运行在 Java 虚拟机JVM上,并兼容现有的 Java 程序。Scala 源代码被编译成 Java 字节码,并可以调用现有的 Java 类库。因此,其可以利用目前大量的Java生态资源。Scala得以提供一些出众的特性,包括:
1) 面向对象编程
2) 函数式编程
3) 更高层的并发模型
Scala把Erlang语言特征中的基于Actor的并发模型带进了JVM上。开发人员可以利用Scala的Actor模型在JVM上设计具伸缩性的并发应用程序,它会自动获得多核心处理器带来的优势,而不必依照复杂的Java线程模型来编写程序。
2 前提条件
本示例需要提前安装几个软件,具体如下所示:
JDK1.8
Scala2.11
Maven3.6
IDEA
3 Flink Scala项目构建
当前的很多构建工具,可以提供项目模板来让开发人员快速初始化项目文件,这对于降低学习难度,提高项目开发效率来说,起到积极的作用。
Flink项目的初始化,可以借助Maven工具来构建。在之前JDK和Maven环境搭建完成,并正确设置环境变量后,即可以用mvn archetype:generate快速生成项目文件。当前支持Scala和Java两种项目模板。本文则介绍Scala项目模板。
首先打开命令行CMD窗体,并切换到项目的根目录中,执行如下命令:
mvn archetype:generate ^
-DarchetypeGroupId=org.apache.flink ^
-DarchetypeArtifactId=flink-quickstart-scala ^
-DarchetypeVersion=1.12.0 ^
-DgroupId=com.myflink ^
-DartifactId=flink-scala ^
-Dpackage=com.example ^
-DinteractiveMode=false
上述脚本每行的^符号为Windows操作系统命令行的换行符,而Linux操作系统下为\符号。当前这里换行是为了更加的清晰,即也可以写成一行。其中:
-DarchetypeArtifactId=flink-quickstart-scala
表示基于的项目模板为flink-quickstart-scala,则表示为Scala版本的Flink项目。
-DarchetypeVersion=1.12.0
表示Flink版本为1.12.0。
-DgroupId=com.myflink
代表组织和整个项目的唯一标志。
-DartifactId=flink-scala
具体项目的名称,也是生成项目文件夹的名称。
-Dpackage=com.example
项目源码的包名。
-DinteractiveMode=false
表示不启用交互模式,这样提示信息更少。
成功创建项目文件,则提示如图所示。
其中有一个BatchJob.scala文件,它是批处理示例文件,而StreamingJob.scala是流处理示例文件。此处修改BatchJob.scala文件,给出一个用scala语言编写统计单词个数的示例代码,如下所示。
01 package com.example
02 import org.apache.flink.api.java.utils.ParameterTool
03 import org.apache.flink.api.scala._
04 import org.apache.flink.core.fs.FileSystem.WriteMode
05 //object可以直接运行
06 object BatchJob {
07 //启动函数
08 def main(args: Array[String]) {
09 //参数处理,如 --参数名 参数值
10 val params: ParameterTool = ParameterTool.fromArgs(args)
11 //获取批处理执行环境
12 val env = ExecutionEnvironment.getExecutionEnvironment
13 //演示数据
14 val mytxt = env.fromElements(
15 "Hello Word",
16 "Hello Flink",
17 "Apache Flink")
18 //单词统计
19 val wc = mytxt.flatMap(line => line.split("\\s"))
20 .map { (_, 1) }
21 .groupBy(0)
22 .sum(1)
23 //可以写入一个文件,便于查看
24 wc.setParallelism(1)
25 //--output xxx
26 if (params.has("output")) {
27 //WriteMode.OVERWRITE覆盖模式
28 wc.writeAsCsv(params.get("output"), "\n", ",",WriteMode.OVERWRITE)
29 env.execute("Scala WordCount Demo")
30 } else {
31 //便于IDEA调试,实际部署一般不用
32 wc.print()
33 }
34 }
35 }
其中创建的scala项目模板中,pom.xml给出了一些核心依赖库,其中${flink.version}是一个变量,代表Flink版本,这样在升级时,只需要在定义${flink.version}的地方进行一处修改即可,即<flink.version>1.12.0</flink.version>。同理,${scala.binary.version}变量代表了编译Flink Scala库时,依赖的Scala SDK版本,如2.11。下面摘抄pom.xml核心库对应的依赖配置,具体如下所示:
01 <dependency>
02 <groupId>org.apache.flink</groupId>
03 <artifactId>flink-scala_${scala.binary.version}</artifactId>
04 <version>${flink.version}</version>
05 <scope>provided</scope>
06 </dependency>
07 <dependency>
08 <groupId>org.apache.flink</groupId>
09 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
10 <version>${flink.version}</version>
11 <scope>provided</scope>
12 </dependency>
13 <dependency>
14 <groupId>org.apache.flink</groupId>
15 <artifactId>flink-clients_${scala.binary.version}</artifactId>
16 <version>${flink.version}</version>
17 <scope>provided</scope>
18 </dependency>
19 <!-- Scala Library, provided by Flink as well. -->
20 <dependency>
21 <groupId>org.scala-lang</groupId>
22 <artifactId>scala-library</artifactId>
23 <version>${scala.version}</version>
24 <scope>provided</scope>
25 </dependency>
其中,默认的<scope>provided</scope>代表依赖的库范围是provided,即表示打包的时候并不将其一同打包到jar包中。但这个在开发的时候也带来了一些不便。这个后面会提到,在IDEA工具中调试Flink程序时,需要在项目配置处开启Include dependencies with “Provided”scope选项,否则会出现找不到相关类的错误。 当正确配置后,IDEA工具会自动在BatchJob.scala文件中识别到可执行的入口函数,这个功能非常实用,我们可以选择运行BatchJob或者调式BatchJob程序。这里选择Debug ‘BatchJob’进行程序调试,如果正确运行的话,会在IDEA Console控制台输出如下信息,如图所示。
IDEA导入模板构建的Flink项目,在默认情况下,直接调试可能会提示类找不到的错误,这可能是由于pom.xml中关于多个Flink核心依赖库配置为<scope>provided</scope>,可将该行进行注释后再试。或者可以在项目的运行和调试配置处开启Include dependencies with “Provided” scope选项。这个具体配置界面如图所示。
IDEA中配置的Include dependencies with “Provided” scope选项只针对某一个具体的主类,因此当运行其他主类时,需要重新配置。
- 点赞
- 收藏
- 关注作者
评论(0)