Flink源码分析(二):Async I/O 实现原理

举报
flink爱好者 发表于 2020/07/25 18:00:04 2020/07/25
【摘要】 对Flink Async I/O 的 原理,结合Flink的源代码进行分析。

背景

异步IO(Async IO) 是Flink1.2版本引入的特性。采用异步通信机制,解决了Flink应用和外部异交互时的网络延迟成为系统瓶颈的问题。

在流处理应用中,经常需要和外部系统打交道。

举例:订单表中存储了商品的ID,当我们需要商品详情时,需要通过商品ID在数据库中查询商品详情。常见模式,是串行同步访问,向数据库发送请求A,然后等结果返回,结果返回之后发送下一个请求。如下图左侧。


image.png


这是一种同步访问模式,网络等待时长极大地阻碍了吞吐和延迟。而AsyncIO 可以并发的处理多个请求和恢复,可以连续向数据库发送多个查询亲请求,哪个请求的回复先到达就先处理哪个请求,连续的请求之间不需要等待,极大地提高了Flink应用程序的吞吐量。如上图右侧。


Async IO 提供的功能如下:

  • 异步访问

  • 消息顺序性保证

  • 状态一致性保证


下面通过例子和源码对各个功能进行剖析。

一个栗子

// 继承RichSyncFunction
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
    // 数据库客户端
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
    	// 初始化客户端
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // 发送异步请求,返回结果Futrue
        final Future<String> result = client.query(key);

        // 请求完成时进行回调,将结果传递给 resultFuture 进行收集
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// 创建原始数据流
DataStream<String> stream = ...;

// 通过AsyncDataStream 为 原始数据流进行 AsyncI/O,得到转换后的数据流
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);


使用Flink的Async I/O,简洁清晰。主要有两步:

  1. 继承 RichAsyncFunction,其中是进行异步访问的业务逻辑。

  2. 使用AsyncDataStream.unorderWait 对原始的 Stream 进行异步 I/O 转换,得到转换后的Stream。

Async-I/O 异步访问

AsyncDataStream

AsyncDataStream 有 unorderedWait 和 orderedWait 两个静态方法,分别对应有序 和 无序 两种模式。

public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
      DataStream<IN> in,
      AsyncFunction<IN, OUT> func,
      long timeout,
      TimeUnit timeUnit,
      int capacity)
      
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
      DataStream<IN> in,
      AsyncFunction<IN, OUT> func,
      long timeout,
      TimeUnit timeUnit,
      int capacity)


  • 有序:消息的发送顺序 == 消息的接收顺序,也就是先进先出。

  • 无序:ProcessingTime 消息完全无序,先返回的结果先发送。EventTime 消息,watermark 指定了乱序的边界。在两个watermark之间的消息是无序的,无序的消息不能超越watermark。


orderedWait 和 underedWait 方法主要有5个参数:

  • in:输入数据流。

  • func:实现异步请求的业务逻辑

  • timeout、timeUnit:超时时间,异步操作超时之后会被丢弃。

  • capacity:同时可处理的异步请求的最大个数。


AsyncDataStream.unorderedWait 的主要工作是 创建一个 AsyncWaitOperator。AsyncWaitOperator 是支持AsyncI/O 的算子实现,其内部会调用AsyncFunction,并且处理返回的结果。

private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
      DataStream<IN> in,
      AsyncFunction<IN, OUT> func,
      long timeout,
      int bufSize,
      OutputMode mode) {

    ......
   // 创建AsyncWaitOperatorFactor,工厂方法,内部创建 AsyncWaitOperator
   AsyncWaitOperatorFactory<IN, OUT> operatorFactory = new AsyncWaitOperatorFactory<>(
      in.getExecutionEnvironment().clean(func),
      timeout,
      bufSize,
      mode);

   return in.transform("async wait operator", outTypeInfo, operatorFactory);
}


AsyncWaitOperator

AsyncWaitOperator对元素的处理: 首先会将元素加入到 Queue中,Queue的大小为Capacity,当Queue满了,加入动作会阻塞。

如果设置了超时时间,会为该元素创建一个定时器。

最后会调用异步函数。

// AsyncWaitOperator 对元素的处理
public void processElement(StreamRecord<IN> element) throws Exception {
	// 将元素加入到队列中
	final ResultFuture<OUT> entry = addToWorkQueue(element);

	final ResultHandler resultHandler = new ResultHandler(element, entry);

	// 注册定时器
	if (timeout > 0L) {
		final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

		final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
			timeoutTimestamp,
			timestamp -> userFunction.timeout(element.getValue(), resultHandler));

		resultHandler.setTimeoutTimer(timeoutTimer);
	}

	// 异步IO 调用
	userFunction.asyncInvoke(element.getValue(), resultHandler);
}


