实时大数据技术flink

举报
tea_year 发表于 2025/08/31 10:00:02 2025/08/31
【摘要】 在 Apache Flink 中,Transformation(转换) 是流处理和批处理作业的核心逻辑单元,负责将输入的数据流(DataStream)或数据集(DataSet,批处理专用) 按照业务规则转换为新的输出数据流 / 数据集。它本质上是 “数据处理逻辑的定义”,Flink 会根据 Transformation 构建执行计划(StreamGraph → JobGraph → Exec...

在 Apache Flink 中,Transformation(转换) 是流处理和批处理作业的核心逻辑单元,负责将输入的数据流(DataStream)或数据集(DataSet,批处理专用) 按照业务规则转换为新的输出数据流 / 数据集。它本质上是 “数据处理逻辑的定义”,Flink 会根据 Transformation 构建执行计划(StreamGraph → JobGraph → ExecutionGraph),最终在集群中分布式执行。

基础操作

需求: 对流数据中的单词进行统计,排除敏感词TMD

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class TransformationDemo01_Basic {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStreamSource<String> socketDS = env.socketTextStream("hadoop10", 8889);
       //TODO 3.transformation-数据转换处理
       SingleOutputStreamOperator<Tuple2<String, Integer>> result = socketDS.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public void flatMap(String value, Collector<String> out) throws Exception {
               String[] words = value.split(" ");
               for(String word:words){
                   out.collect(word)
              }
          }
      }).filter(new FilterFunction<String>() {
           @Override
           public boolean filter(String value) throws Exception {
               /*if(value.equals("TMD")){
                   return false;
               }
              return true;*/
               return !value.equals("TMD");
          }
      }).map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String value) throws Exception {
               return Tuple2.of(value, 1);

          }
      })//.keyBy(t->t.f0);
              .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                   @Override
                   public String getKey(Tuple2<String, Integer> value) throws Exception {
                       return value.f0;
                  }
              })//.sum(1);
               /*public interface ReduceFunction<T> extends Function, Serializable {
                   T reduce(T value1, T value2) throws Exception;
               }*/
              .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                   //t1和t2就是需要进行聚合的数据,如进来的t1为(hello,2),t2为(hello,1),聚合结果为(hello,3)
                   @Override
                   public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                       return Tuple2.of(t1.f0, t1.f1 + t2.f1);
                  }
              });

       //TODO 4.sink-数据输出
       result.print();

       //TODO 5.execute-执行
       env.execute();
  }
}


union和connect

union和connect-合并和连接

union可以合并多个同类型的流

connect可以连接2个不同类型的流(最后需要处理后再输出)

package transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
* Desc 演示Flink-DataStream-Transformation
*/
public class TransformationDemo02_Union_Connect {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
       DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
       DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);

       //TODO 3.transformation-数据转换处理
       DataStream<String> result1 = ds1.union(ds2);//两个流直接合并成一个流
       //ds1.union(ds3);//报错,union只能合并同类型的流
       //DataStream<String> result = result1.union(ds2);//union可以合并多个流


       ConnectedStreams<String, Long> result2 = ds1.connect(ds3);//connect可以连接不同类型的流
       //ConnectedStreams<String, String> result2 = ds1.connect(ds2);
       //result2.connect(ds2);//报错,connect只能连接2个流

       //TODO 4.sink-数据输出
       result1.print();//union之后的流可以直接输出
       //result2.print();//报错,connect连接的流不能支持输出
       /*public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
      OUT map1(IN1 value) throws Exception;
      OUT map2(IN2 value) throws Exception;
       }*/
       SingleOutputStreamOperator<String> result3 = result2.map(new CoMapFunction<String, Long, String>() {
           @Override
           public String map1(String value) throws Exception {
               return "String:" + value;
          }

           @Override
           public String map2(Long value) throws Exception {
               return "Long:" + value;
          }
      });
       result3.print();

       //TODO 5.execute-执行
       env.execute();
  }
}


Side Outputs侧道输出

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
* Desc 演示Flink-DataStream-Transformation
*/
public class TransformationDemo03_SideOutputs {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
       
       //TODO 3.transformation-数据转换处理
       //创建2个侧道输出标签-用来存放奇数和偶数
       OutputTag<Integer> tag_even = new OutputTag<Integer>("偶数", TypeInformation.of(Integer.class));
       OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇数",TypeInformation.of(Integer.class));
       //对流中的数据进行处理判断并贴上不同的标签中
       SingleOutputStreamOperator<Integer> processDS = ds.process(new ProcessFunction<Integer, Integer>() {
           @Override
           public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
               if (value % 2 == 0) {//偶数
                   ctx.output(tag_even, value);
              } else {//奇数
                   ctx.output(tag_odd, value);
              }
          }
      });
       //取出贴上了偶数标签的数据
       DataStream<Integer> evenDS = processDS.getSideOutput(tag_even);
       //取出贴上了奇数标签的数据
       DataStream<Integer> oddDS = processDS.getSideOutput(tag_odd);


       //TODO 4.sink-数据输出
       evenDS.print("偶数:");
       oddDS.print("奇数:");

       //TODO 5.execute-执行
       env.execute();
  }
}



