实时即未来,大数据项目车联网之重启机制及数据积压

举报
Maynor学长 发表于 2022/10/20 14:18:07 2022/10/20
【摘要】 1 checkpoint配置l 选择合适的Checkpoint存储方式l CheckPoint存储方式存在三种官方文档:https://ci.apache.org/projects/flink/flink-docs-release-10/ops/state/state_backends.htmlMemoryStateBackend、FsStateBackend 和 RocksDBStat...

1 checkpoint配置

l 选择合适的Checkpoint存储方式

l CheckPoint存储方式存在三种

官方文档:https://ci.apache.org/projects/flink/flink-docs-release-10/ops/state/state_backends.html

MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend

服务器中配置默认状态存储路径:flink-conf.yaml

# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints 服务器环境存储多个挂载的本地磁盘为最优
# prod envirment file:///data/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints

不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,在代码中设置检查点存储,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择存在挂在存储在磁盘上的FsStateBackend、RocksDBStateBackend。

u 这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。 受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;

u 其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。 与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。

“state.checkpoints.dir”参数来指定所有的checkpoints数据和元数据存储的位置

// 本地访问hdfs设置系统属性
System.setProperty("HADOOP_USER_NAME", "root");
// 1.创建flink的流式环境、设置任务的检查点(数据存储hdfs)、设置分区发现、设置任务的重启策略、数据积压内部解决策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 // 1.1.设置流式数据的参照时间 ProcessingTime:事件被处理时机器的系统时间;IngestionTime:事件进入flink事件;EventTime:事件数据中某个字段的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1.2.checkpoint配置 开启检查点,设置时间间隔为5分钟
env.enableCheckpointing(300000);
// 1.3.检查点Model设置 exactly once 仅消费一次 保证消息不丢失不重复
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//1.4 防止checkpoint 太过于频繁而导致业务处理的速度下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
//1.5 设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(20000);
//1.6 设置checkpoint最大的尝试次数,次数必须 >= 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//1.7 设置取消任务时保留checkpoint,checkpoint默认会在整个job取消时被删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//1.8 设置执行job过程中,保存检查点错误时,job不失败
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
String hdfsUri = ConfigLoader.getProperty("hdfsUri");
// 1.9.设置检查点存储的位置,使用RocksDBStateBackend,存储在hdfs分布式文件系统中,增量检查点
try {
    env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask"));
} catch (IOException e) {
    e.printStackTrace();
}

2 任务重启策略

l Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。

l 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-10/dev/restart_strategies.html

重启策略 重启策略值 含义
Fixed delay fixed-delay 固定延迟重启策略
Failure rate failure-rate 故障率重启策略
Fallback Restart None 背压重启策略(集群使用的重启策略)
No Restart Strategy None 无重启策略

l 默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。

// 重启策略
env.setRestartStrategy(RestartStrategies.noRestart());
// 故障率重启策略
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.minutes(10)));
// 设置重启策略为延迟重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 600000));

3 分区发现

Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次精确的保证来使用它们。最初检索分区元数据后(即,作业开始运行时)发现的所有分区将从最早的偏移量开始消耗。

默认情况下,分区发现是禁用的。要启用它,请flink.partition-discovery.interval-millis在提供的属性配置中为设置一个非负值,表示发现间隔(以毫秒为单位)。

从使用Flink 3.x之前的Flink版本的保存点还原使用者时,无法在还原运行中启用分区发现。如果启用,还原将失败,并出现异常。在这种情况下,为了使用分区发现,请首先在Flink 3.x中获取一个保存点,然后从中再次进行恢复。

// 2.创建flink消费kafka数据的对象,伴随着kafka消费者的属性
String topic = ConfigLoader.getPropety("kafka.topic");
String brokeServers = ConfigLoader.getPropety("bootstrap.servers");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", brokeServers);
prop.setProperty("group.id", "KafkaSourceDataTask");
/** 设置kafka分区感知 间隔时间30秒*/
prop.setProperty("flink.partition-discovery.interval-millis", "30000");
// 设置key的反序列化对象
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置value发序列化对象
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置自动提交offset策略为 earliest
prop.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(), prop);
// 设置消费数据时根据指定消费组的offset进行消费
kafkaConsumer.setStartFromGroupOffsets();
// 设置自动提交offset保存到检查点
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

