大数据技术之Flink开发实战
Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。
Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。
由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。
比较典型的有:实现批处理的开源方案有MapReduce、Spark;实现流处理的开源方案有Storm;Spark的Streaming 其实本质上也是微批处理。
Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:
Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;
批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
Flink 在流式计算方面的优势是开源领域"无人能及"的。
需求
将我们自己开发的Flink任务打包并提交到集群上运行
准备工作
注意:Flink支持如下多个层次的API
Flink编码步骤/模型
1.env-准备环境
2.source-加载数据
3.transformation-数据处理转换 [重点]
4.sink-数据输出
5.execute-执行
DataStream
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/datastream_api.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 演示Flink-DataStream-流批一体API完成批处理WordCount
*/
public class WordCount01 {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动
//不设置的话默认是流模式e(RuntimeExecutionMode.STREAMING)
//TODO 2.source-加载数据
DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 3.transformation-数据转换处理
//3.1对每一行数据进行分割并压扁
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
//3.2每个单词记为<单词,1>
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3分组
//注意:DataSet中分组用groupBy,DataStream中分组用keyBy
//KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
/*
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);
//TODO 4.sink-数据输出
result.print();
//TODO 5.execute-执行
env.execute();
}
}
2.打包、上传
3.提交我们自己开发打包的任务
可以使用Session会话模式或任务分离模式
flink run -c day1.WordCount01 /opt/app/wc.jar
6.观察yarn和hdfs
获取主函数参数工具类
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.has("output")){
path = parameterTool.get("output");
}
DataStream-Lambda表达式
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* Desc 演示Flink-DataStream-流批一体API完成批处理WordCount
* 使用Java8的lambda表示完成函数式风格的WordCount
*/
public class WordCount02 {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动
//不设置的话默认是流模式defaultValue(RuntimeExecutionMode.STREAMING)
//TODO 2.source-加载数据
DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 3.transformation-数据转换处理
//3.1对每一行数据进行分割并压扁
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
/*DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});*/
//注意:Java8的函数的语法/lambda表达式的语法: (参数)->{函数体}
//dataStream.flatMap((value, out) -> Arrays.stream(value.split(" ")).forEach(word->out.collect(word)));
DataStream<String> wordsDS = dataStream.flatMap(
(String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING);
//3.2 每个单词记为<单词,1>
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});*/
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT));
//3.3分组
//注意:DataSet中分组用groupBy,DataStream中分组用keyBy
//KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
/*
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
/*KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});*/
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy((Tuple2<String, Integer> value) -> value.f0);
//3.4聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);
//TODO 4.sink-数据输出
result.print();
//TODO 5.execute-执行
env.execute();
}
}
总结
本文展示 Flink DataStream API 实现批处理 WordCount 的两种方式。先通过传统函数式编程,定义 FlatMap、Map 等函数完成数据切分、转换、分组聚合。再用 Lambda 表达式简化代码,需显式指定返回类型。还涉及运行模式设置、参数工具使用及任务提交方式,体现流批一体特性与编程灵活性。
- 点赞
- 收藏
- 关注作者
评论(0)