《Flink原理、实战与性能优化》 —3.3 Flink程序结构

举报
华章计算机 发表于 2020/02/17 14:18:02 2020/02/17
【摘要】 本节书摘来自华章计算机《Flink原理、实战与性能优化》 一书中第3章,第3.3节,作者是张利兵 。

3.3 Flink程序结构

和其他分布式处理引擎一样,Flink应用程序也遵循着一定的编程模式。不管是使用DataStream API还是DataSet API基本具有相同的程序结构,如代码清单3-1所示。通过流式计算的方式实现对文本文件中的单词数量进行统计,然后将结果输出在给定路径中。

代码清单3-1 Streaming WordCount实例代码

image.png

整个Flink程序一共分为5步,分别为设定Flink执行环境、创建和加载数据集、对数据集指定转换操作逻辑、指定计算结果输出位置、调用execute方法触发程序执行。对于所有的Flink应用程序基本都含有这5个步骤,下面将详细介绍每个步骤。

1. Execution Environment

运行Flink程序的第一步就是获取相应的执行环境,执行环境决定了程序执行在什么环境(例如本地运行环境或者集群运行环境)中。同时不同的运行环境决定了应用的类型,批量处理作业和流式处理作业分别使用的是不同的Execution Environment。例如StreamExecutionEnvironment是用来做流式数据处理环境,ExecutionEnvironment是批

量数据处理环境。可以使用三种方式获取Execution Environment,例如StreamExecution-

Envirenment。

//设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境

StreamExecutionEnvironment.getExecutionEnvironment

//指定并行度创建本地执行环境

StreamExecutionEnvironment.createLocalEnvironment(5)

//指定远程JobManagerIP和RPC端口以及运行程序所在jar包及其依赖包

StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost",6021,5,"/user/application.jar")

其中第三种方式可以直接从本地代码中创建与远程集群的Flink JobManager的RPC连接,通过指定应用程序所在的Jar包,将运行程序远程拷贝到JobManager节点上,然后将Flink应用程序运行在远程的环境中,本地程序相当于一个客户端。

和StreamExecutionEnvironment构建过程一样,开发批量应用需要获取Execution-

Environment来构建批量应用开发环境,如以下代码实例通过调用ExecutionEnvironment的静态方法来获取批计算环境。

//设定Flink运行环境,如果在本地启动则创建本地环境,如果是在集群上启动,则创建集群环境

ExecutionEnvironment.getExecutionEnvironment

//指定并行度创建本地执行环境

ExecutionEnvironment.createLocalEnvironment(5)

//指定远程JobManagerIP和RPC端口以及运行程序所在jar包及其依赖包

ExecutionEnvironment.createRemoteEnvironment("JobManagerHost",6021,5,"/user/application.jar")

针对Scala和Java不同的编程语言环境,Flink分别制定了不同的语言同时分别定义了不同的Execution Environment接口。StreamExecutionEnvironment Scala开发接口在org.apache.flink.streaming.api.scala包中,Java开发接口在org.apache.flink.streaming.api.java包中;ExecutionEnvironment Scala接口在org.apache.flink.api.scala包中,Java开发接口则在org.apache.flink.api.java包中。用户使用不同语言开发Flink应用时需要引入不同环境对应的执行环境。

2. 初始化数据

创建完成ExecutionEnvironment后,需要将数据引入到Flink系统中。Execution-Environment提供不同的数据接入接口完成数据的初始化,将外部数据转换成DataStream<T>或DataSet<T>数据集。如以下代码所示,通过调用readTextFile()方法读取file:///pathfile路径中的数据并转换成DataStream<String>数据集。

val text:DataStream[String] = env.readTextFile("file:///path/file")

通过读取文件并转换为DataStream[String]数据集,这样就完成了从本地文件到分布式数据集的转换,同时在Flink中提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将Flink系统和其他第三方系统连接,直接获取外部数据。

3. 执行转换操作