当异步IO之后完毕后,会调用resultHandler.complete() 方法,将结果收集到resutHandler中

// ResultHandler 的complete 方法
@Override
public void complete(Collection<OUT> results) {
	Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing");

	// 互斥条件
	if (!completed.compareAndSet(false, true)) {
		return;
	}

	//将结果发送给下一个处理节点
	processInMailbox(results);
}

private void processInMailbox(Collection<OUT> results) {
	// mail box thread 中进行消息发送,processResults() 进行消息处理
	mailboxExecutor.execute(
		() -> processResults(results),
		"Result in AsyncWaitOperator of input %s", results);
}

private void processResults(Collection<OUT> results) {
	// 计算出了结果,取消定时器
	if (timeoutTimer != null) {
		// canceling in mailbox thread avoids https://issues.apache.org/jira/browse/FLINK-13635
		timeoutTimer.cancel(true);
	}

	// 更新Queue的Entry
	resultFuture.complete(results);
	// 从Queue中输出所有查询出来的结果
	outputCompletedElement();
}

// 将结果发送出去
private void outputCompletedElement() {
	if (queue.hasCompletedElements()) {
		// emit only one element to not block the mailbox thread unnecessarily
		queue.emitCompletedElement(timestampedCollector);
		// if there are more completed elements, emit them with subsequent mails
		if (queue.hasCompletedElements()) {
			mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
		}
	}
}


上述最后一个函数的resultFuture.compete()  会更新Queue中的Entry。然后将队列中已经完成的元素给发送出去。


消息顺序性保证

AsyncWaitOperator 采用 StreamElementQueue 来是实现消息的顺序性保证。有两个子类:OrderedStreamElementQueue 和 UnorderedStreamElementQueue。


OrderedStreamElementQueue

OrderedStreamElementQueue 实现了有序,内部数据结构是Java集合的Queue。当且当队列头的元素已经完成时,才会将元素发送。

代码如下:

@Override
public boolean hasCompletedElements() {
    // 队列首的元素已经完成,可以发送
   return !queue.isEmpty() && queue.peek().isDone();
}

// 发送元素
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
    // 判断队首元素是否可以发送
   if (hasCompletedElements()) {
      final StreamElementQueueEntry<OUT> head = queue.poll();
      head.emitResult(output);
   }
}


UnorderedStreamElementQueue

UnorderedStreamElementQueue 实现无序发送,使用一套逻辑实现了ProcessingTime无序 和 EventTime 无序。核心代码如下:

static class Segment<OUT> {
   /** Unfinished input elements. */
   private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

   /** Undrained finished elements. */
   private final Queue<StreamElementQueueEntry<OUT>> completedElements;
}


public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

   private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

   /** Capacity of this queue. */
   private final int capacity;

   /** Queue of queue entries segmented by watermarks. */
   private final Deque<Segment<OUT>> segments;
   
   // 取出Segments 的首个元素判断是否是完成的。
   @Override
    public boolean hasCompletedElements() {
       return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
    }
}


Segment 就是一个队列,在UnorderedStreamElementQueue 中在外面又封装了一层队列。

双层队列用来解决ProcessingTime 和 EventTime 的无序。


ProcessingTime无序:segments 中永远只有一个 元素,所以将所有元素放在一个队列中。

EventTime 无序:每次放入watermark 时,在segments 队列中放入一个空的 Segment。后续的元素添加都会是另外一个队列。这样就保证了Watermark 之间的元素无序。


状态一致性保证

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
   super.snapshotState(context);

   ListState<StreamElement> partitionableState =
      getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
   partitionableState.clear();

   try {
      // 将队列中的元素保存在状态中即可。
      partitionableState.addAll(queue.values());
   } catch (Exception e) {
      partitionableState.clear();

      throw new Exception("Could not add stream element queue entries to operator state " +
         "backend of operator " + getOperatorName() + '.', e);
   }
}


在snapShot 函数中,保存了状态的信息,这是状态一致性的基础。

AsyncWaitOperator 执行快照非常简单。从代码中可以看到执行了如下步骤:

  1. 先清空原先的状态存储

  2. 将Queue中的信息全部取出,然后放入到状态存储区中。

  3. 执行快照


【参考】

  1. http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

  2. https://ci.apache.org/projects/flink/flink-docs-release-1.10/

  3. https://juejin.im/post/5e188a176fb9a02fe971eeb4

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。