Real-Time Big Data Technology: Flink
In Apache Flink, a Transformation is the core logical unit of both streaming and batch jobs: it converts an input DataStream (or DataSet, batch only) into a new output stream or dataset according to business rules. A Transformation is essentially a definition of processing logic; from it, Flink builds the execution plan (StreamGraph → JobGraph → ExecutionGraph) and finally runs it distributed across the cluster.
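To make the lazy-build point concrete, here is a minimal sketch (the class name and sample values are illustrative assumptions): each operator call merely records a Transformation node; data only flows once env.execute() submits the job.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyTransformationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each call below only records a Transformation node; no data flows yet.
        env.fromElements(1, 2, 3)
                .map(n -> n * 2)     // Transformation: map
                .filter(n -> n > 2)  // Transformation: filter
                .print();            // sink, also only declared here

        // Only now are the recorded Transformations compiled into
        // StreamGraph -> JobGraph -> ExecutionGraph and actually run.
        env.execute();
    }
}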
Basic Operations
Requirement: count the words in a stream, excluding the banned word "TMD".
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo01_Basic {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2. source - load the data
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop10", 8889);
        //TODO 3. transformation - process the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                /*if (value.equals("TMD")) {
                    return false;
                }
                return true;*/
                return !value.equals("TMD");
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        })//.keyBy(t -> t.f0);
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })//.sum(1);
        /*public interface ReduceFunction<T> extends Function, Serializable {
            T reduce(T value1, T value2) throws Exception;
        }*/
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            //t1 and t2 are the two values being aggregated; e.g. if t1 is (hello,2) and t2 is (hello,1), the result is (hello,3)
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return Tuple2.of(t1.f0, t1.f1 + t2.f1);
            }
        });
        //TODO 4. sink - output the data
        result.print();
        //TODO 5. execute - run the job
        env.execute();
    }
}
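The commented-out keyBy(t -> t.f0) and sum(1) above hint at a more compact lambda style. For reference, a sketch of the same pipeline written with lambdas (the class name is an illustrative assumption; the returns(...) hints are required because Java erases the lambdas' generic types):
package transformation;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo01_Lambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = env
                .socketTextStream("hadoop10", 8889)
                // split lines into words
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                .filter(word -> !word.equals("TMD"))
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);

        result.print();
        env.execute();
    }
}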
union and connect: merging and connecting streams
union merges multiple streams of the same type.
connect connects two streams of different types (the connected stream must be processed further before it can be output).
package transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Desc Demonstrates Flink DataStream Transformations: union and connect
 */
public class TransformationDemo02_Union_Connect {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2. source - load the data
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        //TODO 3. transformation - process the data
        DataStream<String> result1 = ds1.union(ds2);//the two streams are merged directly into one
        //ds1.union(ds3);//compile error: union can only merge streams of the same type
        //DataStream<String> result = result1.union(ds2);//union can merge more than two streams
        ConnectedStreams<String, Long> result2 = ds1.connect(ds3);//connect can connect streams of different types
        //ConnectedStreams<String, String> result2 = ds1.connect(ds2);
        //result2.connect(ds2);//compile error: connect can only connect two streams
        //TODO 4. sink - output the data
        result1.print();//a unioned stream can be printed directly
        //result2.print();//compile error: a connected stream cannot be printed directly
        /*public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
            OUT map1(IN1 value) throws Exception;
            OUT map2(IN2 value) throws Exception;
        }*/
        SingleOutputStreamOperator<String> result3 = result2.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "String:" + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long:" + value;
            }
        });
        result3.print();
        //TODO 5. execute - run the job
        env.execute();
    }
}
Side Outputs
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Desc Demonstrates Flink DataStream Transformations: side outputs
 */
public class TransformationDemo03_SideOutputs {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2. source - load the data
        DataStream<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        //TODO 3. transformation - process the data
        //create two side-output tags, one for even and one for odd numbers
        OutputTag<Integer> tag_even = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));
        OutputTag<Integer> tag_odd = new OutputTag<Integer>("odd", TypeInformation.of(Integer.class));
        //inspect each element and route it to the matching side output
        SingleOutputStreamOperator<Integer> processDS = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {//even
                    ctx.output(tag_even, value);
                } else {//odd
                    ctx.output(tag_odd, value);
                }
            }
        });
        //pull out the elements tagged as even
        DataStream<Integer> evenDS = processDS.getSideOutput(tag_even);
        //pull out the elements tagged as odd
        DataStream<Integer> oddDS = processDS.getSideOutput(tag_odd);
        //TODO 4. sink - output the data
        evenDS.print("even:");
        oddDS.print("odd:");
        //TODO 5. execute - run the job
        env.execute();
    }
}
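As an aside, the Flink documentation also creates OutputTags as anonymous subclasses, which lets the generic type survive erasure without passing TypeInformation explicitly; tag_even above could equivalently be written as:
OutputTag<Integer> tag_even = new OutputTag<Integer>("even") {};//the trailing {} makes an anonymous subclass that captures the Integer type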
Partitioning
Rebalance partitioning (must know)
Data skew
How can it be handled? -- salt the keys with random prefixes/suffixes (see the sketch below), repartition/coalesce, custom partitioning, and so on.
In Flink it is very simple: just call one API, rebalance, and the data is redistributed as evenly as possible.
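Before the rebalance demo, here is a sketch of the "random key prefix" (key salting) idea mentioned above. The class name, sample data, and the saltCount knob are illustrative assumptions; it runs in BATCH mode so that each keyed sum emits one final value per key (in a pure streaming job both stages would typically be windowed).
package transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.ThreadLocalRandom;

public class TransformationDemo04a_KeySalting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);//bounded input; each sum emits one final result per key

        //a skewed input: "hello" dominates the key space (assumption for the demo)
        DataStream<Tuple2<String, Integer>> wordAndOne = env
                .fromElements("hello", "hello", "hello", "hello", "flink", "spark")
                .map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        int saltCount = 4;//how many buckets each hot key is spread over (tuning knob)

        DataStream<Tuple2<String, Integer>> result = wordAndOne
                //stage 1: prepend a random salt so one hot key is spread over saltCount subtasks
                .map(t -> Tuple2.of(ThreadLocalRandom.current().nextInt(saltCount) + "#" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                //stage 2: strip the salt and combine the partial sums per real key
                .map(t -> Tuple2.of(t.f0.substring(t.f0.indexOf('#') + 1), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);

        result.print();
        env.execute();
    }
}
The rebalance demo itself: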
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo04_Rebalance {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2. source - load the data
        DataStream<Long> longDS = env.fromSequence(0, 100);
        //the operation below in effect assigns the data to the various partitions/threads; data skew may occur
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });
        //TODO 3. transformation - process the data
        //result1 may exhibit data skew
        DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                //a Rich function is needed to get the subtask (partition) index
                //return Tuple2.of(partitionId, 1);
                int partitionId = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partitionId, 1);
            }
        }).keyBy(t -> t.f0).sum(1);//count the number of records in each partition
        //after calling rebalance, counting the records per partition again shows an even distribution
        DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        //a Rich function is needed to get the subtask (partition) index
                        //return Tuple2.of(partitionId, 1);
                        int partitionId = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(partitionId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);//count the number of records in each partition
        //TODO 4. sink - output the data
        //result1.print("result1");
        result2.print("result2");
        //TODO 5. execute - run the job
        env.execute();
    }
}
Other partitioners
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo05_Other {
    public static void main(String[] args) throws Exception {
        //TODO 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2. source - load the data
        DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
        //TODO 3. transformation - process the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //the various partitioners
        DataStream<Tuple2<String, Integer>> result1 = wordAndOneDS.global();
        DataStream<Tuple2<String, Integer>> result2 = wordAndOneDS.broadcast();
        DataStream<Tuple2<String, Integer>> result3 = wordAndOneDS.forward();
        DataStream<Tuple2<String, Integer>> result4 = wordAndOneDS.shuffle();
        DataStream<Tuple2<String, Integer>> result5 = wordAndOneDS.rebalance();
        DataStream<Tuple2<String, Integer>> result6 = wordAndOneDS.partitionCustom(new MyPartitioner(), new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        //TODO 4. sink - output the data
        result6.print();
        //TODO 5. execute - run the job
        env.execute();
    }

    /*public interface Partitioner<K> extends java.io.Serializable, Function {
        int partition(K key, int numPartitions);
    }*/
    public static class MyPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            //int partitionNum = Math.abs(key.hashCode()) % numPartitions;//a custom hash-style partitioning
            if (key.equals("hello")) { //e.g. city.equals("Beijing") || Henan || Shandong -> random.nextInt(10), else -> 10
                return 0;
            } else {
                return 1;
            }
            //or partition the data however you like
        }
    }
}
Summary
Flink's Transformations are the core of its data processing: they lazily build up the processing logic, fall into several functional categories, and each category is illustrated by an example above.
Basic operations: flatMap splits the input, filter drops the banned word (TMD), map converts words to key-value pairs, keyBy partitions by key, and reduce or sum aggregates, together implementing word counting with filtering.
Multi-stream processing: union merges multiple streams of the same type (e.g. two String streams), while connect joins two streams of different types (e.g. a String stream and a Long stream), which must then be handled by a CoMapFunction to produce a unified output format.
Side outputs use ProcessFunction and OutputTag to split a stream by condition (e.g. odd vs. even), producing extra output streams alongside the main one.
Partitioning transformations address data skew: rebalance distributes records evenly in round-robin fashion; global (send everything to the first subtask), broadcast (replicate to all subtasks), shuffle (random), and others are also available, and a custom Partitioner can implement arbitrary rules (e.g. routing "hello" to partition 0) to fit different distribution needs.