jdk 新特性 CompletableFuture 并行框架

举报
程序员-上善若水 发表于 2022/06/23 23:03:56 2022/06/23
【摘要】 一、执行异步任务supplyAsync\runAsync CompletableFuture 提供了四个静态方法来创建一个异步操作。 //不支持返回值。 public static Completab...

一、执行异步任务supplyAsync\runAsync

CompletableFuture 提供了四个静态方法来创建一个异步操作。

//不支持返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

//可以支持返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  1. 创建线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 //核心线程数
 executor.setCorePoolSize(5);
 //最大线程数
 executor.setMaxPoolSize(10);
 //线程保持时间 秒
 executor.setKeepAliveSeconds(3);
 //队列的存储个数
 executor.setQueueCapacity(10);
 //拒绝策略
 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 executor.initialize();

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 基本使用
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
    System.out.println("哈哈哈");
},executor);

  
 
  • 1
  • 2
  • 3
CompletableFuture<Long> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("哈哈哈");
    return System.currentTimeMillis();
},executor);

long time = future1.get();

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

二、回调方法whenComplete\whenCompleteAsync\exceptionally

CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

whenCompletewhenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
CompletableFuture<Void> infoFuture = CompletableFuture
        .runAsync(
                () -> {
                System.out.println("哈哈");
                },executor)
        .whenComplete((v,e)->{
            System.out.println("哈哈");
        }).exceptionally(e->{
            System.out.println("执行失败!" + e.toString());
            return null;
        });


CompletableFuture<String> infoFuture = CompletableFuture
        .supplyAsync(
                () -> {
                    return "哈哈";
                },executor)
        .whenCompleteAsync((result,e)->{
            System.out.println(result);
        }).exceptionally(e->{
            System.out.println("执行失败!" + e.toString());
            return "";
        });

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

三、线程依赖thenApply

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

CompletableFuture<Integer> future8 = CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName());
    return 1;
},executor).thenApplyAsync(s->{
    System.out.println(Thread.currentThread().getName());
    return s+1;
},executor).thenApply(s->{
    System.out.println(Thread.currentThread().getName());
    return s+1;
});
System.out.println(future8.get());

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

下面的任务依赖上一个任务的结果。

四、执行完成对结果处理handle

handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
    return 1/0;
},executor).handleAsync((i,e)->{
    if (e != null){
        return 0;
    }
    return i;
},executor);
System.out.println(future1.get());

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

五、后续操作 thenAccept

接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
    return 1;
}).thenAcceptAsync(i->{
    System.out.println(i+"");
},executor);


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。

六、后续操作 thenRun

thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

  
 
  • 1
  • 2
  • 3

CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
   return 1;
}).thenRunAsync(()->{
   System.out.println("后续");
},executor);

System.out.println(future.get());


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。

七、合并任务 有返回 thenCombine

thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
    return "hello";
},executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
    return "hello";
},executor);
CompletableFuture<String> result = future1.thenCombineAsync(future2, (s1,s2)->{
    return s1+s2;
},executor);
System.out.println(result.get());


CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
    return "hello";
},executor).thenCombineAsync(CompletableFuture.supplyAsync(()->{
    return "44";
},executor),(s1,s2)->{
    return s1+s2;
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

八、合并任务 没有返回thenAcceptBoth

当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);

public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);

public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(()->{
    return "hello";
},executor).thenAcceptBothAsync(CompletableFuture.supplyAsync(()->{
    return "44";
},executor),(s1,s2)->{
    System.out.println(s1+s2);
});


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

九、根据返回结果的速度拿到返回结果applyToEither

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);

public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);

public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
},executor);

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(()->{
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello1";
},executor);

future2.applyToEither(future3,s->{
    return s;
});

System.out.println(future2.get());


CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
   try {
       Thread.sleep(300);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   return "111";
}, executor).applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
   try {
       Thread.sleep(500);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   return "abc";
}, executor), s -> {
   return s;
});

System.out.println(future2.get());


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

十、根据返回结果的速度快慢的结果 执行后续acceptEither

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);

public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "111";
}, executor).acceptEither(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
}, executor), s -> {
    System.out.println(s);
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

十一、任何一个完成了都会执行下一步runAfterEither

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);

public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "111";
}, executor).runAfterEither(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
}, executor), () -> {
    System.out.println("上面有一个已经完成了。");
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

十二、线程依赖thenCompose

thenApply()thenCompose()的区别:

  • thenapply()是返回的是非CompletableFuture类型:
    它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>

  • thenCompose()用来连接两个CompletableFuture,返回值是新的CompletableFuture

总结:thenApply()转换的是泛型中的类型,是同一个CompletableFuture
thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
   return "111";
}, executor).thenComposeAsync(s -> {
   return CompletableFuture.supplyAsync(()->{
       return "qq"+s;
   },executor);
});

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

十三、等待join

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "a";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "b";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "c";
});

CompletableFuture<Void> future3 = CompletableFuture.allOf(future, future1, future2);
future3.join();

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_43692950/article/details/115736507

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。