2021年大数据Flink(十一):流批一体API Source

举报
Lansonli 发表于 2021/09/29 01:14:13 2021/09/29
【摘要】 目录 Source 预定义Source 基于集合的Source 基于文件的Source ​​​​​​​基于Socket的Source 自定义Source 随机生成数据 ​​​​​​​MySQL Source     预定义Source 基于集合的Source API 一般用于学习测试...

目录

Source

预定义Source

基于集合的Source

基于文件的Source

​​​​​​​基于Socket的Source

自定义Source

随机生成数据

​​​​​​​MySQL


Source

 

 

预定义Source

基于集合的Source

  • API

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.generateSequence(开始,结束);

4.env.fromSequence(开始,结束);

 

  • 代码演示:

      package cn.itcast.source;
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import java.util.Arrays;
      /**
       * Author itcast
       * Desc
       * 把本地的普通的Java集合/Scala集合变为分布式的Flink的DataStream集合!
       * 一般用于学习测试时编造数据时使用
       * 1.env.fromElements(可变参数);
       * 2.env.fromColletion(各种集合);
       * 3.env.generateSequence(开始,结束);
       * 4.env.fromSequence(开始,结束);
       */
      public class SourceDemo01 {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
              //2.source
              // * 1.env.fromElements(可变参数);
              DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
              // * 2.env.fromColletion(各种集合);
              DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
              // * 3.env.generateSequence(开始,结束);
              DataStream<Long> ds3 = env.generateSequence(1, 10);
              //* 4.env.fromSequence(开始,结束);
              DataStream<Long> ds4 = env.fromSequence(1, 10);
              //3.Transformation
              //4.sink
              ds1.print();
              ds2.print();
              ds3.print();
              ds4.print();
              //5.execute
              env.execute();
          }
      }
  
 

 

​​​​​​​基于文件的Source

  • API

一般用于学习测试

env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以

 

  • 代码演示:

      package cn.itcast.source;
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      /**
       * Author itcast
       * Desc
       * 1.env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
       */
      public class SourceDemo02 {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
              //2.source
              // * 1.env.readTextFile(本地文件/HDFS文件);//压缩文件也可以
              DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
              DataStream<String> ds2 = env.readTextFile("data/input/dir");
              DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
              DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");
              //3.Transformation
              //4.sink
              ds1.print();
              ds2.print();
              ds3.print();
              ds4.print();
              //5.execute
              env.execute();
          }
      }
  
 

 

​​​​​​​基于Socket的Source

一般用于学习测试

  • 需求

1.在node1上使用nc -lk 9999 向指定端口发送数据

nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

如果没有该命令可以下安装

yum install -y nc
 

 

2.使用Flink编写流处理应用程序实时统计单词数量

 

  • 代码实现:

      package cn.itcast.source;
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.api.common.functions.FlatMapFunction;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.KeyedStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.util.Collector;
      /**
       * Author itcast
       * Desc
       * SocketSource
       */
      public class SourceDemo03 {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
              //2.source
              DataStream<String> linesDS = env.socketTextStream("node1", 9999);
              //3.处理数据-transformation
              //3.1每一行数据按照空格切分成一个个的单词组成一个集合
              DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
                  @Override
                  public void flatMap(String value, Collector<String> out) throws Exception {
                      //value就是一行行的数据
                      String[] words = value.split(" ");
                      for (String word : words) {
                          out.collect(word);//将切割处理的一个个的单词收集起来并返回
                      }
                  }
              });
              //3.2对集合中的每个单词记为1
              DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String value) throws Exception {
                      //value就是进来一个个的单词
                      return Tuple2.of(value, 1);
                  }
              });
              //3.3对数据按照单词(key)进行分组
              //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
              KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
              //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
              DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
              //4.输出结果-sink
              result.print();
              //5.触发执行-execute
              env.execute();
          }
      }
  
 

 

自定义Source

随机生成数据

  • API

一般用于学习测试,模拟生成一些数据

Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

SourceFunction:非并行数据源(并行度只能=1)

RichSourceFunction:多功能非并行数据源(并行度只能=1)

ParallelSourceFunction:并行数据源(并行度能够>=1)

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口

  • 需求

每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

要求:

- 随机生成订单ID(UUID)

- 随机生成用户ID(0-2)

- 随机生成订单金额(0-100)

