【数据湖探索】FLINK 自定义jar作业配置checkppoint功能

举报
芳萌萌芳 发表于 2020/12/31 10:40:29 2020/12/31
【摘要】 ## Flink 自定义作业配置checkppoint功能在Flink 自定义作业 Jar包代码中加入如下代码:  如下代码含义是以EXACTLY_ONCE模式,每隔40s 保存checkpoint 到obs的${bucket}桶中的jobs/checkpoint/my_jar路径。    其中最主要是的保存checkpoint路径,一般是将checkpoint存入obs桶中,路径格式如下:...

一、Flink 自定义作业配置checkppoint功能

在Flink 自定义作业 Jar包代码中加入如下代码:  
如下代码含义是以EXACTLY_ONCE模式,每隔40s 保存checkpoint 到obs的${bucket}桶中的jobs/checkpoint/my_jar路径。    
其中最主要是的保存checkpoint路径,一般是将checkpoint存入obs桶中,路径格式如下:    
路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx  
示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3  

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(40000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false);
        rocksDbBackend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions
                        .setMaxLogFileSize(64 * 1024 * 1024)
                        .setKeepLogFileNum(3);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        env.setStateBackend(rocksDbBackend);


在DLI Flink Jar作业中配置【从checkpoint恢复】功能

控制台操作:

在自定义Jar作业【编辑页面】,先勾选【异常自动重启】,再勾选【从checkpoint恢复】,再填写【Checkpoint路径】。  
Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:  
【Checkpoint路径】格式为:${bucket}/xxx/xxx/xxx  
示例:      
若Jar包中代码配置为:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3   
则【Checkpoint路径】填写为: mybucket/jobs/checkpoint/jar-3

注意点:  
1、每个Flink Jar作业配置的Checkpoint路径不要一致,不然无法从准确的checkpoint路径恢复。  
2、checkpoint路径中的obs桶需要给DLI授权,DLI服务才能访问此桶下的文件  


二、查看作业是否从checkpoint恢复

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。