Big Data Flink 2021 (Part 13): Sinks in the Unified Stream-Batch API
Contents
Sink
Predefined Sinks
Console- and File-Based Sinks
Custom Sinks
MySQL
Sink
Predefined Sinks
Console- and File-Based Sinks
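- API
1. ds.print(): prints directly to the console (stdout)
2. ds.printToErr(): prints directly to the console via stderr, which most IDEs show in red
3. ds.writeAsText("local or HDFS path", WriteMode.OVERWRITE).setParallelism(1)
- Note:
When writing to a path, you can set the parallelism of the sink (by calling setParallelism on it), and the parallelism decides what the path refers to:
parallelism > 1: the path is a directory
parallelism = 1: the path is a file name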
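To make the parallelism rule concrete, here is a minimal plain-Java model (not Flink code) of which paths writeAsText ends up writing. It assumes the default behavior of Flink's file output format, where each parallel subtask writes its own file named 1, 2, ..., n inside the target directory; the class and method names (WriteAsTextLayout, outputFiles) are hypothetical, and the exact layout may differ across Flink versions or output settings.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink code) of where writeAsText puts its output,
 * assuming one file per parallel subtask, named 1..n, when parallelism > 1.
 */
public class WriteAsTextLayout {

    /** Returns the file paths a sink with the given parallelism would write to. */
    static List<String> outputFiles(String path, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<String> files = new ArrayList<>();
        if (parallelism == 1) {
            // A single subtask writes directly to the given path: the path is a file name.
            files.add(path);
        } else {
            // Several subtasks: the path becomes a directory with one file per subtask (1-based).
            for (int subtask = 0; subtask < parallelism; subtask++) {
                files.add(path + "/" + (subtask + 1));
            }
        }
        return files;
    }

    public static void main(String[] args) {
        System.out.println(outputFiles("data/output/test", 1)); // [data/output/test]
        System.out.println(outputFiles("data/output/test", 2)); // [data/output/test/1, data/output/test/2]
    }
}
```

With setParallelism(2), as in the demo that follows, this model predicts a directory data/output/test containing two files.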
- Code demo:
package cn.itcast.sink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * 1. ds.print(): print directly to the console (stdout)
 * 2. ds.printToErr(): print directly to the console via stderr (shown in red in most IDEs)
 * 3. ds.collect: collect the distributed data into a local collection
 * 4. ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(1)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");

        // 3. transformation (none in this demo)

        // 4. sink
        ds.print();
        ds.printToErr();
        // setParallelism is applied to the sink returned by writeAsText
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        // Note:
        // parallelism = 1 -> the path is a single file
        // parallelism > 1 -> the path is a directory

        // 5. execute
        env.execute();
    }
}
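Because print() and printToErr() run as ordinary parallel sinks, the demo's console lines usually carry a prefix such as "3> hadoop". The sketch below models that prefix in plain Java, based on our understanding of Flink's print sink: the 1-based subtask number is shown only when the sink parallelism is greater than 1, and an optional identifier passed via ds.print("name") comes first. PrintPrefix and format are hypothetical names; this is an illustration, not Flink's implementation.

```java
/**
 * Hypothetical model (not Flink code) of the prefix that print()/printToErr()
 * put in front of each record.
 */
public class PrintPrefix {

    /**
     * @param sinkIdentifier optional name as in ds.print("name"); empty for ds.print()
     * @param subtaskIndex   0-based index of the parallel sink instance
     * @param parallelism    number of parallel sink instances
     */
    static String format(String sinkIdentifier, int subtaskIndex, int parallelism, String record) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier);
        if (parallelism > 1) {
            // With several instances, the 1-based subtask number is shown.
            if (prefix.length() > 0) {
                prefix.append(':');
            }
            prefix.append(subtaskIndex + 1);
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix + record;
    }

    public static void main(String[] args) {
        System.out.println(format("", 2, 4, "hadoop"));     // 3> hadoop
        System.out.println(format("", 0, 1, "flink"));      // flink
        System.out.println(format("words", 0, 4, "flink")); // words:1> flink
    }
}
```

The prefix is a quick way to see which parallel instance emitted a line; with env.setParallelism(1) the plain record is printed.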
Custom Sinks
MySQL
- Requirement:
Write the elements of a Flink collection source to MySQL through a custom Sink
- Implementation:
package cn.itcast.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Author itcast
 * Desc
 * Save data to MySQL with a custom sink
 */
public class SinkDemo02_MySQL {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        // 3. Transformation
        // 4. Sink
        studentDS.addSink(new MySQLSink());

        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Called once per parallel sink instance: load the driver and open the connection.
            // Explicit loading is optional with a JDBC 4+ driver on the classpath
            // (Connector/J 8.x uses com.mysql.cj.jdbc.Driver):
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            // id is passed as null so MySQL can generate it (presumably an AUTO_INCREMENT column)
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void invoke(Student value, Context context) throws Exception {
            // Called once per record: fill in the ? placeholders of ps
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
            // Execute the SQL
            ps.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            // Release resources in reverse order of creation: statement first, then connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
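MySQLSink relies on the rich-function lifecycle: open() runs once per parallel sink instance, invoke() once per record, and close() once at the end. The plain-Java model below (no Flink or JDBC dependency) drives a hypothetical BufferedSink through that lifecycle and adds one change the original code does not make: instead of one executeUpdate() per record, records are buffered and flushed every batchSize records and once more in close(), roughly what ps.addBatch()/ps.executeBatch() would do with JDBC. All names here are hypothetical; Flink's own JDBC connector (JdbcSink, configured with JdbcExecutionOptions) offers batching in a similar spirit, but check its documentation for the exact options in your version.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model of the open -> invoke* -> close lifecycle,
 * with records buffered and flushed in batches.
 */
public class BatchingSinkModel {

    /** Hypothetical stand-in for one parallel sink instance. */
    static class BufferedSink {
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();
        final List<List<String>> flushedBatches = new ArrayList<>(); // what would reach the DB
        int openCalls = 0;
        int invokeCalls = 0;
        int closeCalls = 0;

        BufferedSink(int batchSize) {
            this.batchSize = batchSize;
        }

        void open() {
            // In MySQLSink this is where the connection and PreparedStatement are created, once.
            openCalls++;
        }

        void invoke(String record) {
            // Called once per record; with JDBC this would be ps.addBatch() instead of executeUpdate().
            invokeCalls++;
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        void close() {
            // Flush leftovers before releasing resources, otherwise the last partial batch is lost.
            flush();
            closeCalls++;
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            // With JDBC this would correspond to ps.executeBatch().
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Drives one sink instance the way the runtime drives each parallel subtask. */
    static BufferedSink run(List<String> records, int batchSize) {
        BufferedSink sink = new BufferedSink(batchSize);
        sink.open();
        for (String r : records) {
            sink.invoke(r);
        }
        sink.close();
        return sink;
    }

    public static void main(String[] args) {
        BufferedSink sink = run(List.of("a", "b", "c", "d", "e"), 2);
        System.out.println(sink.flushedBatches); // [[a, b], [c, d], [e]]
        System.out.println(sink.openCalls + " " + sink.invokeCalls + " " + sink.closeCalls); // 1 5 1
    }
}
```

The final flush in close() is what gets the last partial batch ([e] above) written; the model deliberately ignores failure handling and retries.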
Article source: lansonli.blog.csdn.net, author: Lansonli. Copyright belongs to the original author; please contact the author before reposting.
Original link: lansonli.blog.csdn.net/article/details/116223019
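[Copyright notice] This article was reposted by a Huawei Cloud community user. If you find content in this community suspected of plagiarism, please report it by email with supporting evidence; once verified, the community will promptly remove the infringing content. Report email:
cloudbbs@huaweicloud.com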