高并发编程/一张图精通CompletableFuture整体执行流程与设计(高手篇)

举报
肖哥弹架构 发表于 2024/11/26 18:03:44 2024/11/26
【摘要】 CompletableFuture 是 Java 8 引入的异步编程工具,它极大地丰富了并发编程的解决方案。作为 Future 的增强版,它不仅支持异步操作的结果管理,还提供了强大的链式调用能力,允许开发者以声明式的方式编排复杂的异步逻辑。CompletableFuture 的出现,使得代码更加简洁、清晰,同时提高了程序的响应性和吞吐量,是现代 Java 并发编程中不可或缺的一部分。

image.png
CompletableFuture 是 Java 8 引入的异步编程工具,它极大地丰富了并发编程的解决方案。作为 Future 的增强版,它不仅支持异步操作的结果管理,还提供了强大的链式调用能力,允许开发者以声明式的方式编排复杂的异步逻辑。CompletableFuture 的出现,使得代码更加简洁、清晰,同时提高了程序的响应性和吞吐量,是现代 Java 并发编程中不可或缺的一部分。

0、CompletableFuture执行流程设计

image.png

0、组件设计需求

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它位于 java.util.concurrent 包中。以下是 CompletableFuture 的设计背景和主要特点:

设计原有

在 Java 8 之前,异步编程通常依赖于 FutureCallable 接口,但这些工具的使用相对繁琐,且不支持链式调用和组合操作。CompletableFuture 的引入旨在提供一种更简洁、更强大的异步编程模型,以支持复杂的异步逻辑和提高代码的可读性及可维护性。

设计特点

  1. 异步执行CompletableFuture 可以异步执行任务,不会阻塞当前线程,允许任务在另一个线程上执行,而主线程可以继续执行其他任务。
  2. 组合性:多个 CompletableFuture 实例可以组合在一起,形成复杂的异步流程。例如,一个 CompletableFuture 的结果可以作为另一个 CompletableFuture 的输入。
  3. 回调机制:提供了多种方法来注册回调函数,这些回调函数会在未来的某个时刻(如异步操作完成时)被自动调用。
  4. 异常处理:提供了一种处理异步任务中抛出的异常的方式,例如使用 exceptionally 方法来指定异常发生时的处理逻辑。
  5. 可定制的执行器:可以通过指定 Executor 来控制异步任务的执行线程,提供了更好的控制和资源管理能力。
  6. 支持同步和异步方法CompletableFuture 提供了同步和异步两种方法,例如 thenApply 和 thenApplyAsync,允许开发者根据需要选择使用同步或异步方式处理结果。

2、CompletableFuture 功能范围

2.1. 异步执行

CompletableFuture 提供了 supplyAsyncrunAsync 方法,允许异步执行任务。这些方法会将任务提交给 ForkJoinPool(默认的公共线程池)或其他指定的 Executor 来执行。

2.2. 组合异步任务

CompletableFuture 支持通过 thenApplythenAcceptthenRun 等方法将异步任务的结果传递给后续的任务,实现任务之间的数据流和控制流的组合。

2.3. 错误处理

通过 exceptionallyhandle 方法,CompletableFuture 允许开发者定义异常处理逻辑,以优雅地处理异步任务中发生的异常。

2.4. 组合多个异步任务

CompletableFuture 提供了 thenCombinethenAcceptBoththenCompose 等方法,允许将多个异步任务的结果合并处理。

2.5. 任意一个或全部完成

CompletableFuture 提供了 anyOfallOf 方法,用于处理多个异步任务中的任何一个完成或全部完成的场景。

2.6. 超时和取消

CompletableFuture 支持通过 orTimeout 方法设置超时,以及通过 cancel 方法取消异步任务。

2.7. 结果获取

CompletableFuture 提供了 getjoin 方法,用于在需要时阻塞当前线程以等待异步任务的结果。

2.8. 非阻塞结果处理

CompletableFuture 的设计允许开发者以非阻塞的方式处理异步任务的结果,这是通过提供回调函数来实现的,这些回调函数会在异步任务完成时被调用。

