Flink源码分析(二):Async I/O 实现原理
背景
异步IO(Async IO) 是Flink1.2版本引入的特性。采用异步通信机制,解决了Flink应用和外部异交互时的网络延迟成为系统瓶颈的问题。
在流处理应用中,经常需要和外部系统打交道。
举例:订单表中存储了商品的ID,当我们需要商品详情时,需要通过商品ID在数据库中查询商品详情。常见模式,是串行同步访问,向数据库发送请求A,然后等结果返回,结果返回之后发送下一个请求。如下图左侧。
这是一种同步访问模式,网络等待时长极大地阻碍了吞吐和延迟。而AsyncIO 可以并发的处理多个请求和恢复,可以连续向数据库发送多个查询亲请求,哪个请求的回复先到达就先处理哪个请求,连续的请求之间不需要等待,极大地提高了Flink应用程序的吞吐量。如上图右侧。
Async IO 提供的功能如下:
异步访问
消息顺序性保证
状态一致性保证
下面通过例子和源码对各个功能进行剖析。
一个栗子
// 继承RichSyncFunction class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { // 数据库客户端 private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { // 初始化客户端 client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // 发送异步请求,返回结果Futrue final Future<String> result = client.query(key); // 请求完成时进行回调,将结果传递给 resultFuture 进行收集 CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // 创建原始数据流 DataStream<String> stream = ...; // 通过AsyncDataStream 为 原始数据流进行 AsyncI/O,得到转换后的数据流 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
使用Flink的Async I/O,简洁清晰。主要有两步:
继承 RichAsyncFunction,其中是进行异步访问的业务逻辑。
使用AsyncDataStream.unorderWait 对原始的 Stream 进行异步 I/O 转换,得到转换后的Stream。
Async-I/O 异步访问
AsyncDataStream
AsyncDataStream 有 unorderedWait 和 orderedWait 两个静态方法,分别对应有序 和 无序 两种模式。
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity)
有序:消息的发送顺序 == 消息的接收顺序,也就是先进先出。
无序:ProcessingTime 消息完全无序,先返回的结果先发送。EventTime 消息,watermark 指定了乱序的边界。在两个watermark之间的消息是无序的,无序的消息不能超越watermark。
orderedWait 和 underedWait 方法主要有5个参数:
in:输入数据流。
func:实现异步请求的业务逻辑
timeout、timeUnit:超时时间,异步操作超时之后会被丢弃。
capacity:同时可处理的异步请求的最大个数。
AsyncDataStream.unorderedWait 的主要工作是 创建一个 AsyncWaitOperator。AsyncWaitOperator 是支持AsyncI/O 的算子实现,其内部会调用AsyncFunction,并且处理返回的结果。
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, int bufSize, OutputMode mode) { ...... // 创建AsyncWaitOperatorFactor,工厂方法,内部创建 AsyncWaitOperator AsyncWaitOperatorFactory<IN, OUT> operatorFactory = new AsyncWaitOperatorFactory<>( in.getExecutionEnvironment().clean(func), timeout, bufSize, mode); return in.transform("async wait operator", outTypeInfo, operatorFactory); }
AsyncWaitOperator
AsyncWaitOperator对元素的处理: 首先会将元素加入到 Queue中,Queue的大小为Capacity,当Queue满了,加入动作会阻塞。
如果设置了超时时间,会为该元素创建一个定时器。
最后会调用异步函数。
// AsyncWaitOperator 对元素的处理 public void processElement(StreamRecord<IN> element) throws Exception { // 将元素加入到队列中 final ResultFuture<OUT> entry = addToWorkQueue(element); final ResultHandler resultHandler = new ResultHandler(element, entry); // 注册定时器 if (timeout > 0L) { final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer( timeoutTimestamp, timestamp -> userFunction.timeout(element.getValue(), resultHandler)); resultHandler.setTimeoutTimer(timeoutTimer); } // 异步IO 调用 userFunction.asyncInvoke(element.getValue(), resultHandler); }
当异步IO之后完毕后,会调用resultHandler.complete() 方法,将结果收集到resutHandler中
// ResultHandler 的complete 方法 @Override public void complete(Collection<OUT> results) { Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing"); // 互斥条件 if (!completed.compareAndSet(false, true)) { return; } //将结果发送给下一个处理节点 processInMailbox(results); } private void processInMailbox(Collection<OUT> results) { // mail box thread 中进行消息发送,processResults() 进行消息处理 mailboxExecutor.execute( () -> processResults(results), "Result in AsyncWaitOperator of input %s", results); } private void processResults(Collection<OUT> results) { // 计算出了结果,取消定时器 if (timeoutTimer != null) { // canceling in mailbox thread avoids https://issues.apache.org/jira/browse/FLINK-13635 timeoutTimer.cancel(true); } // 更新Queue的Entry resultFuture.complete(results); // 从Queue中输出所有查询出来的结果 outputCompletedElement(); } // 将结果发送出去 private void outputCompletedElement() { if (queue.hasCompletedElements()) { // emit only one element to not block the mailbox thread unnecessarily queue.emitCompletedElement(timestampedCollector); // if there are more completed elements, emit them with subsequent mails if (queue.hasCompletedElements()) { mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement"); } } }
上述最后一个函数的resultFuture.compete() 会更新Queue中的Entry。然后将队列中已经完成的元素给发送出去。
消息顺序性保证
AsyncWaitOperator 采用 StreamElementQueue 来是实现消息的顺序性保证。有两个子类:OrderedStreamElementQueue 和 UnorderedStreamElementQueue。
OrderedStreamElementQueue
OrderedStreamElementQueue 实现了有序,内部数据结构是Java集合的Queue。当且当队列头的元素已经完成时,才会将元素发送。
代码如下:
@Override public boolean hasCompletedElements() { // 队列首的元素已经完成,可以发送 return !queue.isEmpty() && queue.peek().isDone(); } // 发送元素 @Override public void emitCompletedElement(TimestampedCollector<OUT> output) { // 判断队首元素是否可以发送 if (hasCompletedElements()) { final StreamElementQueueEntry<OUT> head = queue.poll(); head.emitResult(output); } }
UnorderedStreamElementQueue
UnorderedStreamElementQueue 实现无序发送,使用一套逻辑实现了ProcessingTime无序 和 EventTime 无序。核心代码如下:
static class Segment<OUT> { /** Unfinished input elements. */ private final Set<StreamElementQueueEntry<OUT>> incompleteElements; /** Undrained finished elements. */ private final Queue<StreamElementQueueEntry<OUT>> completedElements; } public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Queue of queue entries segmented by watermarks. */ private final Deque<Segment<OUT>> segments; // 取出Segments 的首个元素判断是否是完成的。 @Override public boolean hasCompletedElements() { return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted(); } }
Segment 就是一个队列,在UnorderedStreamElementQueue 中在外面又封装了一层队列。
双层队列用来解决ProcessingTime 和 EventTime 的无序。
ProcessingTime无序:segments 中永远只有一个 元素,所以将所有元素放在一个队列中。
EventTime 无序:每次放入watermark 时,在segments 队列中放入一个空的 Segment。后续的元素添加都会是另外一个队列。这样就保证了Watermark 之间的元素无序。
状态一致性保证
@Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); ListState<StreamElement> partitionableState = getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); partitionableState.clear(); try { // 将队列中的元素保存在状态中即可。 partitionableState.addAll(queue.values()); } catch (Exception e) { partitionableState.clear(); throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e); } }
在snapShot 函数中,保存了状态的信息,这是状态一致性的基础。
AsyncWaitOperator 执行快照非常简单。从代码中可以看到执行了如下步骤:
先清空原先的状态存储
将Queue中的信息全部取出,然后放入到状态存储区中。
执行快照
【参考】
- 点赞
- 收藏
- 关注作者
评论(0)