Flink综合案例(九)

举报
Maynor学长 发表于 2022/07/24 11:52:53 2022/07/24
【摘要】 Flink综合案例(九) 今日目标Flink FileSink 落地写入到 HDFSFlinkSQL 整合 Hive数据仓库订单自动好评综合案例 Flink FileSink 落地写入到 HDFS常用的文件存储格式TextFilecsvrcFileparquetorcsequenceFile支持流批一体的写入到 HDFSFile Sink 需求将流数据写入到 HDFSpackage cn....

Flink综合案例(九)

今日目标

  • Flink FileSink 落地写入到 HDFS
  • FlinkSQL 整合 Hive数据仓库
  • 订单自动好评综合案例

Flink FileSink 落地写入到 HDFS

  • 常用的文件存储格式

    TextFile

    csv

    rcFile

    parquet

    orc

    sequenceFile

  • 支持流批一体的写入到 HDFS

  • File Sink 需求

    将流数据写入到 HDFS

    package cn.itcast.flink.filesink;
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Author itcast
     * Date 2021/6/24 10:52
     * Desc TODO
     */
    public class FileSinkDemo {
        public static void main(String[] args) throws Exception {
            //1.初始化流计算运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2.设置Checkpoint(10s)周期性启动 和 stateBackend 存储路径
            // Sink保证仅一次语义使用 checkpoint 和 二段提交
            env.enableCheckpointing(10000);
            env.setStateBackend(new FsStateBackend("file:///d:/chk/"));
    
            //4.接入socket数据源,获取数据
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            //5.创建Streamingfilesink对象
            OutputFileConfig config = OutputFileConfig
                    .builder()
                    .withPartPrefix("coo")
                    .withPartSuffix(".txt")
                    .build();
            //5-1. 创建输出文件配置,指定输出路径 /FlinkStreamFileSink/parquet
            FileSink sink = FileSink
                    .forRowFormat(
                    new Path("hdfs://node1:8020/FileSink/parquet"),
                    new SimpleStringEncoder<String>("UTF-8"))
                    // sink-kafka new FlinkKafkaProducer
                    //5-2.StreamingFileSink 行格式化 , withBucketAssigner->DateTimeBucketAssigner
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH--mm"))
                    //withRollingPolicy -> 默认滚筒策略
                    .withRollingPolicy(DefaultRollingPolicy.builder()
                            .withMaxPartSize(64 * 1024 * 1024)
                            .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                            .withInactivityInterval(TimeUnit.SECONDS.toMillis(5))
                            .build())
            //withOutputFileConfig -> 输出文件的配置
                    .withOutputFileConfig(config)
                    .build();
            //6.设置输出 sink
            source.print();
            source.sinkTo(sink).setParallelism(1);
            //source.addSink(sink).setParallelism(1);
            //7.执行任务
            env.execute();
        }
    }
    
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。