数据从外部系统读取并转换成DataStream或者DataSet数据集后,下一步就将对数据集进行各种转换操作。Flink中的Transformation操作都是通过不同的Operator来实现,每个Operator内部通过实现Function接口完成数据处理逻辑的定义。在DataStream API和DataSet API提供了大量的转换算子,例如map、flatMap、filter、keyBy等,用户只需要定义每种算子执行的函数逻辑,然后应用在数据转换操作Dperator接口中即可。如下代码实现了对输入的文本数据集通过FlatMap算子转换成数组,然后过滤非空字段,将每个单词进行统计,得到最后的词频统计结果。

val counts: DataStream[(String, Int)] = text

    .flatMap(_.toLowerCase.split(" "))//执行FlatMap转换操作

  .filter(_.nonEmpty)//执行Filter操作过滤空字段

  .map((_, 1))//执行map转换操作,转换成key-value接口

  .keyBy(0)//按照指定key对数据重分区

  .sum(1)//执行求和运算操作

在上述代码中,通过Scala接口处理数据,极大地简化数据处理逻辑的定义,只需要通过传入相应Lambada计算表达式,就能完成Function定义。特殊情况下用户也可以通过实现Function接口来完成定义数据处理逻辑。然后将定义好的Function应用在对应的算子中即可。Flink中定义Funciton的计算逻辑可以通过如下几种方式完成定义。

(1)通过创建Class实现Funciton接口

Flink中提供了大量的函数供用户使用,例如以下代码通过定义MyMapFunction Class实现MapFunction接口,然后调用DataStream的map()方法将MyMapFunction实现类传入,完成对实现将数据集中字符串记录转换成大写的数据处理。

val dataStream: DataStream[String] = env.fromElements("hello", "flink")

dataStream.map(new MyMapFunction)

class MyMapFunction extends MapFunction[String, String] {

  override def map(t: String): String = {

    t.toUpperCase()

  }

}

(2)通过创建匿名类实现Funciton接口

除了以上单独定义Class来实现Function接口之处,也可以直接在map()方法中创建匿名实现类的方式定义函数计算逻辑。

val dataStream: DataStream[String] = env.fromElements("hello", "flink")

//通过创建MapFunction匿名实现类来定义Map函数计算逻辑

  dataStream.map(new MapFunction[String, String] {

    //实现对输入字符串大写转换

    override def map(t: String): String = {

      t.toUpperCase()

    }

  })

(3)通过实现RichFunciton接口

前面提到的转换操作都实现了Function接口,例如MapFunction和FlatMap-Function接口,在Flink中同时提供了RichFunction接口,主要用于比较高级的数据处理场景,RichFunction接口中有open、close、getRuntimeContext和setRuntimeContext等方法来获取状态,缓存等系统内部数据。和MapFunction相似,RichFunction子类中也有RichMap-Function,如下代码通过实现RichMapFunction定义数据处理逻辑,具体的RichFunction的介绍读者可以参考后续章节中心介绍。

//定义匿名类实现RichMapFunction接口,完成对字符串到整形数字的转换

data.map (new RichMapFunction[String, Int] {

  def map(in: String):Int = { in.toInt }

})

4. 分区Key指定

在DataStream数据经过不同的算子转换过程中,某些算子需要根据指定的key进行转换,常见的有join、coGroup、groupBy类算子,需要先将DataStream或DataSet数据集转换成对应的KeyedStream和GroupedDataSet,主要目的是将相同key值的数据路由到相同的Pipeline中,然后进行下一步的计算操作。需要注意的是,在Flink中这种操作并不是真正意义上将数据集转换成Key-Value结构,而是一种虚拟的key,目的仅仅是帮助后面的基于Key的算子使用,分区人Key可以通过两种方式指定:

(1)根据字段位置指定

在DataStream API中通过keyBy()方法将DataStream数据集根据指定的key转换成重新分区的KeyedStream,如以下代码所示,对数据集按照相同key进行sum()聚合操作。

val dataStream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("c",

  2))

//根据第一个字段重新分区,然后对第二个字段进行求和运算

Val result = dataStream.keyBy(0).sum(1)

