Flink状态与容错介绍

举报
米兰的小铁匠 发表于 2020/09/30 13:07:32 2020/09/30
【摘要】 时间、窗口、状态、容错可以称为是flink的四大基石,本文将介绍下flink中的状态和容错。 本文会介绍flink状态的类型、状态后端、状态扩缩容、持久化、实现有状态算子以及结合max算子讲内置状态。 容错方面介绍基于jm、tm、rm三个组件上的组件级容错。 介绍分布式异步快照的实现以及如何做到at least once和exactly once、end to end exactly。

时间、窗口、状态、容错可以称为是flink的四大基石,本文将介绍下flink中的状态和容错。

1. 状态

flink中的计算分为有状态和无状态两种,有状态算子需要使用内置状态和流入的事件来计算结果,本文基于有状态计算来讲述状态机制。

1.1 状态类型

1.1.1 按键值区分

  根据key值分区,flink可以分为keyed state 和 operator state两种。

  对于flink数据流的每个key分区,都会有一个keystate与其对应,键值相同的记录访问到的是一样的状态。

对于非keyedstream 与其对应的是opeartor state, operator state只与算子的并行实例相关,每个算子实例只持有一部分状态数据,其对应的state类型只有一种,为listState。

1.1.2 根据操作者区分

    flink中的状态可以分为managed state(托管状态)、和raw state 原生状态,managedState是flink自己管理的state,rawstate需要用户自己管理。使用byte[]来读写状态。

1.1.3 广播状态

   即broadcaststate,广播至下游所有任务,在本地存储,broadcaststate必须是mapstate类型。对于dataset api中,广播在每个计算节点存储一份,对于datastream 而言,在每个并行度存在一份,算子的每个任务的状态都相同。同时flink watermark和CheckpointBarrier向下游传递时也是基于广播机制。

1.2 状态后端

   状态后端是一种可插拔的机制,按照存储介质,状态后端分为MemoryStateBackend、FsStateBackend、RocksDBStateBackend三种。状态后端主要负责本地状态管理和将状态checkpoint到远程存储。checkpoint机制将在下节讲解。

1. memoryStateBackend

这种形式状态存储在堆内存中,状态过大可能导致oom问题,checkpoint时快照到jobmanager内存中。

2. FsStateBackend

状态保存在taskmanager内存中,与memoryStateBackend不同的是,checkpoint会把state快照到外部文件系统中,相对memoryStateBackend可用性更高。

上述两种状态都依赖于heapkeyedstatebackend,使用statetable存储数据,statetable有两种实现

CopyOnWriteStateTable 支持copy-on-write实现,可以支持异步快照,

另外一种是NestedMapsStateTable,其使用一个双层的hashmap和单层的hashmap作为状态存储。


3. RocksDBStateBackend

   rocksdb是一种类似于hbase的kv存储本地数据库,依赖于lsm实现,可以将数据保存到本地磁盘上,读写状态时会涉及到序列化反序列化操作,与内存相比,性能会偏低些。但其可以保存比较大的状态,受限于磁盘大小,但其key value依赖于byte数组,大小受byte[]限制。在一些与外部系统交互的场景可以适当的使用rocksdb减少依赖外部系统。同时rocksdb后端支持增量checkpoint。

1.3 算子扩缩容

有状态算子并行度调整时,同时伴随着状态的重分布。

1. 对于broadcaststate,并行度改变时,状态会发到新的task上,以确保所有的task状态相同。

2. 对于liststate,并行度改变时,将每个list取出,合并成一个,根据元素均匀的分配给新的task。

