Flink Scala项目快速入门

举报
jackwangcumt 发表于 2021/11/25 08:16:14 2021/11/25
【摘要】 开发人员可以利用Scala的Actor模型在JVM上设计具伸缩性的并发应用程序,它会自动获得多核心处理器带来的优势,而不必依照复杂的Java线程模型来编写程序。本文重点讲解如何在用Maven来创建Scala语言的Flink入门项目模板,并在IDEA中进行调试运行。

1 Scala概述


      根据百度百科的定义,Scala是一门多范式(Multi-paradigm)的编程语言,一种类似Java 的编程语言,设计初衷是实现可伸缩的语言 、并集成面向对象编程和函数式编程的各种特性。Scala 运行在 Java 虚拟机JVM上,并兼容现有的 Java 程序。Scala 源代码被编译成 Java 字节码,并可以调用现有的 Java 类库。因此,其可以利用目前大量的Java生态资源。Scala得以提供一些出众的特性,包括:
1) 面向对象编程
2) 函数式编程
3) 更高层的并发模型
      Scala把Erlang语言特征中的基于Actor的并发模型带进了JVM上。开发人员可以利用Scala的Actor模型在JVM上设计具伸缩性的并发应用程序,它会自动获得多核心处理器带来的优势,而不必依照复杂的Java线程模型来编写程序。

2 前提条件


     本示例需要提前安装几个软件,具体如下所示:
JDK1.8
Scala2.11
Maven3.6
IDEA

3 Flink Scala项目构建


      当前的很多构建工具,可以提供项目模板来让开发人员快速初始化项目文件,这对于降低学习难度,提高项目开发效率来说,起到积极的作用。
Flink项目的初始化,可以借助Maven工具来构建。在之前JDK和Maven环境搭建完成,并正确设置环境变量后,即可以用mvn archetype:generate快速生成项目文件。当前支持Scala和Java两种项目模板。本文则介绍Scala项目模板。

      首先打开命令行CMD窗体,并切换到项目的根目录中,执行如下命令:

 mvn archetype:generate                            ^
      -DarchetypeGroupId=org.apache.flink            ^
      -DarchetypeArtifactId=flink-quickstart-scala      ^
      -DarchetypeVersion=1.12.0                     ^
      -DgroupId=com.myflink                         ^
      -DartifactId=flink-scala                        ^
      -Dpackage=com.example                       ^
      -DinteractiveMode=false 

上述脚本每行的^符号为Windows操作系统命令行的换行符,而Linux操作系统下为\符号。当前这里换行是为了更加的清晰,即也可以写成一行。其中:
-DarchetypeArtifactId=flink-quickstart-scala
表示基于的项目模板为flink-quickstart-scala,则表示为Scala版本的Flink项目。
-DarchetypeVersion=1.12.0
表示Flink版本为1.12.0。
-DgroupId=com.myflink
代表组织和整个项目的唯一标志。
-DartifactId=flink-scala
具体项目的名称,也是生成项目文件夹的名称。
-Dpackage=com.example
项目源码的包名。
-DinteractiveMode=false
表示不启用交互模式,这样提示信息更少。
成功创建项目文件,则提示如图所示。

1.jpg
其中有一个BatchJob.scala文件,它是批处理示例文件,而StreamingJob.scala是流处理示例文件。此处修改BatchJob.scala文件,给出一个用scala语言编写统计单词个数的示例代码,如下所示。

01    package com.example
02    import org.apache.flink.api.java.utils.ParameterTool
03    import org.apache.flink.api.scala._
04    import org.apache.flink.core.fs.FileSystem.WriteMode
05    //object可以直接运行
06    object BatchJob {
07      //启动函数
08      def main(args: Array[String]) {
09        //参数处理,如 --参数名 参数值
10        val params: ParameterTool = ParameterTool.fromArgs(args)
11        //获取批处理执行环境
12        val env = ExecutionEnvironment.getExecutionEnvironment
13        //演示数据
14        val mytxt = env.fromElements(
15          "Hello Word",
16          "Hello Flink",
17          "Apache Flink")
18        //单词统计
19        val wc = mytxt.flatMap(line => line.split("\\s"))
20          .map { (_, 1) }
21          .groupBy(0)
22          .sum(1)
23        //可以写入一个文件,便于查看
24        wc.setParallelism(1)
25        //--output xxx
26        if (params.has("output")) {
27          //WriteMode.OVERWRITE覆盖模式
28          wc.writeAsCsv(params.get("output"), "\n", ",",WriteMode.OVERWRITE)
29          env.execute("Scala WordCount Demo")
30        } else {
31          //便于IDEA调试,实际部署一般不用
32          wc.print()
33        }
34      }
35    }

       其中创建的scala项目模板中,pom.xml给出了一些核心依赖库,其中${flink.version}是一个变量,代表Flink版本,这样在升级时,只需要在定义${flink.version}的地方进行一处修改即可,即<flink.version>1.12.0</flink.version>。同理,${scala.binary.version}变量代表了编译Flink Scala库时,依赖的Scala SDK版本,如2.11。下面摘抄pom.xml核心库对应的依赖配置,具体如下所示:

01    <dependency>
02    	 <groupId>org.apache.flink</groupId>
03    	 <artifactId>flink-scala_${scala.binary.version}</artifactId>
04    	 <version>${flink.version}</version>
05    	 <scope>provided</scope>
06    </dependency>
07    <dependency>
08    	 <groupId>org.apache.flink</groupId>
09    	 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
10    	 <version>${flink.version}</version>
11    	 <scope>provided</scope>
12    </dependency>
13    <dependency>
14    	<groupId>org.apache.flink</groupId>
15    	<artifactId>flink-clients_${scala.binary.version}</artifactId>
16    	<version>${flink.version}</version>
17    	<scope>provided</scope>
18    </dependency>
19    <!-- Scala Library, provided by Flink as well. -->
20    <dependency>
21    	<groupId>org.scala-lang</groupId>
22    	<artifactId>scala-library</artifactId>
23    	<version>${scala.version}</version>
24    	<scope>provided</scope>
25    </dependency>

       其中,默认的<scope>provided</scope>代表依赖的库范围是provided,即表示打包的时候并不将其一同打包到jar包中。但这个在开发的时候也带来了一些不便。这个后面会提到,在IDEA工具中调试Flink程序时,需要在项目配置处开启Include dependencies with “Provided”scope选项,否则会出现找不到相关类的错误。        当正确配置后,IDEA工具会自动在BatchJob.scala文件中识别到可执行的入口函数,这个功能非常实用,我们可以选择运行BatchJob或者调式BatchJob程序。这里选择Debug ‘BatchJob’进行程序调试,如果正确运行的话,会在IDEA Console控制台输出如下信息,如图所示。

2.jpg
 IDEA导入模板构建的Flink项目,在默认情况下,直接调试可能会提示类找不到的错误,这可能是由于pom.xml中关于多个Flink核心依赖库配置为<scope>provided</scope>,可将该行进行注释后再试。或者可以在项目的运行和调试配置处开启Include dependencies with “Provided” scope选项。这个具体配置界面如图所示。

3.jpg
IDEA中配置的Include dependencies with “Provided” scope选项只针对某一个具体的主类,因此当运行其他主类时,需要重新配置。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200