【数据湖探索】FLINK 自定义jar作业配置checkppoint功能
一、Flink 自定义作业配置checkppoint功能
在Flink 自定义作业 Jar包代码中加入如下代码:
如下代码含义是以EXACTLY_ONCE模式,每隔40s 保存checkpoint 到obs的${bucket}桶中的jobs/checkpoint/my_jar路径。
其中最主要是的保存checkpoint路径,一般是将checkpoint存入obs桶中,路径格式如下:
路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx
示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(40000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false);
rocksDbBackend.setOptions(new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions
.setMaxLogFileSize(64 * 1024 * 1024)
.setKeepLogFileNum(3);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return currentOptions;
}
});
env.setStateBackend(rocksDbBackend);
在DLI Flink Jar作业中配置【从checkpoint恢复】功能
控制台操作:
在自定义Jar作业【编辑页面】,先勾选【异常自动重启】,再勾选【从checkpoint恢复】,再填写【Checkpoint路径】。
Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:
【Checkpoint路径】格式为:${bucket}/xxx/xxx/xxx
示例:
若Jar包中代码配置为:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
则【Checkpoint路径】填写为: mybucket/jobs/checkpoint/jar-3
注意点:
1、每个Flink Jar作业配置的Checkpoint路径不要一致,不然无法从准确的checkpoint路径恢复。
2、checkpoint路径中的obs桶需要给DLI授权,DLI服务才能访问此桶下的文件
二、查看作业是否从checkpoint恢复
- 点赞
- 收藏
- 关注作者
评论(0)