在DataSet API中,如果对数据根据某一条件聚合数据,对数据进行聚合时候,也需要对数据进行重新分区。如以下代码所示,使用DataSet API对数据集根据第一个字段作为GroupBy的key,然后对第二个字段进行求和运算。

val dataSet = env.fromElements(("hello", 1), ("flink", 3))

  //根据第一个字段进行数据重分区

  val groupedDataSet:GroupedDataSet[(String,Int)] = dataSet.groupBy(0)

  //求取相同key值下第二个字段的最大值

  groupedDataSet.max(1)

(2)根据字段名称指定

KeyBy和GroupBy的Key除了能够通过字段位置来指定之外,也可以根据字段的名称来指定。使用字段名称需要DataStream中的数据结构类型必须是Tuple类或者POJOs类的。如以下代码所示,通过指定name字段名称来确定groupby的key字段。

val personDataSet = env.fromElements(new Persion("Alex", 18),new

Persion("Peter", 43))

//指定name字段名称来确定groupby字段

personDataSet.groupBy("name").max(1)

如果程序中使用Tuple数据类型,通常情况下字段名称从1开始计算,字段位置索引从0开始计算,以下代码中两种方式是等价的。

val personDataStream = env.fromElements(("Alex", 18),("Peter", 43))

//通过名称指定第一个字段名称

personDataStream.keyBy("_1")

//通过位置指定第一个字段

personDataStream.keyBy(0)

如果在Flink中使用嵌套的复杂数据结构,可以通过字段名称指定Key,例如:

class CompelexClass(var nested: NestedClass, var tag: String) {

  def this() { this(null, "") }

}

class NestedClass (

    var id: Int,

    tuple: (Long, Long, String)){

  def this() { this(0, (0, 0, "")) }

}

通过调用“nested”获取整个NestedClass对象里所有的字段,调用“tag”获取CompelexClass中tag字段,调用“nested.id”获取NestedClass中的id字段,调用“nested.tuple._1”获取NestedClass中tuple元祖的第一个字段。由此可以看出,Flink能够支持在复杂数据结构中灵活地获取字段信息,这也是非 Key-Value的数据结构所具有的优势。

(3)通过Key选择器指定

另外一种方式是通过定义Key Selector来选择数据集中的Key,如下代码所示,定义KeySelector,然后复写getKey方法,从Person对象中获取name为指定的Key。

case class Person(name: String, age: Int)

val person= env.fromElements(Person("hello",1), Person("flink",4))

//定义KeySelector,实现getKey方法从case class中获取Key

val keyed: KeyedStream[WC]= person.keyBy(new KeySelector[Person, String]() {

  override def getKey(person: Person): String = person.word

})

5. 输出结果

数据集经过转换操作之后,形成最终的结果数据集,一般需要将数据集输出在外部系统中或者输出在控制台之上。在Flink DataStream和DataSet接口中定义了基本的数据输出方法,例如基于文件输出writeAsText(),基于控制台输出print()等。同时Flink在系统中定义了大量的Connector,方便用户和外部系统交互,用户可以直接通过调用addSink()添加输出系统定义的DataSink类算子,这样就能将数据输出到外部系统。以下实例调用DataStream API中的writeAsText()和print()方法将数据集输出在文件和客户端中。

//将数据输出到文件中

counts.writeAsText("file://path/to/savefile")

//将数据输出控制台

counts.print()

6. 程序触发

所有的计算逻辑全部操作定义好之后,需要调用ExecutionEnvironment的execute()方法来触发应用程序的执行,其中execute()方法返回的结果类型为JobExecutionResult,里面包含了程序执行的时间和累加器等指标。需要注意的是,execute方法调用会因为应用的类型有所不同,DataStream流式应用需要显性地指定execute()方法运行程序,如果不调用则Flink流式程序不会执行,但对于DataSet API输出算子中已经包含对execute()方法的调用,则不需要显性调用execute()方法,否则会出现程序异常。

//调用StreamExecutionEnvironment的execute方法执行流式应用程序

env.execute("App Name");


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。