Flink实例:Flink流处理程序编程模型

举报
TiAmoZhang 发表于 2023/05/24 09:49:46 2023/05/24
【摘要】 在深入了解Flink实时数据处理程序的开发之前,先通过一个简单示例来了解使用Flink的DataStream API构建有状态流应用程序的过程。

01、流数据类型

Flink以一种独特的方式处理数据类型和序列化,它包含自己的类型描述符、泛型类型提取和类型序列化框架。基于Java和Scala语言,Flink实现了一套自己的一套类型系统,它支持很多种类的类型,包括

(1) 基本类型。

(2) 数组类型。

(3) 复合类型。

(4) 辅助类型。

(5) 通用类型。

详细的Flink类型系统如图1所示。

640.png

■ 图1 Flink类型系统

Flink针对Java和Scala的DataStream API要求流数据的内容必须是可序列化的。Flink内置了以下类型数据的序列化器:

(1) 基本数据类型:String、Long、Integer、Boolean、Array。

(2) 复合数据类型:Tuple、POJO、Scala case class。

对于其他类型,Flink会返回Kryo。也可以在Flink中使用其他序列化器。Avro尤其得到了很好的支持。

1.java DataStream API使用的流数据类型

对于Java API,Flink定义了自己的Tuple1到Tuple25类型来表示元组类型,代码如下:

Tuple2<String, Integer> person = new Tuple2<>("王老五", 35);

//索引基于0
String name = person.f0;
Integer age = person.f1;

在Java中,POJO(plain old Java Object)是这样的Java类:

(1) 有一个无参的默认构造器。

(2) 所有的字段要么是public的,要么有一个默认的getter和setter。

例如,定义一个名为Person的POJO类,代码如下:

//定义一个Person POJO类
public class Person{
    public String name;
    public Integer age;

    public Person() {};

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    };
}

//创建一个实例
Person person = new Person("王老五", 35);


2.Scala DataStream API使用的流数据类型

对于元组,使用Scala自己的Tuple类型就好,代码如下:

val person = ("王老五", 35)

//索引基于1
val name = person._1
val age = person._2


对于对象类型,使用case class(相当于Java中的JavaBean),代码如下:

case class Person(name: String, age:Int)

val person = Person("王老五", 35)

3.Flink类型系统

对于创建的任意一个POJO类型,看起来它是一个普通的Java Bean,在Java中,可以使用Class来描述该类型,但其实在Flink引擎中,它被描述为PojoTypeInfo,而PojoTypeInfo是TypeInformation的子类。

TypeInformation是Flink类型系统的核心类。Flink使用TypeInformation来描述所有Flink支持的数据类型,就像Java中的Class类型一样。每种Flink支持的数据类型都对应的是TypeInformation的子类。例如POJO类型对应的是PojoTypeInfo、基础数据类型数组对应的是BasicArrayTypeInfo、Map类型对应的是MapTypeInfo、值类型对应的是ValueTypeInfo。

除了对类型的描述,TypeInformation还提供了序列化的支持。在TypeInformation中有一种方法:createSerializer方法,它用来创建序列化器,序列化器中定义了一系列的方法,其中,通过serialize和deserialize方法,可以将指定类型进行序列化,并且Flink的这些序列化器会以稠密的方式来将对象写入内存中。Flink中也提供了非常丰富的序列化器。在我们基于Flink类型系统支持的数据类型进行编程时,Flink在运行时会推断出数据类型的信息,程序员在基于Flink编程时,几乎是不需要关心类型和序列化的。

4.类型与Lambda表达式支持

在编译时,编译器能够从Java源代码中读取完整的类型信息,并强制执行类型的约束,但生成class字节码时,会将参数化类型信息删除。这就是类型擦除。类型擦除可以确保不会为泛型创建新的Java类,泛型是不会产生额外的开销的。也就是说,泛型只是在编译器编译时能够理解该类型,但编译后执行时,泛型是会被擦除掉的。

