Flink Connector之Streaming File Sink
【摘要】 Flink提供了一系列Connector用于从第三方存储介质读取和转储数据。其中StreamingFileSink主要用于流式数据文件存储,可将数据写入LocalFile、HDFS、OBS/S3等对象存储系统。由于流式数据是无限的,最终写入到桶中的数据会将按一定策略进行文件划分。用户可自定义文件和桶划分策略。 桶中文件形态如下图: In-progress :当前文件正在写...
Flink提供了一系列Connector用于从第三方存储介质读取和转储数据。其中StreamingFileSink主要用于流式数据文件存储,可将数据写入LocalFile、HDFS、OBS/S3等对象存储系统。由于流式数据是无限的,最终写入到桶中的数据会将按一定策略进行文件划分。用户可自定义文件和桶划分策略。
桶中文件形态如下图:
In-progress :当前文件正在写入中
Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。
重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。
一、使用示例
DataStream<String> input = ...;
final StreamingFileSink<String> sink = StreamingFileSink
// 1)编码格式
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
// 2) 分桶策略
.withBucketAssigner(new KeyBucketAssigner())
// 3) 文件滚动策略
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
1、编码格式
支持两种编码格式,行编码格式(Row-encoded)和批量编码(Bulk-encoded)格式。
行编码格式:
需指定Encoder,如上述示例的SimpleStringEncoder,可通过实现org.apache.flink.api.common.serialization.Encoder自定义编码方式。
批量编码格式:
编码器需指定org.apache.flink.api.common.serialization. BulkWriter.Factory.
内置提供了三种Factory(以1.10.0版本为基准):
1)ParquetWriterFactory
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.11</artifactId>
<version>1.10.0</version>
</dependency>
2)SequenceFileWriterFactory
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sequence-file</artifactId>
<version>1.10.0</version>
</dependency>
3)CompressWriterFactory
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-compress</artifactId>
<version>1.10.0</version>
</dependency>
示例:
Schema schema = ...;
DataStream<GenericRecord> stream = ...;
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
.build();
input.addSink(sink);
2、分桶策略
分桶策略用于定义根目录下的子目录结构,默认使用DateTimeBucketAssigner分桶器。 DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd--HH 。日期格式和时区都可以手动配置。
Flink 有两个内置的 BucketAssigners :
DateTimeBucketAssigner :默认基于时间的分配器
BasePathBucketAssigner :将所有部分文件(part file)存储在根目录下。
用户可通过实现BucketAssigner定制自己的分桶策略。
3、文件滚动策略
滚动策略 RollingPolicy 定义了文件分片策略,即指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
Flink 有两个内置的滚动策略:
DefaultRollingPolicy
文件滚动有三个检查触发点:
1)OnEvent
2)OnCheckpoint
3)onProcessingTime,可通过withBucketCheckInterval (默认1分钟)指定触发周期。
检查时只要符合以下三点任一条件时就会进行文件关闭操作。
1)当partFile大小超过partSize时
2)当前时间距partFile创建时间超过rollverInterval
3)文件未更新时间超过inactivityInterval
注意:该策略以及用户自定义策略都只适用于行编码器,批编码器仅能使用OnCheckpointRollingPolicy策略。
OnCheckpointRollingPolicy
checkpoint的时候就触发文件滚动。
二、流程分析
1) Open
根据bucketCheckInterval注册timer定时器,以触发onProcessintTime操作。
2) Invoke
创建或是获取数据对应的bucket对象,调用shouldRollOnEvent判断是否需要进行文件滚动,符合条件即关闭上周期文件,文件变为pending状态,并生成下周期滚动文件(in-process)写入当前数据。
3) snapshotState
同样先判断是否需要进行滚动,然后记录所有pending状态文件,方便checkpoint完整时将其状态修改为finish。将inprogress文件持久化并记录其offset。
4) notifyCheckpointComplete
将pending文件状态转为finish。
5) initializeState
主要用于故障恢复,将inprocess文件恢复到记录的offset位置,超过offset部分的数据删除,同时将记录的in-pending文件commit并置状态为finish。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)