3、CompletableFuture 设计原则

  • 非阻塞性CompletableFuture 的设计避免了在等待异步任务结果时阻塞主线程。
  • 可组合性CompletableFuture 允许通过链式调用来组合多个异步任务,使得复杂的异步逻辑可以被分解为一系列简单的步骤。
  • 灵活性CompletableFuture 提供了多种方法来处理异步任务的不同方面,包括执行、组合、异常处理等。
  • 健壮性CompletableFuture 的设计考虑了错误处理和异常情况,提供了丰富的API来处理这些情况。

4、CompletableFuture 方法定义

4.1 常规方法相关执行流程图

4.2. 创建和启动异步计算

  • CompletableFuture<Void> runAsync(Runnable runnable): 在默认的 ForkJoinPool 中异步执行一个任务。
  • CompletableFuture<Void> runAsync(Runnable runnable, Executor executor): 在指定的 Executor 中异步执行一个任务。
  • CompletableFuture supplyAsync(Supplier<U> supplier): 在默认的 ForkJoinPool 中异步执行一个供应者。
  • CompletableFuture supplyAsync(Supplier<U> supplier, Executor executor): 在指定的 Executor 中异步执行一个供应者。

4.2. 完成时的回调

  • CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action): 当任务完成时(无论成功或失败)执行一个回调。
  • CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action): 当任务完成时(无论成功或失败)异步执行一个回调。
  • CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor): 在指定的 Executor 中异步执行一个回调。

4.3. 正常完成时的回调

  • CompletableFuture<T> thenAccept(Consumer<? super T> action): 当任务正常完成时执行一个回调。
  • CompletableFuture<T> thenAcceptAsync(Consumer<? super T> action): 当任务正常完成时异步执行一个回调。
  • CompletableFuture<T> thenAcceptAsync(Consumer<? super T> action, Executor executor): 在指定的 Executor 中异步执行一个回调。

4.4. 正常完成时的转换

  • CompletableFuture thenApply(Function<? super T, ? extends U> fn): 当任务正常完成时,将一个函数应用于结果。
  • CompletableFuture thenApplyAsync(Function<? super T, ? extends U> fn): 当任务正常完成时,异步应用一个函数于结果。
  • CompletableFuture thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor): 在指定的 Executor 中异步应用一个函数于结果。

4.5. 组合 CompletableFuture

  • CompletableFuture<T> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action): 当两个任务都完成时,消费它们的结果。
  • CompletableFuture<T> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action): 异步消费两个任务的结果。
  • CompletableFuture<T> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn): 当两个任务都完成时,组合它们的结果。
  • CompletableFuture<T> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): 当任务完成时,以它的结果为输入启动另一个 CompletableFuture
  • CompletableFuture<T> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn): 异步启动另一个 CompletableFuture

4.6. 异常处理

  • CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn): 当任务异常完成时,应用一个函数来计算结果。

4.7. 完成时的回调(无论成功或失败)

  • CompletableFuture<T> handle(BiFunction<? super T, Throwable, ? extends U> fn): 当任务完成时(无论成功或失败)执行一个回调。

4.8. 等待其他 CompletableFuture

  • CompletableFuture<T> thenRun(Runnable action): 当任务完成时执行一个动作。
  • CompletableFuture<T> thenRunAsync(Runnable action): 当任务完成时异步执行一个动作。
  • CompletableFuture<T> thenRunAsync(Runnable action, Executor executor): 在指定的 Executor 中异步执行一个动作。

4.9. 任意一个或全部完成

  • CompletableFuture<T> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action): 当当前或另一个任务完成时消费结果。
  • CompletableFuture<T> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action): 异步消费结果。
  • CompletableFuture<T> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn): 当当前或另一个任务完成时应用函数。
  • CompletableFuture<T> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn): 异步应用函数。
  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action): 当当前和另一个任务都完成时执行动作。
  • CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action): 异步执行动作。
  • CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor): 在指定的 Executor 中异步执行动作。
  • CompletableFuture<T> thenCompose(CompletionStage<Function<? super T, ? extends U>> fn): 当任务完成时,以它的结果为输入启动另一个 CompletableFuture
  • CompletableFuture thenComposeAsync(CompletionStage<Function<? super T, ? extends U>> fn): 异步启动另一个 CompletableFuture
  • CompletableFuture<Void> allOf(CompletableFuture<?>... cfs): 当所有给定的任务都完成时完成。
  • CompletableFuture<T> anyOf(CompletableFuture<? extends T>... cfs): 当任何一个给定的任务完成时完成。