3. 对于keyedState,flink会将所有的键值分为不同的键值组,每个键值组包含部分键值,每个键值隶属于唯一的键值组,flink以键值为单位将键值分配给不同的任务。`键值组分配算法如下:

/**
 * Assigns the given key to a key-group index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
   Preconditions.checkNotNull(key, "Assigned key must not be null!");
   return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
/**
 * Assigns the given key to a key-group index.
 *
 * @param keyHash the hash of the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
   return MathUtils.murmurHash(keyHash) % maxParallelism;
}

该图展现了keyedState的扩缩容流程

1.4 持久化策略

    heapsnapshotStrategy策略对应heapKeyedStateBackend,rocksdbStateBackend支持rocksfullSnapshotStratey和rocksIncementalSnapshotStrategy全量持久化策略和增量持久化策略。

1. 全量持久化

    全量持久化策略每次讲全量的state写到状态存储中,在执行持久化策略时,使用异步的方式,后台启动一个线程去做持久化工作,基于memory的状态后端会使用CopyOnWriteStateTable来保证线程安全,基于rocksdb的则使用rocksdb的快照机制来确保线程安全。

2. 增量持久化

     只有rocksdb后端支持增量持久化,rocksdb基于lsm-tree实现,

基于lsm-tree的memtable、sstable、以及Compaction来实现增量更新。


1.5 基于max算子讲解managedState

会有同学有疑问,我没看到状态,状态在哪里被使用,下面基于max算子讲下managedstate的使用流程。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();      
         env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.fromElements(WORDS)  
          .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {        }) 
           .keyBy(0)    datastream 以key划分成keyedStream,相同的key流入一个分区
           .max(1)   max为有状态算子,每个key上都有一个对应的状态,每来一条数据的时候,都会取出取出状态和当前数据比较,并更新放回状态。是个           reducefunction
           .print();

下面可以看下max的具体实现

public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
   return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,  // 重置compare方法
         getExecutionConfig()));
}

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
   StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
         clean(aggregate), getType().createSerializer(getExecutionConfig()));
   return transform("Keyed Aggregation", getType(), operator);
}/

// StreamGroupedReduce  继承于AbstractUdfStreamOperator --》 AbstractStreamOperator --》 StreamOperator算子体系
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
      implements OneInputStreamOperator<IN, IN> {
 
   private transient ValueState<IN> values;
 public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
      super(reducer);    // 传入reduce函数即上文重写compare方法后的 ComparableAggregator extends reducefunction
      this.serializer = serializer;
   }
   @Override
   public void open() throws Exception {
    ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);  // 初始化 valuestate 描述符
      values = getPartitionedState(stateId); // 获取当前key对应的valuestate
   }
   @Override
   public void processElement(StreamRecord<IN> element) throws Exception {
      IN value = element.getValue();  // 拿到当前流数据
      IN currentValue = values.value();  // 取出状态
      if (currentValue != null) {
         IN reduced = userFunction.reduce(currentValue, value);
         values.update(reduced); // 更新状态
         output.collect(element.replace(reduced));
      } else {
         values.update(value);  // 第一次接受流数据
         output.collect(element.replace(value));
      }
   }

1.6 实现有状态函数

1.用户可以使用flink自带的richfunction操作状态。richfunction可以通过getRunctionContext获取RuntimeContext对象,以此操作状态。首先注册一个状态描述符,包含状态名称和类型,状态名称的作用域是整个算子,可以在算子中注册多个状态描述符来创建多个状态对象。状态引用通常在richfunction中的open方法中初始化。

2. 处理函数 ProcessFunction,不过其也是继承与richfunction体系。

3. 通过实现ListCheckpointed和实现CheckpointedFunction接口

public interface ListCheckpointed<T extends Serializable> {
// 生成检查点时调用,返回列表状态快照
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
// 初始化函数时调用,作业启动或者故障恢复时使用,
 void restoreState(List<T> state) throws Exception;
}

snapshotState可以将算子状态拆分成多个部分,restoreState可以对多个子状态进行组装,算子进行状态恢复时,flink会将状态的各个部分分发到相关并行实例上。

public interface CheckpointedFunction {

   /**
    * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
    * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
    * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     */  生成检查点时被调用
   void snapshotState(FunctionSnapshotContext context) throws Exception;

   /**
    * This method is called when the parallel function instance is created during distributed
    * execution. Functions typically set up their state storing data structures in this method.
    */  任务启动或故障重启时触发
   void initializeState(FunctionInitializationContext context) throws Exception;
}

checkpointedFunction 可使用列表状态和键值分区状态。

1.7 状态过期

2. 容错机制

流式系统通常被设计用来7 * 24 运行,这样就要确保作业能够从失败中恢复,失败时,系统需要重启任务,并且恢复状态。

 2.1 容错语义

1. at more once

至多一次

2. at least once

至少一次

3. exactly once

flink内精确一次

4. end to end exactly once

端到端精确一次会关注三个问题,sources、shuffle、sinks间的exactly once。

How Dataflow guarantees that every source record is processed exactly once

How Dataflow guarantees that every record is shuffled exactly once

How Dataflow guarantees that every sink produces accurate output.

本文中的内容将回答这三个问题。


2.2 组件容错

对于flink这种分布式组件,每个组件失败都会导致作业失败,下文将基于jobmaster、taskmanager、resourcemanager三种角色来讲解容错,这三种组件通过akka进行网络通信,保持心跳。故障检测需要使用心跳的方式,故障恢复需要状态快照和恢复。重要组件使用至少两个的HA机制,一个leader和若干个standby。


2.2.1 jobmaster容错

jobmaster在flink中负责应用程序执行调度和管理,将客户端提交过来的jobGraph转化为executionGraph,向resourcemanager申请资源,并将executionGraph的任务分发给taskmanager执行,在执行过程中,jm还会触发checkpoint执行,通常一个leader,一个standby。

jm失败时,所有的task都会被自动取消,首先会从zk中取到jobgraph、jar、最近的checkpoint地址,向resourcemanager去申请资源重启任务,任务重启后将从最近的一次checkpoint状态恢复task。 如图所示:

private JobGraph jobGraph;

private final HighAvailabilityServices highAvailabilityServices;
// 心跳管理
ivate final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

// slot资源池
private final SlotPool slotPool;   
/** Managers for the different slot sharing groups. */
protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
private final SchedulingStrategy schedulingStrategy;
//重启策略
private final RestartStrategy restartStrategy;
// --------- BackPressure -------- 反压追踪
private final BackPressureStatsTracker backPressureStatsTracker;
// --------- ResourceManager --------
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// --------- TaskManagers --------
private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
// -------- Mutable fields ---------
private ExecutionGraph executionGraph;

2.2.2 taskmanager容错

    taskmanager在flink中的工作进程,是flink处理资源单元,同时是checkpoint的执行者,每个taskmanager有1到多个slot,tm启动后,会向rm注册它的处理槽,jm就可以像slot分配任务来执行任务,task在tm中执行,tm负责task的启动、执行、取消,task异常时会向jm汇报,tm为task数据交换提供依托,如同一个tm内不同线程间的通过缓冲区的数据交换,不同tm的线程间通过网络传输机制进行数据交换并提供反压机制。执行期间如果一个taskmanager失败了,jobmaster将向resourcemanager申请可用的slot,有足够的slot后,这个taskmanager将会重启。重启策略分为三种,fixed-delay、failure-rate、no-restart。

/** The access to the leader election and retrieval services. */
private final HighAvailabilityServices haServices;

/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<Void, AccumulatorReport> jobManagerHeartbeatManager;

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
// slot清单
private final TaskSlotTable taskSlotTable;

2.2.3 resourcemanager容错

     resourcemanager在flink中是一个可插拔组件,对于不用的环境有不同的实现,如对用k8s 和 yarn 有对应的KubernetesResourceManager和YarnResourceManager等。其负责资源的管理,同时和tm、jm保持心跳。rm中持有一个slotmanager组件,负责维护当前有多少tm,各个slot的使用情况。 rm发生故障时,jm会通过leader选举通知得到新的rm,并重新尝试和新的rm建立连接。tm也会将自己的slot注册到新的rm。

// 高可用管理
/** High availability services for leader retrieval and election. */
private final HighAvailabilityServices highAvailabilityServices;
// 心跳管理
/** The heartbeat manager with task managers. */
private final HeartbeatManager<SlotReport, Void> taskManagerHeartbeatManager;
/** The heartbeat manager with job managers. */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
// slot管理器
/** The slot manager maintains the available slots. */
private final SlotManager slotManager;

2.3 checkpoint

   flink的checkpoint机制是轻量级异步分布式快照,那它是如何做到它的名字中说的异步分布式的呢,又是如何做到at least once 和 exactly once的呢?


2.3.1 分布式快照

      分布式是按算子将数据流切分,flink使用Chandy-Lamport分布式快照算法来生成检查点,将生成检查点的过程与处理过程分离。流计算计算的正确性语义需要依赖state + checkpoint + connector特性来支持。flink jm中使用CheckpointCoordinator周期性的以广播的形式发送barrier到数据流中,每个barrier都有一个检查点编号,用来从逻辑上切分数据流,barrier在流中的位置不会提前或延后,随数据流乡下流动,在数据源注入barrier后,触发向状态后端生成检查点,barrier所在位置就是恢复数据时的起始位置。下游算子收齐barrier后,会执行自己的算子state快照,并向下游广播barrier,直至下游sink算子收集齐barrier后,对自己的state执行快照,完成快照后,并通知CheckpointCoordinator快照地址,CheckpointCoordinator收集齐barriar确认信息后,确认本次快照完成。

2.3.2 at least once

    对齐barrier时,早来的数据持续处理就是at least once。该数据依旧被包含在checkpoint备份的状态之中,当故障发生时,从备份的状态恢复时,该数据依旧会被处理。

2.3.3 exactly once

     对齐barrier时,早来的数据进行buffer就是flink引擎内exectly once。就是对齐环节,将数据收集缓存起来,对齐完成之后再处理。但只能保证flink引擎内的exactly once 语义。

2.3.4 end to end exactly once

如果要做到end to end exactly once 语义,需要Source端和sink端的配合,

1. 数据源需要支持断点读取

2. sink端需要支持回滚机制或满足幂等性。回滚--将部分写入结果回滚到写入之前的状态,幂等-- 多次写入一致

源端和目的端支持的语义如下:

    


    flink中采用的是2pc解决方案,即 two phase commit 两阶段提交,所谓的两个阶段是指:第一阶段:准备阶段(投票阶段)和第二阶段:提交阶段(执行阶段)。我们将提议的节点称为协调者(coordinator),其他参与决议节点称为参与者(participants)。另还有3pc的方式。

   sink算子收到上游所有的barrier后,执行state快照,并预提交事务,再通知CheckpointCoordinator,CheckpointCoordinator确认本次快照完成, sink算子提交事务。

flink提供了一个2pc的TwoPhaseCommitSinkFunction的抽象 类

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
      extends RichSinkFunction<IN>
      implements CheckpointedFunction, CheckpointListener {  // 继承于checkpointedfunction 和 checkpointlistener 和richfunction
/**
 * Method that starts a new transaction.  开启事务 创建临时文件
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
 * transaction for a commit that might happen in the future. After this point the transaction might still be
 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
 * will always succeed.
 *  预提交阶段  写到临时文件, 关闭文件,同时开启新事务,执行属于下一个检查点的写入操作。
*/
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be
 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.   提交阶段,原子操作上一阶段的文件,写入真正的文件下, 提交失败, 重启尝试恢复并重新提交事务,恢复时幂等提交。
 */
protected abstract void commit(TXN transaction);

* Abort a transaction.   终止事务, 删除临时文件
 */
protected abstract void abort(TXN transaction);
}

    TwoPhaseCommitSinkFunction 继承了checkpointfunction接口,可以在预提交阶段,将检查点写到可靠性存储。其又继承了checkpointlistener接口,在提交阶段接收jm的确认通知,触发提交外部事务。

2.4 savepoint

     savepoint和checkpoint都是一致性快照,checkpoint由flink自动创建,故障发生时会自动加载恢复,flink根据配置自动删除,而保存点需要用户手动触发,不会被自动删除,启动时,从savepoint启动即可。savepoint主要用于作业迁移和集群升级等场景中。

bin/flink/ savepoint jobId savepointpath 
bin/flin/ run -s savepointpath ...

3。 后记

     后续将更多的投向flink sql,so,上云就上华为云,流计算服务推荐选择华为云EI 数据湖探索 DLI-FLINK serverless云服务。


参考:

   https://ci.apache.org/projects/flink/flink-docs-master/

  https://github.com/apache/flink/tree/blink

  https://www.oreilly.com/library/view/stream-processing-with/9781491974285/

  

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。