大数据Flink进阶(七):Flink批和流案例总结

举报
Lansonli 发表于 2023/03/30 08:37:44 2023/03/30
【摘要】 Flink批和流案例总结关于Flink 批数据处理和流式数据处理案例有以下几个点需要注意:一、Flink程序编写流程总结编写Flink代码要符合一定的流程,Flink代码编写流程如下:a. 获取flink的执行环境,批和流不同,Execution Environment。 b. 加载数据数据-- soure。 c. 对加载的数据进行转换-- transformation。 d. 对结果进行保...

Flink批和流案例总结

关于Flink 批数据处理和流式数据处理案例有以下几个点需要注意:

一、Flink程序编写流程总结

编写Flink代码要符合一定的流程,Flink代码编写流程如下:

a. 获取flink的执行环境,批和流不同,Execution Environment。 b. 加载数据数据-- soure。 c. 对加载的数据进行转换-- transformation。 d. 对结果进行保存或者打印-- sink。 e. 触发flink程序的执行 --env.execute()

在Flink批处理过程中不需要执行execute触发执行,在流式处理过程中需要执行env.execute触发程序执行。

二、关于Flink的批处理和流处理上下文环境

创建Flink批和流上下文环境有以下三种方式,批处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(10);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

流处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(5);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

同样在Scala api 中批和流创建Flink 上下文环境也有以上三种方式,在实际开发中建议批处理使用"ExecutionEnvironment.getExecutionEnvironment()"方式创建。流处理使用"StreamExecutionEnvironment.getExecution-Environment()"方式创建。

三、Flink批和流 Java 和 Scala导入包不同

在编写Flink Java api代码和Flink Scala api代码处理批或者流数据时,引入的ExecutionEnvironment或StreamExecutionEnvironment包不同,在编写代码时导入错误的包会导致编程有问题。

批处理不同API引入ExecutionEnvironment如下:

//Flink Java api 引入的包
import org.apache.flink.api.java.ExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.api.scala.ExecutionEnvironment

流处理不同API引入StreamExecutionEnvironment如下:

//Flink Java api 引入的包
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

四、Flink Java Api中创建 Tuple 方式

在Flink Java api中创建Tuple2时,可以通过new Tuple2方式也可以通过Tuple2.of方式,两者本质一样。

五、Flink Scala api需要导入隐式转换

在Flink Scala api中批处理和流处理代码编写过程中需要导入对应的隐式转换来推断函数操作后的类型,在批和流中导入隐式转换不同,具体如下:

//Scala 批处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.api.scala._
//Scala 流处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.streaming.api.scala._

六、关于Flink Java api 中的 returns 方法

Flink Java api中可以使用Lambda表达式,当涉及到使用泛型Java会擦除泛型类型信息,需要最后调用returns方法指定类型,明确声明类型,告诉系统函数生成的数据集或者数据流的类型。

七、批和流对数据进行分组方法不同

批和流处理中都是通过readTextFile来读取数据文件,对数据进行转换处理后,Flink批处理过程中通过groupBy指定按照什么规则进行数据分组,groupBy中可以根据字段位置指定key(例如:groupBy(0)),如果数据是POJO自定义类型也可以根据字段名称指定key(例如:groupBy("name")),对于复杂的数据类型也可以通过定义key的选择器KeySelector来实现分组的key。

Flink流处理过程中通过keyBy指定按照什么规则进行数据分组,keyBy中也有以上三种方式指定分组key,建议使用通过KeySelector来选择key,其他方式已经过时。

八、关于DataSet Api (Legacy)软弃用

Flink架构可以处理批和流,Flink 批处理数据需要使用到Flink中的DataSet API,此API 主要是支持Flink针对批数据进行操作,本质上Flink处理批数据也是看成一种特殊的流处理(有界流),所以没有必要分成批和流两套API,从Flink1.12版本往后,Dataset API 已经标记为Legacy(已过时),已被官方软弃用,官方建议使用Table API 或者SQL 来处理批数据,我们也可以使用带有Batch执行模式的DataStream API来处理批数据,在未来Flink版本中DataSet API 将会被删除。关于这些API 具体使用后续文章会进行讲解。

关于Flink集群提交任务及Flink flink-conf.yaml配置文件在下个章节集群搭建会进行介绍。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。