Flink综合案例(九)
【摘要】 Flink综合案例(九) 今日目标Flink FileSink 落地写入到 HDFSFlinkSQL 整合 Hive数据仓库订单自动好评综合案例 Flink FileSink 落地写入到 HDFS常用的文件存储格式TextFilecsvrcFileparquetorcsequenceFile支持流批一体的写入到 HDFSFile Sink 需求将流数据写入到 HDFSpackage cn....
Flink综合案例(九)
今日目标
- Flink FileSink 落地写入到 HDFS
- FlinkSQL 整合 Hive数据仓库
- 订单自动好评综合案例
Flink FileSink 落地写入到 HDFS
-
常用的文件存储格式
TextFile
csv
rcFile
parquet
orc
sequenceFile
-
支持流批一体的写入到 HDFS
-
File Sink 需求
将流数据写入到 HDFS
package cn.itcast.flink.filesink; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */ public class FileSinkDemo { public static void main(String[] args) throws Exception { //1.初始化流计算运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.设置Checkpoint(10s)周期性启动 和 stateBackend 存储路径 // Sink保证仅一次语义使用 checkpoint 和 二段提交 env.enableCheckpointing(10000); env.setStateBackend(new FsStateBackend("file:///d:/chk/")); //4.接入socket数据源,获取数据 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //5.创建Streamingfilesink对象 OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("coo") .withPartSuffix(".txt") .build(); //5-1. 创建输出文件配置,指定输出路径 /FlinkStreamFileSink/parquet FileSink sink = FileSink .forRowFormat( new Path("hdfs://node1:8020/FileSink/parquet"), new SimpleStringEncoder<String>("UTF-8")) // sink-kafka new FlinkKafkaProducer //5-2.StreamingFileSink 行格式化 , withBucketAssigner->DateTimeBucketAssigner .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH--mm")) //withRollingPolicy -> 默认滚筒策略 .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(64 * 1024 * 1024) .withRolloverInterval(TimeUnit.SECONDS.toMillis(10)) .withInactivityInterval(TimeUnit.SECONDS.toMillis(5)) .build()) //withOutputFileConfig -> 输出文件的配置 .withOutputFileConfig(config) .build(); //6.设置输出 sink source.print(); source.sinkTo(sink).setParallelism(1); //source.addSink(sink).setParallelism(1); //7.执行任务 env.execute(); } }
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)