异步任务编排神器CompletableFuture

举报
菜菜的后端私房菜 发表于 2024/08/15 09:20:23 2024/08/15
【摘要】 异步任务编排神器CompletableFuture当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的...

异步任务编排神器CompletableFuture

当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果

但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用

比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回

当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务

image.png

比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢

异步查询多个服务的数据再汇总返回,则能提高更多的性能

API

这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)

CompletableFuture提供的API大概分为几个大类:

同步与异步、串行、AND、OR、

同步与异步

API携带Async则说明是异步,并且可以设置线程池

一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
            System.out.println("task a run");
            return "a";
});

串行

串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务

串行API通常以then开头,如:thenRunAsync、thenAccpetAsync、thenApplyAsync

CompletableFuture<String> taskB = taskA.thenApply((s) -> {
    System.out.println("task b run");
    return s + "b";
})

AND

AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务

AND相关API通常以Combine、Both有关,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync

CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {
    System.out.println("task d run");
    return b + c;
})

如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)

CompletableFuture.allOf(taskF,taskE,taskD);

OR

OR指的是两个任务中其中一个完成,就可以继续执行后续任务

OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync

如果依赖多个任务的OR时使用:CompletableFuture.anyOf

异常处理

任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理

CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {
    System.out.println("task f run");
    return "a";
}).exceptionally(e -> {
    System.out.println("出现异常");
    throw new RuntimeException("error");
});

注意事项

使用CompletableFuture时需要注意,如果不了解原理容易踩坑:

比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?

带着这一系列问题,我们往下看

出了异常怎么办?

使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此必须使用API进行处理

CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常

public static void exception() {
    CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {
        System.out.println("begin");
        return null;
    });

    taskException
            .thenApply(result -> {
                int i = 1 / 0;
                return i;
            })
            .exceptionally(err -> {
                //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
                System.out.println(err);

                //java.lang.ArithmeticException: / by zero
                System.out.println(err.getCause());

                //java.lang.ArithmeticException: / by zero
                //使用工具处理异常
                System.out.println(getException(err));
                return 0;
            });
    
}

因为异常会被包装,因此处理异常时,最好使用工具类获取异常

public static Throwable getException(Throwable throwable) {
    //异常为CompletionException或ExecutionException,并且Cause不为空时解析
    if ((throwable instanceof CompletionException|| throwable instanceof ExecutionException)
            && Objects.nonNull(throwable.getCause())) {
        return throwable.getCause();
    }
    return throwable;
}

如何选择线程池?

CompletableFuture中选择线程池有三种情况:

  1. 使用方法时指定线程池
  2. 未指定线程池时,使用ForkJoin的公共线程池 ForkJoinPool.commonPool() (适合CPU任务,最大线程数量 = CPU - 1)
  3. 未指定线程池时,使用 ThreadPerTaskExecutor 每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)

当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑

并且 ThreadPerTaskExecutorForkJoinPool.commonPool() 都不适合IO任务

接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池

CompletableFuture.supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池

private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool() 还是使用 ThreadPerTaskExecutor

那么useCommonPool是如何确定的呢?我们继续往下查看

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism() 决定,当它大于1时则使用 ForkJoinPool.commonPool() 否则使用 ThreadPerTaskExecutor

ForkJoinPool.getCommonPoolParallelism() 返回字段 commonParallelism

static final int commonParallelism;

commonParallelism 用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化

ForkJoinPool.static

static {
    //其他略...
    
    //创建公共池
    common = java.security.AccessController.doPrivileged
        (new java.security.PrivilegedAction<ForkJoinPool>() {
            public ForkJoinPool run() { return makeCommonPool(); }});
    
    //计算并行粒度
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

commonParallelism 并发粒度的字段由par决定,而par = common.config & SMASK

其中SMASK为65535(十进制),其二进制为全1,因此由 common 的字段 config 决定

(在创建公共池的过程会设置config字段)

ForkJoinPool.makeCommonPool

在创建公共池的代码中主要观察变量 parallelism 它为并发粒度

如果不携带参数,默认情况下并发粒度为CPU核数-1

private static ForkJoinPool makeCommonPool() {

    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
            new CommonPoolForkJoinWorkerThreadFactory();
    //初始化并发粒度为-1
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            //如果携带启动参数则设置为对应的并发粒度
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = commonPoolForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        //默认情况下并发粒度 = CPU核数 - 1
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

在构建对象时,config字段 this.config = (parallelism & SMASK) | mode

其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度

至此我们可以得出结论:默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)

static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}

ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销

使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离

如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务

线程如何执行?

在同步与异步的API中线程如何执行?

在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor

在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行

public static void testSync() {
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
//            try {
//                Thread.sleep(5000);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
            return "ok";
        }, threadPool);


        CompletableFuture<Void> taskB = taskA.thenAccept(s -> {
            //任务A执行完(不睡时)由当前线程执行
            //任务A未执行完(睡眠时)由线程池的工作线程执行
            System.out.println(s);
            System.out.println(s);
        });

        taskB.join();
}

总结

CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排

使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常

CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池

如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)

未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池

当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行

🌠最后(一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。