Flink源码分析(二):Async I/O 实现原理
背景
异步IO(Async IO) 是Flink1.2版本引入的特性。采用异步通信机制,解决了Flink应用和外部异交互时的网络延迟成为系统瓶颈的问题。
在流处理应用中,经常需要和外部系统打交道。
举例:订单表中存储了商品的ID,当我们需要商品详情时,需要通过商品ID在数据库中查询商品详情。常见模式,是串行同步访问,向数据库发送请求A,然后等结果返回,结果返回之后发送下一个请求。如下图左侧。

这是一种同步访问模式,网络等待时长极大地阻碍了吞吐和延迟。而AsyncIO 可以并发的处理多个请求和恢复,可以连续向数据库发送多个查询亲请求,哪个请求的回复先到达就先处理哪个请求,连续的请求之间不需要等待,极大地提高了Flink应用程序的吞吐量。如上图右侧。
Async IO 提供的功能如下:
异步访问
消息顺序性保证
状态一致性保证
下面通过例子和源码对各个功能进行剖析。
一个栗子
// 继承RichSyncFunction
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
// 数据库客户端
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化客户端
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 发送异步请求,返回结果Futrue
final Future<String> result = client.query(key);
// 请求完成时进行回调,将结果传递给 resultFuture 进行收集
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 创建原始数据流
DataStream<String> stream = ...;
// 通过AsyncDataStream 为 原始数据流进行 AsyncI/O,得到转换后的数据流
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
使用Flink的Async I/O,简洁清晰。主要有两步:
继承 RichAsyncFunction,其中是进行异步访问的业务逻辑。
使用AsyncDataStream.unorderWait 对原始的 Stream 进行异步 I/O 转换,得到转换后的Stream。
Async-I/O 异步访问
AsyncDataStream
AsyncDataStream 有 unorderedWait 和 orderedWait 两个静态方法,分别对应有序 和 无序 两种模式。
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity) public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity)
有序:消息的发送顺序 == 消息的接收顺序,也就是先进先出。
无序:ProcessingTime 消息完全无序,先返回的结果先发送。EventTime 消息,watermark 指定了乱序的边界。在两个watermark之间的消息是无序的,无序的消息不能超越watermark。
orderedWait 和 underedWait 方法主要有5个参数:
in:输入数据流。
func:实现异步请求的业务逻辑
timeout、timeUnit:超时时间,异步操作超时之后会被丢弃。
capacity:同时可处理的异步请求的最大个数。
AsyncDataStream.unorderedWait 的主要工作是 创建一个 AsyncWaitOperator。AsyncWaitOperator 是支持AsyncI/O 的算子实现,其内部会调用AsyncFunction,并且处理返回的结果。
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
int bufSize,
OutputMode mode) {
......
// 创建AsyncWaitOperatorFactor,工厂方法,内部创建 AsyncWaitOperator
AsyncWaitOperatorFactory<IN, OUT> operatorFactory = new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func),
timeout,
bufSize,
mode);
return in.transform("async wait operator", outTypeInfo, operatorFactory);
}
AsyncWaitOperator
AsyncWaitOperator对元素的处理: 首先会将元素加入到 Queue中,Queue的大小为Capacity,当Queue满了,加入动作会阻塞。
如果设置了超时时间,会为该元素创建一个定时器。
最后会调用异步函数。
// AsyncWaitOperator 对元素的处理
public void processElement(StreamRecord<IN> element) throws Exception {
// 将元素加入到队列中
final ResultFuture<OUT> entry = addToWorkQueue(element);
final ResultHandler resultHandler = new ResultHandler(element, entry);
// 注册定时器
if (timeout > 0L) {
final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
timeoutTimestamp,
timestamp -> userFunction.timeout(element.getValue(), resultHandler));
resultHandler.setTimeoutTimer(timeoutTimer);
}
// 异步IO 调用
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
当异步IO之后完毕后,会调用resultHandler.complete() 方法,将结果收集到resutHandler中
// ResultHandler 的complete 方法
@Override
public void complete(Collection<OUT> results) {
Preconditions.checkNotNull(results, "Results must not be null, use empty collection to emit nothing");
// 互斥条件
if (!completed.compareAndSet(false, true)) {
return;
}
//将结果发送给下一个处理节点
processInMailbox(results);
}
private void processInMailbox(Collection<OUT> results) {
// mail box thread 中进行消息发送,processResults() 进行消息处理
mailboxExecutor.execute(
() -> processResults(results),
"Result in AsyncWaitOperator of input %s", results);
}
private void processResults(Collection<OUT> results) {
// 计算出了结果,取消定时器
if (timeoutTimer != null) {
// canceling in mailbox thread avoids https://issues.apache.org/jira/browse/FLINK-13635
timeoutTimer.cancel(true);
}
// 更新Queue的Entry
resultFuture.complete(results);
// 从Queue中输出所有查询出来的结果
outputCompletedElement();
}
// 将结果发送出去
private void outputCompletedElement() {
if (queue.hasCompletedElements()) {
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
}
}
}
上述最后一个函数的resultFuture.compete() 会更新Queue中的Entry。然后将队列中已经完成的元素给发送出去。
消息顺序性保证
AsyncWaitOperator 采用 StreamElementQueue 来是实现消息的顺序性保证。有两个子类:OrderedStreamElementQueue 和 UnorderedStreamElementQueue。
OrderedStreamElementQueue
OrderedStreamElementQueue 实现了有序,内部数据结构是Java集合的Queue。当且当队列头的元素已经完成时,才会将元素发送。
代码如下:
@Override
public boolean hasCompletedElements() {
// 队列首的元素已经完成,可以发送
return !queue.isEmpty() && queue.peek().isDone();
}
// 发送元素
@Override
public void emitCompletedElement(TimestampedCollector<OUT> output) {
// 判断队首元素是否可以发送
if (hasCompletedElements()) {
final StreamElementQueueEntry<OUT> head = queue.poll();
head.emitResult(output);
}
}
UnorderedStreamElementQueue
UnorderedStreamElementQueue 实现无序发送,使用一套逻辑实现了ProcessingTime无序 和 EventTime 无序。核心代码如下:
static class Segment<OUT> {
/** Unfinished input elements. */
private final Set<StreamElementQueueEntry<OUT>> incompleteElements;
/** Undrained finished elements. */
private final Queue<StreamElementQueueEntry<OUT>> completedElements;
}
public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
/** Capacity of this queue. */
private final int capacity;
/** Queue of queue entries segmented by watermarks. */
private final Deque<Segment<OUT>> segments;
// 取出Segments 的首个元素判断是否是完成的。
@Override
public boolean hasCompletedElements() {
return !this.segments.isEmpty() && this.segments.getFirst().hasCompleted();
}
}
Segment 就是一个队列,在UnorderedStreamElementQueue 中在外面又封装了一层队列。
双层队列用来解决ProcessingTime 和 EventTime 的无序。
ProcessingTime无序:segments 中永远只有一个 元素,所以将所有元素放在一个队列中。
EventTime 无序:每次放入watermark 时,在segments 队列中放入一个空的 Segment。后续的元素添加都会是另外一个队列。这样就保证了Watermark 之间的元素无序。
状态一致性保证
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
try {
// 将队列中的元素保存在状态中即可。
partitionableState.addAll(queue.values());
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " +
"backend of operator " + getOperatorName() + '.', e);
}
}
在snapShot 函数中,保存了状态的信息,这是状态一致性的基础。
AsyncWaitOperator 执行快照非常简单。从代码中可以看到执行了如下步骤:
先清空原先的状态存储
将Queue中的信息全部取出,然后放入到状态存储区中。
执行快照
【参考】
- 点赞
- 收藏
- 关注作者
评论(0)