4.10. 超时和取消

  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit): 如果在给定的超时时间内没有完成,则异常完成。
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit): 如果在给定的超时时间内没有完成,则以给定的值完成。
  • boolean cancel(boolean mayInterruptIfRunning): 尝试取消执行。

4.11. 结果获取

  • T get(): 等待任务完成并获取结果。
  • T join(): 等待任务完成并获取结果。
  • T getNow(T valueIfAbsent): 获取当前结果,如果未完成则返回给定的值。
  • T getThenApplyAsync(Function<? super T, ? extends U> fn): 获取结果并异步应用函数。

5、CompletableFuture 应用案例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ComprehensiveCompletableFutureExample {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 4.2. 创建和启动异步计算
        CompletableFuture<Boolean> inventoryCheckFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("检查库存...");
            return true; // 假设库存充足
        }, executor);

        // 4.4. 正常完成时的转换
        CompletableFuture<String> orderCreationFuture = inventoryCheckFuture.thenApplyAsync(
                Boolean::booleanValue,
                ordered -> "ORDER-" + ordered,
                executor
        );

        // 4.5. 组合 CompletableFuture
        CompletableFuture<String> paymentProcessingFuture = orderCreationFuture.thenComposeAsync(
                order -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("处理支付...");
                    return "PAYMENT-" + order;
                }),
                executor
        );

        // 4.6. 异常处理
        CompletableFuture<String> exceptionHandledFuture = paymentProcessingFuture.exceptionally(
                ex -> "EXCEPTION-" + ex.getMessage()
        );

        // 4.7. 完成时的回调(无论成功或失败)
        CompletableFuture<String> finalResultFuture = exceptionHandledFuture.handle((result, ex) -> {
            if (ex == null) {
                System.out.println("订单处理成功: " + result);
                return result;
            } else {
                System.out.println("订单处理失败: " + ex.getMessage());
                return "FAILURE-" + ex.getMessage();
            }
        });

        // 4.8. 等待其他 CompletableFuture
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                inventoryCheckFuture,
                orderCreationFuture,
                paymentProcessingFuture,
                exceptionHandledFuture,
                finalResultFuture
        );

        // 4.9. 任意一个或全部完成
        CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(
                inventoryCheckFuture,
                orderCreationFuture,
                paymentProcessingFuture,
                exceptionHandledFuture,
                finalResultFuture
        );

        // 4.10. 超时和取消
        CompletableFuture<String> timeoutFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "TIMEOUT";
        }).orTimeout(1, TimeUnit.SECONDS);

        // 4.11. 结果获取
        finalResultFuture.thenAccept(result -> {
            System.out.println("最终结果: " + result);
        });

        timeoutFuture.exceptionally(ex -> {
            System.out.println("超时异常: " + ex.getMessage());
            return null;
        });

        // 等待所有任务完成
        allFutures.join();

        // 关闭线程池
        executor.shutdown();
    }
}

代码解释:

  1. 创建和启动异步计算:使用supplyAsync检查库存。
  2. 正常完成时的转换:使用thenApplyAsync创建订单。
  3. 组合CompletableFuture:使用thenComposeAsync处理支付。
  4. 异常处理:使用exceptionally处理支付过程中的异常。
  5. 完成时的回调(无论成功或失败) :使用handle处理最终结果。
  6. 等待其他CompletableFuture:使用allOf等待所有任务完成。
  7. 任意一个或全部完成:使用anyOf等待任一任务完成。
  8. 超时和取消:使用orTimeout设置超时。
  9. 结果获取:使用thenAccept打印最终结果。
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。