2021 Big Data Flink (11): Stream-Batch Unified API Source
Source
Predefined Sources
Collection-based Source
- API
Generally used to fabricate data for learning and testing
1. env.fromElements(varargs);
2. env.fromCollection(any collection);
3. env.generateSequence(start, end); // deprecated in newer Flink versions
4. env.fromSequence(start, end); // the replacement for generateSequence
- Code demo:
```java
package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Author itcast
 * Desc
 * Turns an ordinary local Java/Scala collection into a distributed Flink DataStream!
 * Generally used to fabricate data for learning and testing:
 * 1. env.fromElements(varargs);
 * 2. env.fromCollection(any collection);
 * 3. env.generateSequence(start, end);
 * 4. env.fromSequence(start, end);
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.source
        // 1. env.fromElements(varargs);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // 2. env.fromCollection(any collection);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // 3. env.generateSequence(start, end); -- deprecated in favor of fromSequence
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // 4. env.fromSequence(start, end);
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        //3.Transformation

        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        //5.execute
        env.execute();
    }
}
```
File-based Source
- API
Generally used for learning and testing
env.readTextFile(local/HDFS file or directory); // compressed files work too
- Code demo:
```java
package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * 1. env.readTextFile(local/HDFS file or directory); // compressed files work too
 */
public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.source
        // env.readTextFile(local file/HDFS file); // compressed files work too
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

        //3.Transformation

        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        //5.execute
        env.execute();
    }
}
```
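Note that readTextFile reads its input once, after which the (bounded) stream ends. If the job should instead keep watching a directory and pick up newly arriving files, the readFile variant with FileProcessingMode.PROCESS_CONTINUOUSLY can be used. A minimal sketch, reusing the local directory from the demo above; the class name and the 1000 ms re-scan interval are illustrative, not from the original:

```java
package cn.itcast.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * Sketch: keep monitoring a directory instead of reading it once.
 * Caveat: in PROCESS_CONTINUOUSLY mode a modified file is re-read in full.
 */
public class SourceDemo02_Monitor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String dir = "data/input/dir";
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000); // re-scan the directory every 1000 ms
        lines.print();
        env.execute();
    }
}
```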
Socket-based Source
Generally used for learning and testing
- Requirements
1. On node1, run nc -lk 9999 to send data to the chosen port
nc is short for netcat; originally used to configure routers, it lets us send data to a given port
If the command is missing, install it:
yum install -y nc
2. Write a Flink stream-processing application that counts words in real time
- Code implementation:
```java
package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.transformation
        //3.1 Split each line on spaces into individual words
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value is one line of input
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word); //collect and emit each word produced by the split
                }
            }
        });

        //3.2 Map each word to a count of 1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value is a single word
                return Tuple2.of(value, 1);
            }
        });

        //3.3 Group the data by word (the key)
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0); //old positional-key API
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

        //3.4 Aggregate the counts (the value) within each group, i.e. sum them
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.sink
        result.print();

        //5.execute
        env.execute();
    }
}
```
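For reference, the anonymous inner classes above can be compressed into lambdas. A hedged sketch (the class name is illustrative): because Java erases the lambdas' generic types, each result type must be declared explicitly with returns(...), which is the one extra cost of the lambda style:

```java
package cn.itcast.source;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/** Sketch: the same socket word count written with lambdas. */
public class SourceDemo03_Lambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("node1", 9999)
                //split each line on spaces and emit the words
                .flatMap((String line, Collector<String> out) ->
                        Arrays.stream(line.split(" ")).forEach(out::collect))
                .returns(Types.STRING) //lambdas lose generic info to type erasure
                //pair each word with a count of 1
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}
```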
Custom Source
Randomly Generated Data
- API
Generally used for learning and testing, to mock up some data
Flink also provides source interfaces; implementing one gives you a custom data source. Different interfaces offer different capabilities (a minimal sketch follows this list); the "Rich" variants add lifecycle methods (open/close) and access to the runtime context:
SourceFunction: non-parallel source (parallelism can only be 1)
RichSourceFunction: rich non-parallel source (parallelism can only be 1)
ParallelSourceFunction: parallel source (parallelism can be >= 1)
RichParallelSourceFunction: rich parallel source (parallelism can be >= 1) -- the Kafka source we study later uses this interface
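As a baseline before the full demo, a minimal non-parallel source implementing the plain SourceFunction interface could look like this sketch (the class name and the one-element-per-second rate are illustrative, not from the original):

```java
package cn.itcast.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Minimal sketch of the simplest interface: a non-parallel SourceFunction. */
public class MyCounterSource implements SourceFunction<Long> {
    private volatile boolean flag = true; //set to false by cancel()
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (flag) {
            ctx.collect(counter++); //emit the next number
            Thread.sleep(1000);     //one element per second
        }
    }

    @Override
    public void cancel() {
        flag = false; //called when the job is canceled
    }
}
```

Because a plain SourceFunction is non-parallel, calling setParallelism(2) on env.addSource(new MyCounterSource()) is rejected when the job is built; only ParallelSourceFunction and RichParallelSourceFunction implementations may run with parallelism greater than 1.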
- Requirements
Every second, generate one random order record (order ID, user ID, order amount, timestamp)
Requirements:
- Random order ID (a UUID)
- Random user ID (0-2)
- Random order amount (0-100)
- Timestamp set to the current system time
- Code implementation
```java
package cn.itcast.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Desc
 * Requirement:
 * Every second, generate one random order record (order ID, user ID, order amount, timestamp)
 * Requirements:
 * - Random order ID (a UUID)
 * - Random user ID (0-2)
 * - Random order amount (0-100)
 * - Timestamp set to the current system time
 *
 * API
 * Generally used for learning and testing, to mock up some data
 * Flink also provides source interfaces; implementing one gives you a custom data source:
 * SourceFunction: non-parallel source (parallelism can only be 1)
 * RichSourceFunction: rich non-parallel source (parallelism can only be 1)
 * ParallelSourceFunction: parallel source (parallelism can be >= 1)
 * RichParallelSourceFunction: rich parallel source (parallelism can be >= 1) -- the Kafka source we study later uses this interface
 */
public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStream<Order> orderDS = env
                .addSource(new MyOrderSource())
                .setParallelism(2); //two source instances, so roughly two orders per second

        //3.Transformation

        //4.Sink
        orderDS.print();

        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);  //0, 1 or 2
                int money = random.nextInt(101); //0..100
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id, userId, money, createTime));
            }
        }

        //Executed when the job is canceled (the cancel command)
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
```
MySQL
- Requirements:
In real development we often receive data in real time and need to match it against rules stored in MySQL; in that situation a custom Flink source can read the rules from MySQL.
For now, a simpler requirement:
Load data from MySQL in real time
Changes to the data in MySQL must also be picked up as they happen (the demo below achieves this by re-querying the table every 5 seconds)
- Prepare the data
```sql
CREATE TABLE `t_student` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');
```
- Code implementation:
```java
package cn.itcast.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * Requirements:
 * In real development we often receive data in real time and need to match it against rules
 * stored in MySQL, which is where a custom Flink source reading from MySQL comes in.
 * For now, a simpler requirement:
 * Load data from MySQL in real time
 * Changes to the data in MySQL must also be picked up as they happen
 */
public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

        //3.Transformation

        //4.Sink
        studentDS.print();

        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        private volatile boolean flag = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            //Load the driver and open the connection
            //Class.forName("com.mysql.jdbc.Driver"); //not needed with JDBC 4+ drivers
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                rs.close();
                TimeUnit.SECONDS.sleep(5); //re-query every 5 seconds to pick up changes
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            //close the statement before the connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
```
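Besides the Flink dependencies themselves, the two custom-source demos assume Lombok (for @Data and the constructor annotations) and the MySQL JDBC driver on the classpath. A hedged Maven sketch; the version numbers are illustrative, not prescribed by the original:

```xml
<!-- Illustrative versions; match them to your MySQL server and toolchain -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    <scope>provided</scope>
</dependency>
```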
Source: lansonli.blog.csdn.net. Author: Lansonli. Copyright belongs to the original author; contact the author before reposting.
Original link: lansonli.blog.csdn.net/article/details/116209616