实时大数据技术flink Source处理

举报
tea_year 发表于 2025/08/31 09:37:53 2025/08/31
【摘要】 Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。由于流处理和批处理所提供的SLA(服务等级...

Flink起源于一个名为Stratosphere的研究项目,目的是建立下一代大数据分析平台,于2014年4月16日成为Apache孵化器项目。

Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。

由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。

比较典型的有:实现批处理的开源方案有MapReduce、Spark;实现流处理的开源方案有Storm;Spark的Streaming 其实本质上也是微批处理。

Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:

Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;

批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink 在流式计算方面的优势是开源领域"无人能及"的。

一、Source

Collection

一般用于学习测试时编造数据时使用
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合); # 相当于 sc.makeRDD(list)
3.env.fromSequence(开始,结束);
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;


public class SourceDemo01_Collection {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       //1.env.fromElements(可变参数);
       //2.env.fromColletion(各种集合);
       //3.env.generateSequence(开始,结束);
       //4.env.fromSequence(开始,结束);
       DataStream<String> ds1 = env.fromElements("hello hadoop spark", "hello hadoop spark", "hello hadoop", "hello");
       DataStream<String> ds2 = env.fromCollection(Arrays.asList("hello hadoop spark", "hello hadoop spark", "hello hadoop", "hello"));
       DataStream<Long> ds3 = env.fromSequence(1, 10);

       //TODO 3.transformation-数据转换处理
       //TODO 4.sink-数据输出
       ds1.print("ds1");
       ds2.print("ds2");
       ds3.print("ds3");
       //TODO 5.execute-执行
       env.execute();
  }
}


File

env.readTextFile(本地/HDFS文件/文件夹);

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceDemo02_File {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       //env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
       DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
       DataStream<String> ds2 = env.readTextFile("data/input/dir");
       DataStream<String> ds3 = env.readTextFile("data/input/wordcount.txt.gz");

       //TODO 3.transformation-数据转换处理
       //TODO 4.sink-数据输出
       ds1.print("ds1");
       ds2.print("ds2");
       ds3.print("ds3");
       
       //TODO 5.execute-执行
       env.execute();
  }
}


Socket

yum install nc

nc -lk 8888

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo03_Socket {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<String> socketDS = env.socketTextStream("hadoop10", 8889);

       //TODO 3.transformation-数据转换处理
       //3.1对每一行数据进行分割并压扁
       DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public void flatMap(String value, Collector<String> out) throws Exception {
               String[] words = value.split(" ");
               for (String word : words) {
                   out.collect(word);
              }
          }
      });
       //3.2每个单词记为<单词,1>
       DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String value) throws Exception {
               return Tuple2.of(value, 1);
          }
      });
       //3.3分组
       KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
           @Override
           public String getKey(Tuple2<String, Integer> value) throws Exception {
               return value.f0;
          }
      });

       //3.4聚合
       SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

       //TODO 4.sink-数据输出
       result.print();

       //TODO 5.execute-执行
       env.execute();
  }
}


自定义Source

自定义Source随机生成数据

用户自定义的数据源需要实现下面的任一接口

SourceFunction:非并行数据源(并行度只能=1)
RichSourceFunction:多功能非并行数据源(并行度只能=1)
ParallelSourceFunction:并行数据源(并行度能够>=1)
RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)

需求:

每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
要求:
- 随机生成订单ID(UUID)
- 随机生成用户ID(0-2)
- 随机生成订单金额(0-100)
- 时间戳为当前系统时间

代码实现

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;


public class SourceDemo04_Customer {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(1);

       //TODO 3.transformation-数据转换处理
       //TODO 4.sink-数据输出
       orderDS.print();

       //TODO 5.execute-执行
       env.execute();
  }

   //自定义source完成每隔1s生产一条订单信息
   public static class MyOrderSource extends RichParallelSourceFunction<Order>{
       private Boolean flag = true;
       //启动后需要一直运行的方法
       @Override
       public void run(SourceContext<Order> ctx) throws Exception {
           Random ran = new Random();
           while (flag){
               //每隔1s随机生成一条订单信息
               String id = UUID.randomUUID().toString();
               int uid = ran.nextInt(3);
               int money = ran.nextInt(101);
               long createTime = System.currentTimeMillis();
               ctx.collect(new Order(id,uid,money,createTime));
               Thread.sleep(1000);
          }
      }

       //接收到cancel命令时执行的方法
       @Override
       public void cancel() {
           flag = false;
      }
  }

   @Data
   @NoArgsConstructor
   @AllArgsConstructor
   public static class Order {
       private String id;
       private Integer userId;
       private Integer money;
       private Long createTime;

  }
}


注意:使用lombok需要导入依赖,导包,还需要安装插件

1614914646768.png


自定义Source加载MySQL数据

需求: 使用Flink自定义数据源,每隔2s加载一次MySQL表数据

CREATE TABLE `t_student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


public class SourceDemo05_Customer_MySQL {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

       //TODO 2.source-加载数据
       DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

       //TODO 3.transformation-数据转换处理
       //TODO 4.sink-数据输出
       studentDS.print();

       //TODO 5.execute-执行
       env.execute();
  }

   //自定义source完成每隔2s查询MySQL
   public static class MySQLSource  extends RichParallelSourceFunction<Student>{
       Connection conn = null;
       PreparedStatement ps = null;
       //连接JDBC一次
       @Override
       public void open(Configuration parameters) throws Exception {
           //加载驱动在DriverManager源码已经有了不用写了
           conn = DriverManager.getConnection("jdbc:mysql://hadoop10:3306/test1", "root", "123456");
           ps = conn.prepareStatement("select `id`, `name`, `age` from t_student");
      }

       private Boolean flag = true;
       @Override
       public void run(SourceContext<Student> ctx) throws Exception {
           while (flag){
               //每隔2s查询一次MySQL表
               ResultSet rs = ps.executeQuery();
               //处理结果集
               while (rs.next()){
                   int id = rs.getInt("id");
                   String name = rs.getString("name");
                   int age = rs.getInt("age");
                   ctx.collect(new Student(id,name,age));
              }
               Thread.sleep(2000);
          }
      }

       @Override
       public void cancel() {
           flag = false;
      }
       //关闭JDBC一次
       @Override
       public void close() throws Exception {
           if (conn != null) conn.close();
           if (ps != null) ps.close();
      }

  }


   @Data
   @NoArgsConstructor
   @AllArgsConstructor
   public static class Student {
       private Integer id;
       private String name;
       private Integer age;
  }
}


总结

Flink源于Stratosphere项目,2014年成为Apache孵化器项目,其核心是基于流式执行模型统一流批处理——流处理对应无界数据流,批处理为特殊有界流处理,区别于传统分框架实现方案。 Source作为Flink数据输入层,提供多类数据源:Collection(fromElements/fromCollection/fromSequence)适用于测试编造数据;File(readTextFile)支持本地、HDFS文件及压缩文件读取;Socket(socketTextStream)可实时接收网络流数据,常搭配计算逻辑实现实时处理(如Socket WordCount)。 自定义Source需实现对应接口:SourceFunction/RichSourceFunction(非并行,并行度=1)、ParallelSourceFunction/RichParallelSourceFunction(支持并行),可实现随机数据生成(如订单数据)或外部存储读取(如MySQL,通过JDBC周期性查询,需在open/close中管理连接)。使用时需结合RuntimeExecutionMode(AUTOMATIC/STREAMING/BATCH)适配流批场景,确保数据输入适配计算需求。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。