实时即未来,大数据项目车联网之重启机制及数据积压
1 checkpoint配置
l 选择合适的Checkpoint存储方式
l CheckPoint存储方式存在三种
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-10/ops/state/state_backends.html
MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend
服务器中配置默认状态存储路径:flink-conf.yaml
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints 服务器环境存储多个挂载的本地磁盘为最优
# prod envirment file:///data/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints
不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,在代码中设置检查点存储,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择存在挂在存储在磁盘上的FsStateBackend、RocksDBStateBackend。
u 这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。 受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;
u 其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。 与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。
“state.checkpoints.dir”参数来指定所有的checkpoints数据和元数据存储的位置
// 本地访问hdfs设置系统属性
System.setProperty("HADOOP_USER_NAME", "root");
// 1.创建flink的流式环境、设置任务的检查点(数据存储hdfs)、设置分区发现、设置任务的重启策略、数据积压内部解决策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 1.1.设置流式数据的参照时间 ProcessingTime:事件被处理时机器的系统时间;IngestionTime:事件进入flink事件;EventTime:事件数据中某个字段的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1.2.checkpoint配置 开启检查点,设置时间间隔为5分钟
env.enableCheckpointing(300000);
// 1.3.检查点Model设置 exactly once 仅消费一次 保证消息不丢失不重复
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//1.4 防止checkpoint 太过于频繁而导致业务处理的速度下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
//1.5 设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(20000);
//1.6 设置checkpoint最大的尝试次数,次数必须 >= 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//1.7 设置取消任务时保留checkpoint,checkpoint默认会在整个job取消时被删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//1.8 设置执行job过程中,保存检查点错误时,job不失败
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
String hdfsUri = ConfigLoader.getProperty("hdfsUri");
// 1.9.设置检查点存储的位置,使用RocksDBStateBackend,存储在hdfs分布式文件系统中,增量检查点
try {
env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask"));
} catch (IOException e) {
e.printStackTrace();
}
2 任务重启策略
l Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。
l 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-10/dev/restart_strategies.html
重启策略 | 重启策略值 | 含义 |
---|---|---|
Fixed delay | fixed-delay | 固定延迟重启策略 |
Failure rate | failure-rate | 故障率重启策略 |
Fallback Restart | None | 背压重启策略(集群使用的重启策略) |
No Restart Strategy | None | 无重启策略 |
l 默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。
// 重启策略
env.setRestartStrategy(RestartStrategies.noRestart());
// 故障率重启策略
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.minutes(10)));
// 设置重启策略为延迟重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 600000));
3 分区发现
Flink Kafka Consumer支持发现动态创建的Kafka分区,并使用一次精确的保证来使用它们。最初检索分区元数据后(即,作业开始运行时)发现的所有分区将从最早的偏移量开始消耗。
默认情况下,分区发现是禁用的。要启用它,请flink.partition-discovery.interval-millis在提供的属性配置中为设置一个非负值,表示发现间隔(以毫秒为单位)。
从使用Flink 3.x之前的Flink版本的保存点还原使用者时,无法在还原运行中启用分区发现。如果启用,还原将失败,并出现异常。在这种情况下,为了使用分区发现,请首先在Flink 3.x中获取一个保存点,然后从中再次进行恢复。
// 2.创建flink消费kafka数据的对象,伴随着kafka消费者的属性
String topic = ConfigLoader.getPropety("kafka.topic");
String brokeServers = ConfigLoader.getPropety("bootstrap.servers");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", brokeServers);
prop.setProperty("group.id", "KafkaSourceDataTask");
/** 设置kafka分区感知 间隔时间30秒*/
prop.setProperty("flink.partition-discovery.interval-millis", "30000");
// 设置key的反序列化对象
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置value发序列化对象
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置自动提交offset策略为 earliest
prop.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(), prop);
// 设置消费数据时根据指定消费组的offset进行消费
kafkaConsumer.setStartFromGroupOffsets();
// 设置自动提交offset保存到检查点
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
4 数据积压问题
4.1 什么是数据积压
流系统中消息的处理速度跟不上消息的发送速度,会导致消息的堆积,就是数据积压
4.2 数据积压的原因
l 垃圾回收卡顿可能会导致流入的数据快速堆积
l 一个数据源可能生产数据的速度过快
4.3 数据积压的后果
背压如果不能得到正确地处理,可能会导致 资源被耗尽 或者甚至出现更糟的情况导致数据丢失 在同一时间点,不管是流处理job还是sink,如果有1秒的卡顿,那么将导致至少500万条记录的积压。换句话说,source可能会产生一个脉冲,在一秒内数据的生产速度突然翻倍。
举例分析:
l 正常情况:消息处理速度 >= 消息的发送速度,不发生消息拥堵,系统运行流畅
l 异常情况:消息处理速度< 消息的发送速度,发生了消息拥堵,系统运行不畅
4.4 积压解决方案
方案 | 影响 |
---|---|
删除拥堵消息 | 会导致数据丢失,许多流处理程序而言是不可接受的 |
将缓冲区持久化 | 方便在处理失败的情况下进行数据重放,会导致缓冲区积压的数据越来越多 |
缓存拥堵信息,告知消息发送者减缓消息发送的速度 | 对source进行限流来适配整个pipeline中最慢组件的速度,从而获得稳定状态 |
4.5 解决数据积压方法
Flink内部自动实现数据流自然降速,而无需担心数据丢失。Flink所获取的最大吞吐量是由pipeline中* 最慢的组件决定。
Flink1.5+ 版本引入了基于Credit的流控和反压机制,本质上是将TCP的流控机制从传输层提升到了应用层——InputGate和ResultPartition的层级,从而避免传输层造成阻塞。
仍然是Sender发送速度与Receiver接收速度的比是2:1的情景。Sender端的ResultSubPartition积压了2个缓存的数据,因此会将该批次要发送的数据与backlog size = 2一同发往Receiver。
Receiver收到当前批数据和backlog size之后,会计算InputChannel是否有足够的缓存来接收下一批数据,如果不够,则会去LocalBufferPool/NetworkBufferPool申请缓存,并将credit = 3通知到上游的ResultSubPartition,表示自己能够接收3个缓存的消息。
随着Receiver端的数据不断积压,网络缓存最终被耗尽,因此会反馈给上游credit = 0(相当于TCP滑动窗口中的window = 0),Sender端ResultPartition到Netty的链路会被阻断。按照上一节所述的流程,Sender端的网络缓存会被更快地耗尽,RecordWriter不能再写数据,从而达到反压的效果。
背后的原理:
l TaskManager(TM)启动时,会初始化网络缓冲池(NetworkBufferPool)
默认生成 2048 个内存块(MemorySegment)
网络缓冲池是Task之间共享的
l Task线程启动时,Flink 会为Task的 Input Gate(IG)和 ResultSubpartition(RS)分别创建一个LocationBufferPool
n LocationBufferPool的内存数量由Flink分配
为了系统更容易应对瞬时压力,内存数量是动态分配的
l Task线程执行时,Netty接收端接收到数据时,为了将数据保存拷贝到Task中
Task线程需要向本地缓冲池(LocalBufferPool)申请内存
若本地缓冲池没有可用内存,则继续向网络缓冲池(NetworkBufferPool)申请内存
内存申请成功,则开始从Netty中拷贝数据
若缓冲池已申请的数量达到上限,或网络缓冲池(NetworkerBufferPool)也没有可用内存时,该Task的Netty Channel会暂停读取,上游的发送端会立即响应停止发送,Flink流系统进入反压状态
l 经过 Task 处理后,由 Task 写入到 RequestPartition (RS)中
当Task线程写数据到ResultPartition(RS)时,也会向网络缓冲池申请内存
如果没有可用内存块,也会阻塞Task,暂停写入
l Task处理完毕数据后,会将内存块交还给本地缓冲池(LocalBufferPool)
如果本地缓冲池申请内存的数量超过池子设置的数量,将内存块回收给 网络缓冲池。如果没超过,会继续留在池子中,减少反复申请开销
- 点赞
- 收藏
- 关注作者
评论(0)