为了全球说明,请看下面的代码:

public static <T> boolean hasItems(T [] items, T item){
    for (T i : items){
         if(i.equals(item)){
              return true;
         }
    }
    return false;
}


以上是一段Java的泛型方法,但在编译后,编译器会将未绑定类型的T擦除掉,替换为Object。也就是编译之后的代码如下:

public static Object boolean hasItems(Object [] items, Object item){
    for (Object i : items){
         if(i.equals(item)){
              return true;
         }
    }
    return false;
}


泛型只是能够防止在运行时出现类型错误,但运行时会出现以下异常,而且Flink以非常友好的方式提示:

could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.


就是因为Java编译器类型擦除的原因,所以Flink根本无法推断出来算子(例如flatMap)要输出的类型是什么,所以在Flink中使用Lambda表达式时,为了防止因类型擦除而出现运行时错误,需要指定TypeInformation或者TypeHint。

创建TypeInformation,代码如下:

.returns(TypeInformation.of(String.class))


创建TypeHint,代码如下:

.returns(new TypeHint<String>() {})

02、流应用程序实现

Flink程序的基本构建块是stream和transformation(流和转换)。从概念上讲,stream是数据记录的流(可能永远不会结束),transformation是一个运算,它接受一个或多个流作为输入,经过处理/计算后生成一个或多个输出流。

下面实现一个完整的、可工作的Flink流应用程序示例。

【示例3-1】将有关人员的记录流作为输入,并从中筛选出未成年人信息。

Scala代码如下:

(1) 在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板(Flink项目创建过程,可参见2.2节)。

(2) 设置依赖。在pom.xml文件中添加如下依赖内容:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>


(3) 创建主程序StreamingJobDemo1,编辑流处理代码如下:

//第3章/StreamingJobDemo1.scala

import org.apache.flink.streaming.api.scala._

object StreamingJobDemo1 {
//定义事件类
  case class Person(name:String, age:Integer)

  def main(args: Array[String]) {

//设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

//读取数据源,构造数据流
    val peoples = env.fromElements(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35)
    )

//对数据流执行filter转换
    val adults = peoples.filter(_.age>18)

//输出结果
    adults.print

//执行
    env.execute("Flink Streaming Job")
  }
}


执行以上代码,输出结果如下:

7> Person(张三,21)
1> Person(王老五,35)


Java代码如下:

(1) 在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-Java项目模板(Flink项目创建过程,可参见2.2节)。

(2) 设置依赖。在pom.xml文件中添加如下依赖内容:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-Java</artifactId>
  <version>1.13.2</version>
  <scope>provided</scope>
</dependency>
dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-Java_2.12</artifactId>
  <version>1.13.2</version>
  <scope>provided</scope>
</dependency>


(3) 创建一个POJO类,用来表示流中的数据,代码如下:

//第3章/Person.java

//POJO类,表示人员信息实体
public class Person {
  public String name; //存储姓名
  public Integer age; //存储年龄

  //空构造器
  public Person() {};

  //构造器,初始化属性
  public Person(String name, Integer age) {
    this.name = name;
    this.age = age;
  };

  //用于调试时输出信息
  public String toString() {
    return this.name.toString() + ": age " + this.age.toString();
  };
}


(4) 打开项目中的StreamingJob对象文件,编辑流处理代码如下:

//第3章/StreamingJobDemo1.java

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class StreamingJobDemo1 {

  public static void main(String[] args) throws Exception {
  //获得流执行环境
         final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

//读取数据源,构造DataStream
         DataStream<Person> personDS = env.fromElements(
    new Person("张三", 21),
    new Person("李四", 16),
    new Person("王老五", 35)
         );

//执行转换运算(这里是过滤年龄不小于18岁的人)
//注意,这里使用了匿名函数
         DataStream<Person> adults = personDS.filter(
new FilterFunction<Person>() {
              @Override
      public boolean filter(Person person) throws Exception {
      return person.age >= 18;
      }
          });

  //将结果输出到控制台
          adults.print();

//触发流程序开始执行
          env.execute("stream demo");
  }
}


