2021年大数据Flink(十一):流批一体API Source

举报
Lansonli 发表于 2021/09/29 01:14:13 2021/09/29
【摘要】 目录 Source 预定义Source 基于集合的Source 基于文件的Source ​​​​​​​基于Socket的Source 自定义Source 随机生成数据 ​​​​​​​MySQL Source     预定义Source 基于集合的Source API 一般用于学习测试...

目录

Source

预定义Source

基于集合的Source

基于文件的Source

​​​​​​​基于Socket的Source

自定义Source

随机生成数据

​​​​​​​MySQL


Source

 

 

预定义Source

基于集合的Source

  • API

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.generateSequence(开始,结束);

4.env.fromSequence(开始,结束);

 

  • 代码演示:

  
  1. package cn.itcast.source;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import java.util.Arrays;
  6. /**
  7.  * Author itcast
  8.  * Desc
  9.  * 把本地的普通的Java集合/Scala集合变为分布式的Flink的DataStream集合!
  10.  * 一般用于学习测试时编造数据时使用
  11.  * 1.env.fromElements(可变参数);
  12.  * 2.env.fromColletion(各种集合);
  13.  * 3.env.generateSequence(开始,结束);
  14.  * 4.env.fromSequence(开始,结束);
  15.  */
  16. public class SourceDemo01 {
  17.     public static void main(String[] args) throws Exception {
  18.         //1.env
  19.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  21.         //2.source
  22.         // * 1.env.fromElements(可变参数);
  23.         DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
  24.         // * 2.env.fromColletion(各种集合);
  25.         DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
  26.         // * 3.env.generateSequence(开始,结束);
  27.         DataStream<Long> ds3 = env.generateSequence(1, 10);
  28.         //* 4.env.fromSequence(开始,结束);
  29.         DataStream<Long> ds4 = env.fromSequence(1, 10);
  30.         //3.Transformation
  31.         //4.sink
  32.         ds1.print();
  33.         ds2.print();
  34.         ds3.print();
  35.         ds4.print();
  36.         //5.execute
  37.         env.execute();
  38.     }
  39. }

 

​​​​​​​基于文件的Source

  • API

一般用于学习测试

env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以

 

  • 代码演示:

  
  1. package cn.itcast.source;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /**
  6.  * Author itcast
  7.  * Desc
  8.  * 1.env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
  9.  */
  10. public class SourceDemo02 {
  11.     public static void main(String[] args) throws Exception {
  12.         //1.env
  13.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  15.         //2.source
  16.         // * 1.env.readTextFile(本地文件/HDFS文件);//压缩文件也可以
  17.         DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
  18.         DataStream<String> ds2 = env.readTextFile("data/input/dir");
  19.         DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
  20.         DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");
  21.         //3.Transformation
  22.         //4.sink
  23.         ds1.print();
  24.         ds2.print();
  25.         ds3.print();
  26.         ds4.print();
  27.         //5.execute
  28.         env.execute();
  29.     }
  30. }

 

​​​​​​​基于Socket的Source

一般用于学习测试

  • 需求

1.在node1上使用nc -lk 9999 向指定端口发送数据

nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

如果没有该命令可以下安装

yum install -y nc
 

 

2.使用Flink编写流处理应用程序实时统计单词数量

 

  • 代码实现:

  
  1. package cn.itcast.source;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.datastream.KeyedStream;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.util.Collector;
  10. /**
  11.  * Author itcast
  12.  * Desc
  13.  * SocketSource
  14.  */
  15. public class SourceDemo03 {
  16.     public static void main(String[] args) throws Exception {
  17.         //1.env
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.         //2.source
  21.         DataStream<String> linesDS = env.socketTextStream("node1", 9999);
  22.         //3.处理数据-transformation
  23.         //3.1每一行数据按照空格切分成一个个的单词组成一个集合
  24.         DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
  25.             @Override
  26.             public void flatMap(String value, Collector<String> out) throws Exception {
  27.                 //value就是一行行的数据
  28.                 String[] words = value.split(" ");
  29.                 for (String word : words) {
  30.                     out.collect(word);//将切割处理的一个个的单词收集起来并返回
  31.                 }
  32.             }
  33.         });
  34.         //3.2对集合中的每个单词记为1
  35.         DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
  36.             @Override
  37.             public Tuple2<String, Integer> map(String value) throws Exception {
  38.                 //value就是进来一个个的单词
  39.                 return Tuple2.of(value, 1);
  40.             }
  41.         });
  42.         //3.3对数据按照单词(key)进行分组
  43.         //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
  44.         KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
  45.         //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
  46.         DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
  47.         //4.输出结果-sink
  48.         result.print();
  49.         //5.触发执行-execute
  50.         env.execute();
  51.     }
  52. }

 

自定义Source

随机生成数据

  • API

一般用于学习测试,模拟生成一些数据

Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

SourceFunction:非并行数据源(并行度只能=1)

RichSourceFunction:多功能非并行数据源(并行度只能=1)

ParallelSourceFunction:并行数据源(并行度能够>=1)

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口

  • 需求

每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

要求:

- 随机生成订单ID(UUID)

- 随机生成用户ID(0-2)

- 随机生成订单金额(0-100)