- 时间戳为当前系统时间

 

  • 代码实现

      package cn.itcast.source;
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      import java.util.Random;
      import java.util.UUID;
      /**
       * Author itcast
       * Desc
       *需求
       * 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
       * 要求:
       * - 随机生成订单ID(UUID)
       * - 随机生成用户ID(0-2)
       * - 随机生成订单金额(0-100)
       * - 时间戳为当前系统时间
       *
       * API
       * 一般用于学习测试,模拟生成一些数据
       * Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:
       * SourceFunction:非并行数据源(并行度只能=1)
       * RichSourceFunction:多功能非并行数据源(并行度只能=1)
       * ParallelSourceFunction:并行数据源(并行度能够>=1)
       * RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口
       */
      public class SourceDemo04_Customer {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
              //2.Source
              DataStream<Order> orderDS = env
                      .addSource(new MyOrderSource())
                      .setParallelism(2);
              //3.Transformation
              //4.Sink
              orderDS.print();
              //5.execute
              env.execute();
          }
          @Data
          @NoArgsConstructor
          @AllArgsConstructor
          public static class Order {
              private String id;
              private Integer userId;
              private Integer money;
              private Long createTime;
          }
          public static class MyOrderSource extends RichParallelSourceFunction<Order> {
              private Boolean flag = true;
              @Override
              public void run(SourceContext<Order> ctx) throws Exception {
                  Random random = new Random();
                  while (flag){
                      Thread.sleep(1000);
                      String id = UUID.randomUUID().toString();
                      int userId = random.nextInt(3);
                      int money = random.nextInt(101);
                      long createTime = System.currentTimeMillis();
                      ctx.collect(new Order(id,userId,money,createTime));
                  }
              }
              //取消任务/执行cancle命令的时候执行
              @Override
              public void cancel() {
                  flag = false;
              }
          }
      }
  
 

 

​​​​​​​MySQL

  • 需求:

实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据

那么现在先完成一个简单的需求:

从MySQL中实时加载数据

要求MySQL中的数据有变化,也能被实时加载出来

 

  • 准备数据

      CREATE TABLE `t_student` (
          `id` int(11) NOT NULL AUTO_INCREMENT,
          `name` varchar(255) DEFAULT NULL,
          `age` int(11) DEFAULT NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
      INSERT INTO `t_student` VALUES ('1', 'jack', '18');
      INSERT INTO `t_student` VALUES ('2', 'tom', '19');
      INSERT INTO `t_student` VALUES ('3', 'rose', '20');
      INSERT INTO `t_student` VALUES ('4', 'tom', '19');
      INSERT INTO `t_student` VALUES ('5', 'jack', '18');
      INSERT INTO `t_student` VALUES ('6', 'rose', '20');
  
 

 

  • 代码实现:

      package cn.itcast.source;
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.ResultSet;
      import java.util.concurrent.TimeUnit;
      /**
       * Author itcast
       * Desc
       * 需求:
       * 实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据
       * 那么现在先完成一个简单的需求:
       * 从MySQL中实时加载数据
       * 要求MySQL中的数据有变化,也能被实时加载出来
       */
      public class SourceDemo05_Customer_MySQL {
          public static void main(String[] args) throws Exception {
              //1.env
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              //2.Source
              DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
              //3.Transformation
              //4.Sink
              studentDS.print();
              //5.execute
              env.execute();
          }
          @Data
          @NoArgsConstructor
          @AllArgsConstructor
          public static class Student {
              private Integer id;
              private String name;
              private Integer age;
          }
          public static class MySQLSource extends RichParallelSourceFunction<Student> {
              private Connection conn = null;
              private PreparedStatement ps = null;
              @Override
              public void open(Configuration parameters) throws Exception {
                  //加载驱动,开启连接
                  //Class.forName("com.mysql.jdbc.Driver");
                  conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
                  String sql = "select id,name,age from t_student";
                  ps = conn.prepareStatement(sql);
              }
              private boolean flag = true;
              @Override
              public void run(SourceContext<Student> ctx) throws Exception {
                  while (flag) {
                      ResultSet rs = ps.executeQuery();
                      while (rs.next()) {
                          int id = rs.getInt("id");
                          String name = rs.getString("name");
                          int age = rs.getInt("age");
                          ctx.collect(new Student(id, name, age));
                      }
                      TimeUnit.SECONDS.sleep(5);
                  }
              }
              @Override
              public void cancel() {
                  flag = false;
              }
              @Override
              public void close() throws Exception {
                  if (conn != null) conn.close();
                  if (ps != null) ps.close();
              }
          }
      }
  
 

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116209616

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。