jdk 新特性 CompletableFuture 并行框架
一、执行异步任务supplyAsync\runAsync
CompletableFuture
提供了四个静态方法来创建一个异步操作。
//不支持返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//可以支持返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
没有指定Executor
的方法会使用ForkJoinPool.commonPool()
作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- 创建线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数
executor.setCorePoolSize(5);
//最大线程数
executor.setMaxPoolSize(10);
//线程保持时间 秒
executor.setKeepAliveSeconds(3);
//队列的存储个数
executor.setQueueCapacity(10);
//拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 基本使用
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("哈哈哈");
},executor);
- 1
- 2
- 3
CompletableFuture<Long> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("哈哈哈");
return System.currentTimeMillis();
},executor);
long time = future1.get();
- 1
- 2
- 3
- 4
- 5
- 6
二、回调方法whenComplete\whenCompleteAsync\exceptionally
当CompletableFuture
的计算结果完成,或者抛出异常的时候,可以执行特定的Action
。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
可以看到Action
的类型是BiConsumer<? super T,? super Throwable>
它可以处理正常的计算结果,或者异常情况。
whenComplete
和 whenCompleteAsync
的区别:
whenComplete
:是执行当前任务的线程执行继续执行 whenComplete 的任务。whenCompleteAsync
:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
CompletableFuture<Void> infoFuture = CompletableFuture
.runAsync(
() -> {
System.out.println("哈哈");
},executor)
.whenComplete((v,e)->{
System.out.println("哈哈");
}).exceptionally(e->{
System.out.println("执行失败!" + e.toString());
return null;
});
CompletableFuture<String> infoFuture = CompletableFuture
.supplyAsync(
() -> {
return "哈哈";
},executor)
.whenCompleteAsync((result,e)->{
System.out.println(result);
}).exceptionally(e->{
System.out.println("执行失败!" + e.toString());
return "";
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
三、线程依赖thenApply
当一个线程依赖另一个线程时,可以使用 thenApply
方法来把这两个线程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- 1
- 2
- 3
- 4
- 5
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
CompletableFuture<Integer> future8 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
return 1;
},executor).thenApplyAsync(s->{
System.out.println(Thread.currentThread().getName());
return s+1;
},executor).thenApply(s->{
System.out.println(Thread.currentThread().getName());
return s+1;
});
System.out.println(future8.get());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
下面的任务依赖上一个任务的结果。
四、执行完成对结果处理handle
handle
是执行任务完成时对结果的处理。
handle
方法和 thenApply
方法处理方式基本一样。不同的是 handle
是在任务完成后再执行,还可以处理异常的任务。thenApply
只可以执行正常的任务,任务出现异常则不执行 thenApply
方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
return 1/0;
},executor).handleAsync((i,e)->{
if (e != null){
return 0;
}
return i;
},executor);
System.out.println(future1.get());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
五、后续操作 thenAccept
接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
return 1;
}).thenAcceptAsync(i->{
System.out.println(i+"");
},executor);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。
六、后续操作 thenRun
跟 thenAccept
方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept
。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
- 1
- 2
- 3
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
return 1;
}).thenRunAsync(()->{
System.out.println("后续");
},executor);
System.out.println(future.get());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
该方法同 thenAccept
方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun
方法。只是处理玩任务后,执行 thenAccept
的后续操作。
七、合并任务 有返回 thenCombine
thenCombine
会把 两个 CompletionStage
的任务都执行完成后,把两个任务的结果一块交给 thenCombine
来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "hello";
},executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
return "hello";
},executor);
CompletableFuture<String> result = future1.thenCombineAsync(future2, (s1,s2)->{
return s1+s2;
},executor);
System.out.println(result.get());
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
return "hello";
},executor).thenCombineAsync(CompletableFuture.supplyAsync(()->{
return "44";
},executor),(s1,s2)->{
return s1+s2;
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
八、合并任务 没有返回thenAcceptBoth
当两个CompletionStage
都执行完成后,把结果一块交给thenAcceptBoth
来进行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(()->{
return "hello";
},executor).thenAcceptBothAsync(CompletableFuture.supplyAsync(()->{
return "44";
},executor),(s1,s2)->{
System.out.println(s1+s2);
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
九、根据返回结果的速度拿到返回结果applyToEither
两个CompletionStage
,谁执行返回的结果快,我就用那个CompletionStage
的结果进行下一步的转化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
},executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello1";
},executor);
future2.applyToEither(future3,s->{
return s;
});
System.out.println(future2.get());
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "111";
}, executor).applyToEitherAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
}, executor), s -> {
return s;
});
System.out.println(future2.get());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
十、根据返回结果的速度快慢的结果 执行后续acceptEither
两个CompletionStage
,谁执行返回的结果快,我就用那个CompletionStage
的结果进行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "111";
}, executor).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
}, executor), s -> {
System.out.println(s);
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
十一、任何一个完成了都会执行下一步runAfterEither
两个CompletionStage
,任何一个完成了都会执行下一步的操作(Runnable
)
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
- 1
- 2
- 3
- 4
- 5
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "111";
}, executor).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
}, executor), () -> {
System.out.println("上面有一个已经完成了。");
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
十二、线程依赖thenCompose
thenApply()
和thenCompose()
的区别:
-
thenapply()
是返回的是非CompletableFuture
类型:
它的功能相当于将CompletableFuture<T>
转换成CompletableFuture<U>
-
thenCompose()
用来连接两个CompletableFuture
,返回值是新的CompletableFuture
总结:thenApply()
转换的是泛型中的类型,是同一个CompletableFuture
;
thenCompose()
用来连接两个CompletableFuture
,是生成一个新的CompletableFuture
。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
- 1
- 2
- 3
- 4
- 5
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "111";
}, executor).thenComposeAsync(s -> {
return CompletableFuture.supplyAsync(()->{
return "qq"+s;
},executor);
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
十三、等待join
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "a";
});
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "b";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "c";
});
CompletableFuture<Void> future3 = CompletableFuture.allOf(future, future1, future2);
future3.join();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_43692950/article/details/115736507
- 点赞
- 收藏
- 关注作者
评论(0)