- 时间戳为当前系统时间

 

  • 代码实现

  
  1. package cn.itcast.source;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.RuntimeExecutionMode;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  9. import java.util.Random;
  10. import java.util.UUID;
  11. /**
  12.  * Author itcast
  13.  * Desc
  14.  *需求
  15.  * 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
  16.  * 要求:
  17.  * - 随机生成订单ID(UUID)
  18.  * - 随机生成用户ID(0-2)
  19.  * - 随机生成订单金额(0-100)
  20.  * - 时间戳为当前系统时间
  21.  *
  22.  * API
  23.  * 一般用于学习测试,模拟生成一些数据
  24.  * Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:
  25.  * SourceFunction:非并行数据源(并行度只能=1)
  26.  * RichSourceFunction:多功能非并行数据源(并行度只能=1)
  27.  * ParallelSourceFunction:并行数据源(并行度能够>=1)
  28.  * RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口
  29.  */
  30. public class SourceDemo04_Customer {
  31.     public static void main(String[] args) throws Exception {
  32.         //1.env
  33.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  35.         //2.Source
  36.         DataStream<Order> orderDS = env
  37.                 .addSource(new MyOrderSource())
  38.                 .setParallelism(2);
  39.         //3.Transformation
  40.         //4.Sink
  41.         orderDS.print();
  42.         //5.execute
  43.         env.execute();
  44.     }
  45.     @Data
  46.     @NoArgsConstructor
  47.     @AllArgsConstructor
  48.     public static class Order {
  49.         private String id;
  50.         private Integer userId;
  51.         private Integer money;
  52.         private Long createTime;
  53.     }
  54.     public static class MyOrderSource extends RichParallelSourceFunction<Order> {
  55.         private Boolean flag = true;
  56.         @Override
  57.         public void run(SourceContext<Order> ctx) throws Exception {
  58.             Random random = new Random();
  59.             while (flag){
  60.                 Thread.sleep(1000);
  61.                 String id = UUID.randomUUID().toString();
  62.                 int userId = random.nextInt(3);
  63.                 int money = random.nextInt(101);
  64.                 long createTime = System.currentTimeMillis();
  65.                 ctx.collect(new Order(id,userId,money,createTime));
  66.             }
  67.         }
  68.         //取消任务/执行cancle命令的时候执行
  69.         @Override
  70.         public void cancel() {
  71.             flag = false;
  72.         }
  73.     }
  74. }

 

​​​​​​​MySQL

  • 需求:

实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据

那么现在先完成一个简单的需求:

从MySQL中实时加载数据

要求MySQL中的数据有变化,也能被实时加载出来

 

  • 准备数据

  
  1. CREATE TABLE `t_student` (
  2.     `id` int(11) NOT NULL AUTO_INCREMENT,
  3.     `name` varchar(255) DEFAULT NULL,
  4.     `age` int(11) DEFAULT NULL,
  5.     PRIMARY KEY (`id`)
  6. ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
  7. INSERT INTO `t_student` VALUES ('1', 'jack', '18');
  8. INSERT INTO `t_student` VALUES ('2', 'tom', '19');
  9. INSERT INTO `t_student` VALUES ('3', 'rose', '20');
  10. INSERT INTO `t_student` VALUES ('4', 'tom', '19');
  11. INSERT INTO `t_student` VALUES ('5', 'jack', '18');
  12. INSERT INTO `t_student` VALUES ('6', 'rose', '20');

 

  • 代码实现:

  
  1. package cn.itcast.source;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  9. import java.sql.Connection;
  10. import java.sql.DriverManager;
  11. import java.sql.PreparedStatement;
  12. import java.sql.ResultSet;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15.  * Author itcast
  16.  * Desc
  17.  * 需求:
  18.  * 实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据
  19.  * 那么现在先完成一个简单的需求:
  20.  * 从MySQL中实时加载数据
  21.  * 要求MySQL中的数据有变化,也能被实时加载出来
  22.  */
  23. public class SourceDemo05_Customer_MySQL {
  24.     public static void main(String[] args) throws Exception {
  25.         //1.env
  26.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  27.         //2.Source
  28.         DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
  29.         //3.Transformation
  30.         //4.Sink
  31.         studentDS.print();
  32.         //5.execute
  33.         env.execute();
  34.     }
  35.     @Data
  36.     @NoArgsConstructor
  37.     @AllArgsConstructor
  38.     public static class Student {
  39.         private Integer id;
  40.         private String name;
  41.         private Integer age;
  42.     }
  43.     public static class MySQLSource extends RichParallelSourceFunction<Student> {
  44.         private Connection conn = null;
  45.         private PreparedStatement ps = null;
  46.         @Override
  47.         public void open(Configuration parameters) throws Exception {
  48.             //加载驱动,开启连接
  49.             //Class.forName("com.mysql.jdbc.Driver");
  50.             conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
  51.             String sql = "select id,name,age from t_student";
  52.             ps = conn.prepareStatement(sql);
  53.         }
  54.         private boolean flag = true;
  55.         @Override
  56.         public void run(SourceContext<Student> ctx) throws Exception {
  57.             while (flag) {
  58.                 ResultSet rs = ps.executeQuery();
  59.                 while (rs.next()) {
  60.                     int id = rs.getInt("id");
  61.                     String name = rs.getString("name");
  62.                     int age = rs.getInt("age");
  63.                     ctx.collect(new Student(id, name, age));
  64.                 }
  65.                 TimeUnit.SECONDS.sleep(5);
  66.             }
  67.         }
  68.         @Override
  69.         public void cancel() {
  70.             flag = false;
  71.         }
  72.         @Override
  73.         public void close() throws Exception {
  74.             if (conn != null) conn.close();
  75.             if (ps != null) ps.close();
  76.         }
  77.     }
  78. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116209616

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。