Flink Connector之Streaming File Sink

举报
kyo 发表于 2020/07/20 11:04:10 2020/07/20
【摘要】 Flink提供了一系列Connector用于从第三方存储介质读取和转储数据。其中StreamingFileSink主要用于流式数据文件存储,可将数据写入LocalFile、HDFS、OBS/S3等对象存储系统。由于流式数据是无限的,最终写入到桶中的数据会将按一定策略进行文件划分。用户可自定义文件和桶划分策略。 桶中文件形态如下图: In-progress :当前文件正在写...

    Flink提供了一系列Connector用于从第三方存储介质读取和转储数据。其中StreamingFileSink主要用于流式数据文件存储,可将数据写入LocalFileHDFSOBS/S3等对象存储系统。由于流式数据是无限的,最终写入到桶中的数据会将按一定策略进行文件划分。用户可自定义文件和桶划分策略。

 桶中文件形态如下图:

 image.png

 

  •      In-progress :当前文件正在写入中

  •      Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

  •      Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

    处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

    重要: 使用 StreamingFileSink 时需要启用 Checkpoint 如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 'pending' 状态,下游系统无法安全地读取。

一、使用示例

DataStream<String> input = ...;
 
final StreamingFileSink<String> sink = StreamingFileSink
// 1)编码格式
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
// 2) 分桶策略
.withBucketAssigner(new KeyBucketAssigner())
// 3) 文件滚动策略
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
         .build();


1、编码格式

支持两种编码格式,行编码格式(Row-encoded)和批量编码(Bulk-encoded)格式。

  • 行编码格式:

    需指定Encoder,如上述示例的SimpleStringEncoder,可通过实现org.apache.flink.api.common.serialization.Encoder自定义编码方式。

  • 批量编码格式:

编码器需指定org.apache.flink.api.common.serialization. BulkWriter.Factory.

内置提供了三种Factory(以1.10.0版本为基准):

      1)ParquetWriterFactory

     <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.10.0</version>
   </dependency>

      2)SequenceFileWriterFactory

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sequence-file</artifactId>
      <version>1.10.0</version>
    </dependency>

      3)CompressWriterFactory

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-compress</artifactId>
    <version>1.10.0</version>
    </dependency>

      示例:

    Schema schema = ...;    
    DataStream<GenericRecord> stream = ...;
     
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
             .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
             .build();
     
    input.addSink(sink);


2、分桶策略

 分桶策略用于定义根目录下的子目录结构,默认使用DateTimeBucketAssigner桶器。 DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd--HH 。日期格式和时区都可以手动配置。

    Flink 有两个内置的 BucketAssigners

  • DateTimeBucketAssigner :默认基于时间的分配器

  • BasePathBucketAssigner :将所有部分文件(part file)存储在根目录下。

 用户可通过实现BucketAssigner定制自己的分桶策略。

 image.png


3、文件滚动策略

  滚动策略 RollingPolicy 定义了文件分片策略,即指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

     Flink 有两个内置的滚动策略:

  •      DefaultRollingPolicy

     文件滚动有三个检查触发点:

     1OnEvent  

     2OnCheckpoint 

     3onProcessingTime,可通过withBucketCheckInterval (默认1分钟)指定触发周期。

    检查时只要符合以下三点任一条件时就会进行文件关闭操作。

    1)当partFile大小超过partSize时

    2)当前时间距partFile创建时间超过rollverInterval

    3)文件未更新时间超过inactivityInterval

 image.png

 注意:该策略以及用户自定义策略都只适用于行编码器,批编码器仅能使用OnCheckpointRollingPolicy策略。


  • OnCheckpointRollingPolicy

      checkpoint的时候就触发文件滚动。


二、流程分析

1)  Open

根据bucketCheckInterval注册timer定时器,以触发onProcessintTime操作。

2)  Invoke

创建或是获取数据对应的bucket对象,调用shouldRollOnEvent判断是否需要进行文件滚动,符合条件即关闭上周期文件,文件变为pending状态,并生成下周期滚动文件(in-process)写入当前数据。

3)  snapshotState

同样先判断是否需要进行滚动,然后记录所有pending状态文件,方便checkpoint完整时将其状态修改为finish。将inprogress文件持久化并记录其offset

4)  notifyCheckpointComplete

pending文件状态转为finish

5)  initializeState

  主要用于故障恢复,将inprocess文件恢复到记录的offset位置,超过offset部分的数据删除,同时将记录的in-pending文件commit并置状态为finish


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。