大数据技术之Flink开发实战

举报
tea_year 发表于 2025/08/31 09:27:33 2025/08/31
【摘要】 Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。由于流处理和批处理所提供的SLA(服务等级...

Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。

Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。

由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。

比较典型的有:实现批处理的开源方案有MapReduce、Spark;实现流处理的开源方案有Storm;Spark的Streaming 其实本质上也是微批处理。

Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:

Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;

批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink 在流式计算方面的优势是开源领域"无人能及"的。

需求

将我们自己开发的Flink任务打包并提交到集群上运行


准备工作

注意:Flink支持如下多个层次的API

1614842027260.png


Flink编码步骤/模型

1.env-准备环境

2.source-加载数据

3.transformation-数据处理转换 [重点]

4.sink-数据输出

5.execute-执行


image-20230607223104552.png


DataStream

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/datastream_api.html

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 演示Flink-DataStream-流批一体API完成批处理WordCount
*/
public class WordCount01 {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流
       //env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动
       //不设置的话默认是流模式e(RuntimeExecutionMode.STREAMING)

       //TODO 2.source-加载数据
       DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");

       //TODO 3.transformation-数据转换处理
       //3.1对每一行数据进行分割并压扁
       /*
       public interface FlatMapFunction<T, O> extends Function, Serializable {
           void flatMap(T value, Collector<O> out) throws Exception;
        }
        */
       DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public void flatMap(String value, Collector<String> out) throws Exception {
               String[] words = value.split(" ");
               for (String word : words) {
                   out.collect(word);
              }
          }
      });
       //3.2每个单词记为<单词,1>
       /*
       public interface MapFunction<T, O> extends Function, Serializable {
           O map(T value) throws Exception;
        }
        */
       DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String value) throws Exception {
               return Tuple2.of(value, 1);
          }
      });
       //3.3分组
       //注意:DataSet中分组用groupBy,DataStream中分组用keyBy
       //KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
       /*
       public interface KeySelector<IN, KEY> extends Function, Serializable {
           KEY getKey(IN value) throws Exception;
       }
        */
       KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
           @Override
           public String getKey(Tuple2<String, Integer> value) throws Exception {
               return value.f0;
          }
      });

       //3.4聚合
       SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

       //TODO 4.sink-数据输出
       result.print();

       //TODO 5.execute-执行
       env.execute();
  }
}

2.打包、上传

3.提交我们自己开发打包的任务

可以使用Session会话模式或任务分离模式

flink run -c day1.WordCount01 /opt/app/wc.jar

6.观察yarn和hdfs


获取主函数参数工具类

ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.has("output")){
path = parameterTool.get("output");
}



DataStream-Lambda表达式

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
* Desc 演示Flink-DataStream-流批一体API完成批处理WordCount
* 使用Java8的lambda表示完成函数式风格的WordCount
*/
public class WordCount02 {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流
       //env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动
       //不设置的话默认是流模式defaultValue(RuntimeExecutionMode.STREAMING)

       //TODO 2.source-加载数据
       DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");

       //TODO 3.transformation-数据转换处理
       //3.1对每一行数据进行分割并压扁
       /*
       public interface FlatMapFunction<T, O> extends Function, Serializable {
           void flatMap(T value, Collector<O> out) throws Exception;
        }
        */
       /*DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public void flatMap(String value, Collector<String> out) throws Exception {
               String[] words = value.split(" ");
               for (String word : words) {
                   out.collect(word);
               }
           }
       });*/
       //注意:Java8的函数的语法/lambda表达式的语法: (参数)->{函数体}
       //dataStream.flatMap((value, out) -> Arrays.stream(value.split(" ")).forEach(word->out.collect(word)));
       DataStream<String> wordsDS = dataStream.flatMap(
              (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
      ).returns(Types.STRING);


       //3.2 每个单词记为<单词,1>
       /*
       public interface MapFunction<T, O> extends Function, Serializable {
           O map(T value) throws Exception;
        }
        */
       /*DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String value) throws Exception {
               return Tuple2.of(value, 1);
           }
       });*/
       DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(
              (String value) -> Tuple2.of(value, 1)
      ).returns(Types.TUPLE(Types.STRING, Types.INT));

       //3.3分组
       //注意:DataSet中分组用groupBy,DataStream中分组用keyBy
       //KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
       /*
       public interface KeySelector<IN, KEY> extends Function, Serializable {
           KEY getKey(IN value) throws Exception;
       }
        */
       /*KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
           @Override
           public String getKey(Tuple2<String, Integer> value) throws Exception {
               return value.f0;
           }
       });*/
       KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy((Tuple2<String, Integer> value) -> value.f0);

       //3.4聚合
       SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

       //TODO 4.sink-数据输出
       result.print();

       //TODO 5.execute-执行
       env.execute();
  }
}



总结

本文展示 Flink DataStream API 实现批处理 WordCount 的两种方式。先通过传统函数式编程,定义 FlatMap、Map 等函数完成数据切分、转换、分组聚合。再用 Lambda 表达式简化代码,需显式指定返回类型。还涉及运行模式设置、参数工具使用及任务提交方式,体现流批一体特性与编程灵活性。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。