4 数据积压问题

4.1 什么是数据积压

流系统中消息的处理速度跟不上消息的发送速度,会导致消息的堆积,就是数据积压

4.2 数据积压的原因

l 垃圾回收卡顿可能会导致流入的数据快速堆积

l 一个数据源可能生产数据的速度过快

4.3 数据积压的后果

背压如果不能得到正确地处理,可能会导致 资源被耗尽 或者甚至出现更糟的情况导致数据丢失 在同一时间点,不管是流处理job还是sink,如果有1秒的卡顿,那么将导致至少500万条记录的积压。换句话说,source可能会产生一个脉冲,在一秒内数据的生产速度突然翻倍。

举例分析:

l 正常情况:消息处理速度 >= 消息的发送速度,不发生消息拥堵,系统运行流畅

img

l 异常情况:消息处理速度< 消息的发送速度,发生了消息拥堵,系统运行不畅

img

4.4 积压解决方案
方案 影响
删除拥堵消息 会导致数据丢失,许多流处理程序而言是不可接受的
将缓冲区持久化 方便在处理失败的情况下进行数据重放,会导致缓冲区积压的数据越来越多
缓存拥堵信息,告知消息发送者减缓消息发送的速度 对source进行限流来适配整个pipeline中最慢组件的速度,从而获得稳定状态
4.5 解决数据积压方法

Flink内部自动实现数据流自然降速,而无需担心数据丢失。Flink所获取的最大吞吐量是由pipeline中* 最慢的组件决定。

Flink1.5+ 版本引入了基于Credit的流控和反压机制,本质上是将TCP的流控机制从传输层提升到了应用层——InputGate和ResultPartition的层级,从而避免传输层造成阻塞。

img

仍然是Sender发送速度与Receiver接收速度的比是2:1的情景。Sender端的ResultSubPartition积压了2个缓存的数据,因此会将该批次要发送的数据与backlog size = 2一同发往Receiver。

Receiver收到当前批数据和backlog size之后,会计算InputChannel是否有足够的缓存来接收下一批数据,如果不够,则会去LocalBufferPool/NetworkBufferPool申请缓存,并将credit = 3通知到上游的ResultSubPartition,表示自己能够接收3个缓存的消息。

img

随着Receiver端的数据不断积压,网络缓存最终被耗尽,因此会反馈给上游credit = 0(相当于TCP滑动窗口中的window = 0),Sender端ResultPartition到Netty的链路会被阻断。按照上一节所述的流程,Sender端的网络缓存会被更快地耗尽,RecordWriter不能再写数据,从而达到反压的效果。

背后的原理:

img

l TaskManager(TM)启动时,会初始化网络缓冲池(NetworkBufferPool)

默认生成 2048 个内存块(MemorySegment)

网络缓冲池是Task之间共享的

l Task线程启动时,Flink 会为Task的 Input Gate(IG)和 ResultSubpartition(RS)分别创建一个LocationBufferPool

n LocationBufferPool的内存数量由Flink分配

为了系统更容易应对瞬时压力,内存数量是动态分配的

l Task线程执行时,Netty接收端接收到数据时,为了将数据保存拷贝到Task中

Task线程需要向本地缓冲池(LocalBufferPool)申请内存

若本地缓冲池没有可用内存,则继续向网络缓冲池(NetworkBufferPool)申请内存

内存申请成功,则开始从Netty中拷贝数据

若缓冲池已申请的数量达到上限,或网络缓冲池(NetworkerBufferPool)也没有可用内存时,该Task的Netty Channel会暂停读取,上游的发送端会立即响应停止发送,Flink流系统进入反压状态

l 经过 Task 处理后,由 Task 写入到 RequestPartition (RS)中

当Task线程写数据到ResultPartition(RS)时,也会向网络缓冲池申请内存

如果没有可用内存块,也会阻塞Task,暂停写入

l Task处理完毕数据后,会将内存块交还给本地缓冲池(LocalBufferPool)

如果本地缓冲池申请内存的数量超过池子设置的数量,将内存块回收给 网络缓冲池。如果没超过,会继续留在池子中,减少反复申请开销

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。