Flink Connector之Streaming File Sink
Flink提供了一系列Connector用于从第三方存储介质读取和转储数据。其中StreamingFileSink主要用于流式数据文件存储,可将数据写入LocalFile、HDFS、OBS/S3等对象存储系统。由于流式数据是无限的,最终写入到桶中的数据会将按一定策略进行文件划分。用户可自定义文件和桶划分策略。
桶中文件形态如下图:
In-progress :当前文件正在写入中
Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。
重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。
一、使用示例
DataStream<String> input = ...; final StreamingFileSink<String> sink = StreamingFileSink // 1)编码格式 .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")) // 2) 分桶策略 .withBucketAssigner(new KeyBucketAssigner()) // 3) 文件滚动策略 .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()) .build();
1、编码格式
支持两种编码格式,行编码格式(Row-encoded)和批量编码(Bulk-encoded)格式。
行编码格式:
需指定Encoder,如上述示例的SimpleStringEncoder,可通过实现org.apache.flink.api.common.serialization.Encoder自定义编码方式。
批量编码格式:
编码器需指定org.apache.flink.api.common.serialization. BulkWriter.Factory.
内置提供了三种Factory(以1.10.0版本为基准):
1)ParquetWriterFactory
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet_2.11</artifactId> <version>1.10.0</version> </dependency>
2)SequenceFileWriterFactory
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sequence-file</artifactId> <version>1.10.0</version> </dependency>
3)CompressWriterFactory
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-compress</artifactId> <version>1.10.0</version> </dependency>
示例:
Schema schema = ...; DataStream<GenericRecord> stream = ...; final StreamingFileSink<GenericRecord> sink = StreamingFileSink .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema)) .build(); input.addSink(sink);
2、分桶策略
分桶策略用于定义根目录下的子目录结构,默认使用DateTimeBucketAssigner分桶器。 DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd--HH 。日期格式和时区都可以手动配置。
Flink 有两个内置的 BucketAssigners :
DateTimeBucketAssigner :默认基于时间的分配器
BasePathBucketAssigner :将所有部分文件(part file)存储在根目录下。
用户可通过实现BucketAssigner定制自己的分桶策略。
3、文件滚动策略
滚动策略 RollingPolicy 定义了文件分片策略,即指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
Flink 有两个内置的滚动策略:
DefaultRollingPolicy
文件滚动有三个检查触发点:
1)OnEvent
2)OnCheckpoint
3)onProcessingTime,可通过withBucketCheckInterval (默认1分钟)指定触发周期。
检查时只要符合以下三点任一条件时就会进行文件关闭操作。
1)当partFile大小超过partSize时
2)当前时间距partFile创建时间超过rollverInterval
3)文件未更新时间超过inactivityInterval
注意:该策略以及用户自定义策略都只适用于行编码器,批编码器仅能使用OnCheckpointRollingPolicy策略。
OnCheckpointRollingPolicy
checkpoint的时候就触发文件滚动。
二、流程分析
1) Open
根据bucketCheckInterval注册timer定时器,以触发onProcessintTime操作。
2) Invoke
创建或是获取数据对应的bucket对象,调用shouldRollOnEvent判断是否需要进行文件滚动,符合条件即关闭上周期文件,文件变为pending状态,并生成下周期滚动文件(in-process)写入当前数据。
3) snapshotState
同样先判断是否需要进行滚动,然后记录所有pending状态文件,方便checkpoint完整时将其状态修改为finish。将inprogress文件持久化并记录其offset。
4) notifyCheckpointComplete
将pending文件状态转为finish。
5) initializeState
主要用于故障恢复,将inprocess文件恢复到记录的offset位置,超过offset部分的数据删除,同时将记录的in-pending文件commit并置状态为finish。
- 点赞
- 收藏
- 关注作者
评论(0)