分区

重平衡分区-掌握

数据倾斜

1614928256752.png

可以怎么处理? -- key加随机前后缀/reparation/coalsce/自定义分区.....

在Flink中很简单直接调用一个API:rebalance即可,会尽量的让数据重新平衡/均匀分布

1614928416676.png

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo04_Rebalance {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<Long> longDS = env.fromSequence(0, 100);
       //下面的操作相当于将数据随机分配给各个分区/线程执行,有可能出现数据倾斜
       DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
           @Override
           public boolean filter(Long num) throws Exception {
               return num > 10;
          }
      });

       //TODO 3.transformation-数据转换处理
       //result1有可能出现数据倾斜
       DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
           @Override
           public Tuple2<Integer, Integer> map(Long value) throws Exception {
               //为了获取分区编号,需要使用Rich
               //return Tuple2.of(分区编号,1);
               int partitionId = getRuntimeContext().getIndexOfThisSubtask();
               return Tuple2.of(partitionId, 1);
          }
      }).keyBy(t -> t.f0).sum(1);//求的每个分区的数据量/数据条数

       //执行了rebalance之后再去求每个分区的数据量/数据条数,就会均匀的分布
       DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
              .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                   @Override
                   public Tuple2<Integer, Integer> map(Long value) throws Exception {
                       //为了获取分区编号,需要使用Rich
                       //return Tuple2.of(分区编号,1);
                       int partitionId = getRuntimeContext().getIndexOfThisSubtask();
                       return Tuple2.of(partitionId, 1);
                  }
              }).keyBy(t -> t.f0).sum(1);//求的每个分区的数据量/数据条数


       //TODO 4.sink-数据输出
       //result1.print("result1");
       result2.print("result2");

       //TODO 5.execute-执行
       env.execute();
  }
}


其他分区

1614929242516.png

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class TransformationDemo05_Other {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
       //TODO 3.transformation-数据转换处理
       SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
               String[] words = value.split(" ");
               for (String word : words) {
                   out.collect(Tuple2.of(word, 1));
              }
          }
      });

       //各种分区
       DataStream<Tuple2<String, Integer>> result1 = wordAndOneDS.global();
       DataStream<Tuple2<String, Integer>> result2 = wordAndOneDS.broadcast();
       DataStream<Tuple2<String, Integer>> result3 = wordAndOneDS.forward();
       DataStream<Tuple2<String, Integer>> result4 = wordAndOneDS.shuffle();
       DataStream<Tuple2<String, Integer>> result5 = wordAndOneDS.rebalance();
       DataStream result6 = wordAndOneDS.partitionCustom(new MyPartitioner(), new KeySelector<Tuple2<String, Integer>, String>() {
           @Override
           public String getKey(Tuple2<String, Integer> value) throws Exception {
               return value.f0;
          }
      });

       //TODO 4.sink-数据输出
       result6.print();

       //TODO 5.execute-执行
       env.execute();
  }
   /*public interface Partitioner<K> extends java.io.Serializable, Function {
  int partition(K key, int numPartitions);
   }*/
   public static class MyPartitioner<String> implements Partitioner<String> {
       @Override
       public int partition(String key, int numPartitions) {
           //int partitionNum = Math.abs(key.hashCode()) % numPartitions;//自定义的hash分区
           if(key.equals("hello")){ //city.equals("北京") || 河南 || 山东 {random.nextInt(10)} else{10}
               return 0;
          }else {
               return 1;
          }
           //或者你自己想怎么分就怎么分
      }
  }
}

总结

Flink 的 Transformation 是数据处理核心,基于惰性执行构建处理逻辑,按功能可分为多类并通过实例体现应用场景。
基础操作中,flatMap拆分数据、filter排除敏感词(如 TMD)、map转为键值对、keyBy按 key 分区后,用reduce或sum聚合,实现带过滤的单词统计。
多流处理里,union合并多个同类型流(如两个字符串流),connect连接两个不同类型流(如字符串与长整型流),需通过CoMapFunction分别处理后统一输出格式。
侧输出流(Side Outputs)借助ProcessFunction和OutputTag,将数据按条件分流(如奇偶分拆),实现主流外的额外数据输出。
分区转换可解决数据倾斜,rebalance轮询均匀分配数据;另有global(全发至首个分区)、broadcast(广播至所有分区)、shuffle(随机分区)等,还可通过Partitioner自定义分区规则(如指定 “hello” 到 0 号分区),适配不同分布式处理需求。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。