2021年大数据Flink(四十七):扩展阅读  File Sink

举报
Lansonli 发表于 2021/09/27 23:44:06 2021/09/27
【摘要】 目录 扩展阅读  File Sink 介绍 案例演示 扩展阅读  File Sink 介绍 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html &n...

目录

扩展阅读  File Sink

介绍

案例演示


扩展阅读  File Sink

介绍

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html

 

新的 Data Sink API (Beta)

之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。

 

这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的 FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也将逐步迁移到新的接口。

Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。

 

​​​​​​​案例演示


  
  1. package cn.lanson.extend;
  2. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  3. import org.apache.flink.connector.file.sink.FileSink;
  4. import org.apache.flink.core.fs.Path;
  5. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
  9. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
  10. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
  11. import java.util.concurrent.TimeUnit;
  12. /**
  13.  * Author lanson
  14.  * Desc
  15.  */
  16. public class FileSinkDemo {
  17.     public static void main(String[] args) throws Exception {
  18.         //1.env
  19.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20.         env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
  21.         env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
  22.         //2.source
  23.         DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
  24.         //3.sink
  25.         //设置sink的前缀和后缀
  26.         //文件的头和文件扩展名
  27.         //prefix-xxx-.txt
  28.         OutputFileConfig config = OutputFileConfig
  29.                 .builder()
  30.                 .withPartPrefix("prefix")
  31.                 .withPartSuffix(".txt")
  32.                 .build();
  33.         //设置sink的路径
  34.         String outputPath = "hdfs://node1:8020/FlinkFileSink/parquet";
  35.         final FileSink<String> sink = FileSink
  36.                 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
  37.                 .withBucketAssigner(new DateTimeBucketAssigner<>())
  38.                 .withRollingPolicy(
  39.                         DefaultRollingPolicy.builder()
  40.                                 .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
  41.                                 .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
  42.                                 .withMaxPartSize(1024 * 1024 * 1024)
  43.                                 .build())
  44.                 .withOutputFileConfig(config)
  45.                 .build();
  46.         lines.sinkTo(sink).setParallelism(1);
  47.         env.execute();
  48.     }
  49. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116948121

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。