高并发编程/一张图精通CompletableFuture整体执行流程与设计(高手篇)
CompletableFuture
是 Java 8 引入的异步编程工具,它极大地丰富了并发编程的解决方案。作为 Future
的增强版,它不仅支持异步操作的结果管理,还提供了强大的链式调用能力,允许开发者以声明式的方式编排复杂的异步逻辑。CompletableFuture
的出现,使得代码更加简洁、清晰,同时提高了程序的响应性和吞吐量,是现代 Java 并发编程中不可或缺的一部分。
0、CompletableFuture执行流程设计
0、组件设计需求
CompletableFuture
是 Java 8 引入的一个强大的异步编程工具,它位于 java.util.concurrent
包中。以下是 CompletableFuture
的设计背景和主要特点:
设计原有
在 Java 8 之前,异步编程通常依赖于 Future
和 Callable
接口,但这些工具的使用相对繁琐,且不支持链式调用和组合操作。CompletableFuture
的引入旨在提供一种更简洁、更强大的异步编程模型,以支持复杂的异步逻辑和提高代码的可读性及可维护性。
设计特点
- 异步执行:
CompletableFuture
可以异步执行任务,不会阻塞当前线程,允许任务在另一个线程上执行,而主线程可以继续执行其他任务。 - 组合性:多个
CompletableFuture
实例可以组合在一起,形成复杂的异步流程。例如,一个CompletableFuture
的结果可以作为另一个CompletableFuture
的输入。 - 回调机制:提供了多种方法来注册回调函数,这些回调函数会在未来的某个时刻(如异步操作完成时)被自动调用。
- 异常处理:提供了一种处理异步任务中抛出的异常的方式,例如使用
exceptionally
方法来指定异常发生时的处理逻辑。 - 可定制的执行器:可以通过指定
Executor
来控制异步任务的执行线程,提供了更好的控制和资源管理能力。 - 支持同步和异步方法:
CompletableFuture
提供了同步和异步两种方法,例如thenApply
和thenApplyAsync
,允许开发者根据需要选择使用同步或异步方式处理结果。
2、CompletableFuture 功能范围
2.1. 异步执行
CompletableFuture
提供了 supplyAsync
和 runAsync
方法,允许异步执行任务。这些方法会将任务提交给 ForkJoinPool
(默认的公共线程池)或其他指定的 Executor
来执行。
2.2. 组合异步任务
CompletableFuture
支持通过 thenApply
、thenAccept
、thenRun
等方法将异步任务的结果传递给后续的任务,实现任务之间的数据流和控制流的组合。
2.3. 错误处理
通过 exceptionally
和 handle
方法,CompletableFuture
允许开发者定义异常处理逻辑,以优雅地处理异步任务中发生的异常。
2.4. 组合多个异步任务
CompletableFuture
提供了 thenCombine
、thenAcceptBoth
、thenCompose
等方法,允许将多个异步任务的结果合并处理。
2.5. 任意一个或全部完成
CompletableFuture
提供了 anyOf
和 allOf
方法,用于处理多个异步任务中的任何一个完成或全部完成的场景。
2.6. 超时和取消
CompletableFuture
支持通过 orTimeout
方法设置超时,以及通过 cancel
方法取消异步任务。
2.7. 结果获取
CompletableFuture
提供了 get
和 join
方法,用于在需要时阻塞当前线程以等待异步任务的结果。
2.8. 非阻塞结果处理
CompletableFuture
的设计允许开发者以非阻塞的方式处理异步任务的结果,这是通过提供回调函数来实现的,这些回调函数会在异步任务完成时被调用。
3、CompletableFuture 设计原则
- 非阻塞性:
CompletableFuture
的设计避免了在等待异步任务结果时阻塞主线程。 - 可组合性:
CompletableFuture
允许通过链式调用来组合多个异步任务,使得复杂的异步逻辑可以被分解为一系列简单的步骤。 - 灵活性:
CompletableFuture
提供了多种方法来处理异步任务的不同方面,包括执行、组合、异常处理等。 - 健壮性:
CompletableFuture
的设计考虑了错误处理和异常情况,提供了丰富的API来处理这些情况。
4、CompletableFuture 方法定义
4.1 常规方法相关执行流程图
4.2. 创建和启动异步计算
- CompletableFuture<Void>
runAsync(Runnable runnable)
: 在默认的ForkJoinPool
中异步执行一个任务。 - CompletableFuture<Void>
runAsync(Runnable runnable, Executor executor)
: 在指定的Executor
中异步执行一个任务。 - CompletableFuture
supplyAsync(Supplier<U> supplier)
: 在默认的ForkJoinPool
中异步执行一个供应者。 - CompletableFuture
supplyAsync(Supplier<U> supplier, Executor executor)
: 在指定的Executor
中异步执行一个供应者。
4.2. 完成时的回调
- CompletableFuture<T>
whenComplete(BiConsumer<? super T, ? super Throwable> action)
: 当任务完成时(无论成功或失败)执行一个回调。 - CompletableFuture<T>
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
: 当任务完成时(无论成功或失败)异步执行一个回调。 - CompletableFuture<T>
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
: 在指定的Executor
中异步执行一个回调。
4.3. 正常完成时的回调
- CompletableFuture<T>
thenAccept(Consumer<? super T> action)
: 当任务正常完成时执行一个回调。 - CompletableFuture<T>
thenAcceptAsync(Consumer<? super T> action)
: 当任务正常完成时异步执行一个回调。 - CompletableFuture<T>
thenAcceptAsync(Consumer<? super T> action, Executor executor)
: 在指定的Executor
中异步执行一个回调。
4.4. 正常完成时的转换
- CompletableFuture
thenApply(Function<? super T, ? extends U> fn)
: 当任务正常完成时,将一个函数应用于结果。 - CompletableFuture
thenApplyAsync(Function<? super T, ? extends U> fn)
: 当任务正常完成时,异步应用一个函数于结果。 - CompletableFuture
thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
: 在指定的Executor
中异步应用一个函数于结果。
4.5. 组合 CompletableFuture
- CompletableFuture<T>
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
: 当两个任务都完成时,消费它们的结果。 - CompletableFuture<T>
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
: 异步消费两个任务的结果。 - CompletableFuture<T>
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
: 当两个任务都完成时,组合它们的结果。 - CompletableFuture<T>
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
: 当任务完成时,以它的结果为输入启动另一个CompletableFuture
。 - CompletableFuture<T>
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
: 异步启动另一个CompletableFuture
。
4.6. 异常处理
- CompletableFuture<T>
exceptionally(Function<Throwable, ? extends T> fn)
: 当任务异常完成时,应用一个函数来计算结果。
4.7. 完成时的回调(无论成功或失败)
- CompletableFuture<T>
handle(BiFunction<? super T, Throwable, ? extends U> fn)
: 当任务完成时(无论成功或失败)执行一个回调。
4.8. 等待其他 CompletableFuture
- CompletableFuture<T>
thenRun(Runnable action)
: 当任务完成时执行一个动作。 - CompletableFuture<T>
thenRunAsync(Runnable action)
: 当任务完成时异步执行一个动作。 - CompletableFuture<T>
thenRunAsync(Runnable action, Executor executor)
: 在指定的Executor
中异步执行一个动作。
4.9. 任意一个或全部完成
- CompletableFuture<T>
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
: 当当前或另一个任务完成时消费结果。 - CompletableFuture<T>
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
: 异步消费结果。 - CompletableFuture<T>
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
: 当当前或另一个任务完成时应用函数。 - CompletableFuture<T>
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
: 异步应用函数。 - CompletableFuture<Void>
runAfterBoth(CompletionStage<?> other, Runnable action)
: 当当前和另一个任务都完成时执行动作。 - CompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action)
: 异步执行动作。 - CompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
: 在指定的Executor
中异步执行动作。 - CompletableFuture<T>
thenCompose(CompletionStage<Function<? super T, ? extends U>> fn)
: 当任务完成时,以它的结果为输入启动另一个CompletableFuture
。 - CompletableFuture
thenComposeAsync(CompletionStage<Function<? super T, ? extends U>> fn)
: 异步启动另一个CompletableFuture
。 - CompletableFuture<Void>
allOf(CompletableFuture<?>... cfs)
: 当所有给定的任务都完成时完成。 - CompletableFuture<T>
anyOf(CompletableFuture<? extends T>... cfs)
: 当任何一个给定的任务完成时完成。
4.10. 超时和取消
- CompletableFuture<T>
orTimeout(long timeout, TimeUnit unit)
: 如果在给定的超时时间内没有完成,则异常完成。 - CompletableFuture<T>
completeOnTimeout(T value, long timeout, TimeUnit unit)
: 如果在给定的超时时间内没有完成,则以给定的值完成。 - boolean
cancel(boolean mayInterruptIfRunning)
: 尝试取消执行。
4.11. 结果获取
T get()
: 等待任务完成并获取结果。T join()
: 等待任务完成并获取结果。T getNow(T valueIfAbsent)
: 获取当前结果,如果未完成则返回给定的值。T getThenApplyAsync(Function<? super T, ? extends U> fn)
: 获取结果并异步应用函数。
5、CompletableFuture 应用案例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ComprehensiveCompletableFutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 4.2. 创建和启动异步计算
CompletableFuture<Boolean> inventoryCheckFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("检查库存...");
return true; // 假设库存充足
}, executor);
// 4.4. 正常完成时的转换
CompletableFuture<String> orderCreationFuture = inventoryCheckFuture.thenApplyAsync(
Boolean::booleanValue,
ordered -> "ORDER-" + ordered,
executor
);
// 4.5. 组合 CompletableFuture
CompletableFuture<String> paymentProcessingFuture = orderCreationFuture.thenComposeAsync(
order -> CompletableFuture.supplyAsync(() -> {
System.out.println("处理支付...");
return "PAYMENT-" + order;
}),
executor
);
// 4.6. 异常处理
CompletableFuture<String> exceptionHandledFuture = paymentProcessingFuture.exceptionally(
ex -> "EXCEPTION-" + ex.getMessage()
);
// 4.7. 完成时的回调(无论成功或失败)
CompletableFuture<String> finalResultFuture = exceptionHandledFuture.handle((result, ex) -> {
if (ex == null) {
System.out.println("订单处理成功: " + result);
return result;
} else {
System.out.println("订单处理失败: " + ex.getMessage());
return "FAILURE-" + ex.getMessage();
}
});
// 4.8. 等待其他 CompletableFuture
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
inventoryCheckFuture,
orderCreationFuture,
paymentProcessingFuture,
exceptionHandledFuture,
finalResultFuture
);
// 4.9. 任意一个或全部完成
CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(
inventoryCheckFuture,
orderCreationFuture,
paymentProcessingFuture,
exceptionHandledFuture,
finalResultFuture
);
// 4.10. 超时和取消
CompletableFuture<String> timeoutFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "TIMEOUT";
}).orTimeout(1, TimeUnit.SECONDS);
// 4.11. 结果获取
finalResultFuture.thenAccept(result -> {
System.out.println("最终结果: " + result);
});
timeoutFuture.exceptionally(ex -> {
System.out.println("超时异常: " + ex.getMessage());
return null;
});
// 等待所有任务完成
allFutures.join();
// 关闭线程池
executor.shutdown();
}
}
代码解释:
- 创建和启动异步计算:使用
supplyAsync
检查库存。 - 正常完成时的转换:使用
thenApplyAsync
创建订单。 - 组合CompletableFuture:使用
thenComposeAsync
处理支付。 - 异常处理:使用
exceptionally
处理支付过程中的异常。 - 完成时的回调(无论成功或失败) :使用
handle
处理最终结果。 - 等待其他CompletableFuture:使用
allOf
等待所有任务完成。 - 任意一个或全部完成:使用
anyOf
等待任一任务完成。 - 超时和取消:使用
orTimeout
设置超时。 - 结果获取:使用
thenAccept
打印最终结果。
- 点赞
- 收藏
- 关注作者
评论(0)