2021年大数据Flink(十三):流批一体API Sink

举报
Lansonli 发表于 2021/09/28 22:34:15 2021/09/28
【摘要】 目录 Sink 预定义Sink 基于控制台和文件的Sink 自定义Sink MySQL Sink       预定义Sink 基于控制台和文件的Sink API 1.ds.print 直接输出到控制台 2.ds.printToErr() 直接输出到控制台,用红色 3.ds....

目录

Sink

预定义Sink

基于控制台和文件的Sink

自定义Sink

MySQL


Sink

 

 

 

预定义Sink

基于控制台和文件的Sink

  • API

1.ds.print 直接输出到控制台

2.ds.printToErr() 直接输出到控制台,用红色

3.ds.writeAsText("本地/HDFS的path",WriteMode.OVERWRITE).setParallelism(1)

 

  • 注意:

在输出到path的时候,可以在前面设置并行度,如果

并行度>1,则path为目录

并行度=1,则path为文件名

 

  • 代码演示:

  
  1. package cn.itcast.sink;
  2. import org.apache.flink.core.fs.FileSystem;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /**
  6.  * Author itcast
  7.  * Desc
  8.  * 1.ds.print 直接输出到控制台
  9.  * 2.ds.printToErr() 直接输出到控制台,用红色
  10.  * 3.ds.collect 将分布式数据收集为本地集合
  11.  * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
  12.  */
  13. public class SinkDemo01 {
  14.     public static void main(String[] args) throws Exception {
  15.         //1.env
  16.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17.         //2.source
  18.         //DataStream<String> ds = env.fromElements("hadoop", "flink");
  19.         DataStream<String> ds = env.readTextFile("data/input/words.txt");
  20.         //3.transformation
  21.         //4.sink
  22.         ds.print();
  23.         ds.printToErr();
  24.         ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
  25.         //注意:
  26.         //Parallelism=1为文件
  27.         //Parallelism>1为文件夹
  28.         //5.execute
  29.         env.execute();
  30.     }
  31. }

自定义Sink

MySQL

  • 需求:

将Flink集合中的数据通过自定义Sink保存到MySQL

 

  • 代码实现:

  
  1. package cn.itcast.sink;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  9. import java.sql.Connection;
  10. import java.sql.DriverManager;
  11. import java.sql.PreparedStatement;
  12. /**
  13.  * Author itcast
  14.  * Desc
  15.  * 使用自定义sink将数据保存到MySQL
  16.  */
  17. public class SinkDemo02_MySQL {
  18.     public static void main(String[] args) throws Exception {
  19.         //1.env
  20.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         //2.Source
  22.         DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
  23.         //3.Transformation
  24.         //4.Sink
  25.         studentDS.addSink(new MySQLSink());
  26.         //5.execute
  27.         env.execute();
  28.     }
  29.     @Data
  30.     @NoArgsConstructor
  31.     @AllArgsConstructor
  32.     public static class Student {
  33.         private Integer id;
  34.         private String name;
  35.         private Integer age;
  36.     }
  37.     public static class MySQLSink extends RichSinkFunction<Student> {
  38.         private Connection conn = null;
  39.         private PreparedStatement ps = null;
  40.         @Override
  41.         public void open(Configuration parameters) throws Exception {
  42.             //加载驱动,开启连接
  43.             //Class.forName("com.mysql.jdbc.Driver");
  44.             conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
  45.             String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
  46.             ps = conn.prepareStatement(sql);
  47.         }
  48.         @Override
  49.         public void invoke(Student value, Context context) throws Exception {
  50.             //给ps中的?设置具体值
  51.             ps.setString(1,value.getName());
  52.             ps.setInt(2,value.getAge());
  53.             //执行sql
  54.             ps.executeUpdate();
  55.         }
  56.         @Override
  57.         public void close() throws Exception {
  58.             if (conn != null) conn.close();
  59.             if (ps != null) ps.close();
  60.         }
  61.     }
  62. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116223019

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。