(5) 执行以上程序,输出结果如下。

张三: age 21
王老五: age 35


注意/


Flink将批处理程序作为流程序的一种特殊情况执行,其中流是有界的(有限数量的元素)。DataSet在内部被视为数据流,因此,上述概念同样适用于批处理程序,也适用于流程序,只有少数例外:

(1) 批处理程序的容错不使用检查点。错误恢复是通过完全重放流实现的,这使恢复的成本更高,但是因为它避免了检查点,所以使常规处理更轻量。

(2) DataSet API中的有状态运算使用简化的in-memory/out-of-核数据结构,而不是key-value索引。

(3) DataSet API引入了特殊的同步(基于superstep)迭代,这只可能在有界流上实现。

03、流应用程序剖析

所有的Flink应用程序都以特定的步骤来工作,这些工作步骤如图2所示。

640.png

■ 图2 Flink应用程序工作步骤

也就是说,每个Flink程序都由相同的基本部分组成:

(1) 获取一个执行环境。

(2) 加载/创建初始数据。

(3) 指定对该数据的转换。

(4) 指定计算结果放在哪里。

(5) 触发程序执行。

1.获取一个执行环境

Flink应用程序从其main()方法中生成一个或多个Flink作业(job)。这些作业可以在本地JVM(LocalEnvironment)中执行,也可以在具有多台机器的集群的远程设置中执行(RemoteEnvironment)。对于每个程序,ExecutionEnvironment提供了控制作业执行(例如设置并行性或容错/检查点参数)和与外部环境交互(数据访问)的方法。

每个Flink应用程序都需要一个执行环境(本例中为env)。流应用程序需要的执行环境使用的是StreamExecutionEnvironment。为了开始编写Flink程序,用户首先需要获得一个现有的执行环境,如果没有,就需要先创建一个。根据目的不同,Flink支持以下几种方式:

(1) 获得一个已经存在的Flink环境。

(2) 创建本地环境。

(3) 创建远程环境。

Flink流程序的入口点是StreamExecutionEnvironment类的一个实例,它定义了程序执行的上下文。StreamExecutionEnvironment是所有Flink程序的基础。可以通过一些静态方法获得一个StreamExecutionEnvironment的实例,代码如下:
StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.createLocalEnvironment()
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)


要获得执行环境,通常只需调用getExecutionEnvironment()方法。这将根据上下文选择正确的执行环境。如果正在IDE中的本地环境上执行,则它将启动一个本地执行环境。如果是从程序中创建了一个JAR文件,并通过命令行调用它,则Flink集群管理器将执行main()方法,getExecutionEnvironment()将返回用于在集群上以分布式方式执行程序的执行环境。

在上面的示例程序中,使用以下语句来获得流程序的执行环境。

Scala代码如下:

//设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment


Java代码如下:

//获得流执行环境
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();


StreamExecutionEnvironment包含ExecutionConfig,可使用它为运行时设置特定于作业的配置值。例如,如果要设置自动水印发送间隔,可以像下面这样在代码进行配置。

Scala代码如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(long milliseconds)


Java代码如下:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(long milliseconds);


2.加载/创建初始数据

执行环境可以从多种数据源读取数据,包括文本文件、CSV文件、Socket套接字数据等,也可以使用自定义的数据输入格式。例如,要将文本文件读取为行序列,代码如下:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file://path/to/file");


数据被逐行读取内存后,Flink会将它们组织到DataStream中,这是Flink中用来表示流数据的特殊类。

示例程序【示例1】中,使用fromElements()方法读取集合数据,并将读取的数据存储为DataStream类型。

Scala代码如下:

//读取数据源,构造数据流
val personDS = env.fromElements(
      Person("张三", 21),
      Person("李四", 16),
      Person("王老五", 35)
    )


Java代码如下:

//读取数据源,构造DataStream
DataStream<Person> personDS = env.fromElements(
        new Person("张三", 21),
        new Person("李四", 16),
        new Person("王老五", 35));


3.对数据进行转换

每个Flink程序都对分布式数据集合执行转换。Flink的DataStream API提供了多种数据转换功能,包括过滤、映射、连接、分组和聚合。例如,下面是一个map转换应用,通过将原始集合中的每个字符串转换为整数来创建一个新的DataStream,代码如下:

示例程序【示例1】中使用了filter过滤转换,将原始数据集转换为只包含成年人信息的新DataStream流,代码如下:

DataStream<String> input = env.fromElements("12","3","25","5","32","6");

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});


Scala代码如下:

//对数据流执行filter转换
val adults = personDS.filter(_.age>18)


Java代码如下:

//对数据流执行filter转换
DataStream<Person> adults = flintstones.filter(
    new FilterFunction<Person>() {
        @Override
        public boolean filter(Person person) throws Exception {
            return person.age >= 18;
        }
    });


这里不必了解每个转换的具体含义,后面我们会详细介绍它们。需要强调的是,Flink中的转换是惰性的,在调用sink操作之前不会真正执行。

4.指定计算结果放在哪里

一旦有了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统。例如,将计算结果打印输出到屏幕上。

Scala代码如下:

//输出结果
adults.print


Java代码如下:

//输出结果
adults.print();


Flink中的接收器(sink)操作触发流的执行,以生成程序所需的结果,例如将结果保存到文件系统或将其打印到标准输出。上面的示例使用adults.print()将结果打印到任务管理器日志中(在IDE中运行时,任务管理器日志将显示在IDE的控制台中)。这将对流的每个元素调用其toString()方法。

5.触发流程序执行

一旦写好了程序处理逻辑,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。所有的Flink程序都是延迟执行的:当程序的主方法执行时,数据加载和转换不会直接发生,而是创建每个运算并添加到程序的执行计划中。当执行环境上的execute()调用显式触发执行时,这些操作才实际上被执行。程序是在本地执行还是提交到集群中执行取决于ExecutionEnvironment的类型。

延迟计算可以让用户构建复杂的程序,然后Flink将其作为一个整体计划的单元执行。在示例程序【示例3-1】中,使用如下代码来触发流处理程序的执行。

Scala代码如下:

//触发流程序执行
env.execute("Flink Streaming Job") //参数是程序名称,会显示在Web UI界面上


Java代码如下:

//触发流程序执行
env.execute("Flink Streaming Job"); //参数是程序名称,会显示在Web UI界面上


在应用程序中执行的DataStream API调用将构建一个附加到StreamExecutionEnvironment的作业图(Job Graph)。调用env.execute()时,此图被打包并发送到Flink Master,该Master并行化作业并将其片段分发给TaskManagers以供执行。作业的每个并行片段将在一个task slot(任务槽)中执行,如图3所示。

640.png

图3 Flink流应用程序执行原理

这个分布式运行时要求Flink应用程序是可序列化的。它还要求集群中的每个节点都可以使用所有依赖项。

StreamExecutionEnvironment上的execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。注意,如果不调用execute(),应用程序将不会运行。

如果不想等待作业完成,可以通过调用StreamExecutionEnvironment上的executeAysnc()来触发异步作业执行。它将返回一个JobClient,可以使用它与刚才提交的作业进行通信。例如,下面的示例代码演示了如何通过executeAsync()实现execute()的语义。

Scala代码如下:

val jobClient = evn.executeAsync
val jobExecutionResult =
jobClient.getJobExecutionResult(userClassloader).get


Java代码如下:

final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult =
jobClient.getJobExecutionResult(userClassloader).get();

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。