Java并发编程 Future和CompletableFuture
 
  
 
1 Future
从 Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
1.1 Callable 与 Runnable
1.1.1 Runnable
package java.lang;
@FunctionalInterface
public interface Runnable {
    public abstract void run();
    
}
1.1.2 Callable
package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
    
}
1.1.3 Callable 与 Runnable 的差异
 
  
   
   | Callable<V> | Runnable | 
 
  
  
   
   | call() | run() | 
 
   
   | call() 方法有返回值 | run() 方法无返回值 | 
 
   
   | call() 方法可抛出异常 | run( )方法不能抛出异常 | 
 
  
1.2 Future + Callable
1.2.1 Future
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。或者说,Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
 
  
   
   | Future 接口 | 解释 | 
 
  
  
   
   | cancel(boolean) | cancel(boolean) 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。 | 
 
   
   | isCancelled() | isCancelled()方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。 | 
 
   
   | isDone() | isDone方法表示任务是否已经完成,若任务完成,则返回true。 | 
 
   
   | get() | get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。 | 
 
   
   | get(long, TimeUnit) | get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。(推荐使用这个获取任务结果) | 
 
  
package java.util.concurrent;
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
1.2.2 FutureTask
1 FutureTask 类实现了RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V>
 
  
   
   | FutureTask 类 | 解释 | 
 
  
  
   
   | FutureTask(Callable<V>) | 传入一个 Callable 在运行完的时候返回结果。 | 
 
   
   | FutureTask(Runnable, V) | 传入一个 Runnable 和一个结果,在运行完成后返回该结果。 | 
 
   
   | isCancelled() | 任务是否被取消 | 
 
   
   | isDone() | 任务是否完成 | 
 
   
   | cancel(boolean) | 停止任务 | 
 
   
   | get() | 获取结果 | 
 
   
   | get() | get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。 | 
 
   
   | get(long, TimeUnit) | get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。(推荐使用这个获取任务结果) | 
 
  
2 RunnableFuture 继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
public interface RunnableFuture<V> extends Runnable, Future<V> {
1.2.3 Future + Callable 并发
1 作为 Runnable 执行
package com.xu.thread;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.FutureTask;
/**
 * All rights Reserved, Designed By Hyacinth
 *
 * @Title: Main.java
 * @Package com.xu.thread
 * @Description:
 * @author: hyacinth
 * @date: 2021年1月26日 下午9:45:55   
 * @version V1.0
 * @Copyright:
 */
public class Main {
    public static void main(String[] args) throws Exception {
        FutureTask<List<Integer>> task = new FutureTask<>(() -> {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(i);
                Thread.sleep(500);
            }
            return list;
        });
        new Thread(task).start();
        task.get(20, TimeUnit.SECONDS).stream().forEach(System.out::println);
    }
}
2 作为 Callable 执行
1 Executors
 
  
   
   | Executors 创建线程池 |  |  | 
 
  
  
   
   | Executors.newFixedThreadPool | 固定大小的线程池。 | 创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。 | 
 
   
   | Executors.newSingleThreadExecutor() | 单一后台线程。 | 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。 | 
 
   
   | Executors.newScheduledThreadPool() | 定时线程池。 | 创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。 | 
 
   
   | Executors.newCachedThreadPool() | 无界线程池,自动线程回收。 | 创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。 | 
 
  
1 Executors.newFixedThreadPool()
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
 
  
   
   | newFixedThreadPool() | 
 
  
  
   
   | corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池。 | 
 
   
   | keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程。 | 
 
   
   | workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。 | 
 
   
   | FixedThreadPool的任务执行是无序的。 | 
 
  
2 Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
3 Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
 
  
   
   | newCachedThreadPool() | 
 
  
  
   
   | corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制。 | 
 
   
   | keepAliveTime = 60s,线程空闲60s后自动结束。 | 
 
   
   | workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue。 | 
 
  
4 Executors.newScheduledThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, 
															  ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
package com.xu.thread;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
 * All rights Reserved, Designed By Hyacinth
 *
 * @Title: Main.java
 * @Package com.xu.thread
 * @Description:
 * @author: hyacinth
 * @date: 2021年1月26日 下午9:45:55
 * @version V1.0
 * @Copyright:
 */
public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<List<Integer>> task = new FutureTask<>(() -> {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(i);
                Thread.sleep(500);
            }
            return list;
        });
        executor.submit(task);
        task.get(20, TimeUnit.SECONDS).stream().forEach(System.out::println);
    }
}
2 ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,// 核心线程池大小
                          int maximumPoolSize,// 最大线程池大小 
                          long keepAliveTime,// 线程最大空闲时间
                          TimeUnit unit,// 时间单位
                          BlockingQueue<Runnable> workQueue,// 线程等待队列
                          ThreadFactory threadFactory,// 线程创建工厂
                          RejectedExecutionHandler handler)// 拒绝策略
 
  
   
   | ThreadPoolExecutor 参数名称 | 类型 | 含义 | 
 
  
  
   
   | corePoolSize | int | 核心线程池大小 | 
 
   
   | maximumPoolSize | int | 最大线程池大小 | 
 
   
   | keepAliveTime | long | 线程最大空闲时间 | 
 
   
   | unit | TimeUnit | 时间单位 | 
 
   
   | workQueue | BlockingQueue<Runnable> | 线程等待队列 | 
 
   
   | threadFactory | ThreadFactory | 线程创建工厂 | 
 
   
   | handler | RejectedExecutionHandler | 拒绝策略 | 
 
  
package com.xu.thread;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * All rights Reserved, Designed By Hyacinth
 *
 * @version V1.0
 * @Title: Main.java
 * @Package com.xu.thread
 * @Description:
 * @author: hyacinth
 * @date: 2021年1月26日 下午9:45:55
 * @Copyright:
 */
public class Main {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(15,// 核心线程池大小
                20,// 最大线程池大小
                10,// 线程最大空闲时间
                TimeUnit.MILLISECONDS,// 时间单位
                new ArrayBlockingQueue<Runnable>(5),// 线程等待队列
                Executors.defaultThreadFactory(),// 线程创建工厂
                new ThreadPoolExecutor.AbortPolicy());// 拒绝策略
        FutureTask<List<Integer>> task = new FutureTask<>(() -> {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(i);
                Thread.sleep(500);
            }
            return list;
        });
        executor.submit(task);
        task.get(20, TimeUnit.SECONDS).stream().forEach(System.out::println);
    }
}
1.3 Future 的局限性
 
  
   
   | Future 的局限性 | 
 
  
  
   
   | 1 Future很难直接表述多个Future 结果之间的依赖性,如将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)。 | 
 
   
   | 2 等待 Future 集合中的所有任务都完成。 | 
 
   
   | 3 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。 | 
 
  
2 CompletableFuture
 
  
   
   | CompletableFuture | 
 
  
  
   
   | Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行。 | 
 
   
   | 以 Runnable 为入参的无返回结果(不需要反回结果的可以使用),以Supplier<U>为入参的有返回结果。 | 
 
  
2.1 创建 CompletableFuture 对象
CompletableFuture 提供了四个静态方法用来创建 CompletableFuture 对象:
 
  
   
   | 方法 | 返回结果 | 线程池 | 参数 | 
 
  
  
   
   | public static CompletableFuture<Void> runAsync(Runnable runnable) | 无返回结果 | 采用内部forkjoin线程池 | Runnable 接口。 | 
 
   
   | public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) | 无返回结果 | 自定义线程池 | Runnable 接口。 | 
 
   
   | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) | 有返回结果 | 采用内部forkjoin线程池 | Supplier<U> 函数式接口。 | 
 
   
   | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) | 有返回结果 | 自定义线程池 | Supplier<U> 函数式接口。 | 
 
  
2.1.1 CompletableFuture.runAsync
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("a");
        }, executor);
    }
}
2.1.2 CompletableFuture.supplyAsync
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "a";
        }, executor);
    }
}
 
  
   
   | 函数式接口名称 | 作用 | 主要方法 | 解释 | 
 
  
  
   
   | Function<T, R> | 方法 | R apply(T t) | 接受一个T类型参数,返回R类型 | 
 
   
   | Consumer<T> | 消费者 | void accept(T t) | 接受一个T类型参数,没有返回值。 | 
 
   
   | Supplier<T> | 生产者 | T get() | 不接受参数,但是提供一个返回值。 | 
 
   
   | Predicate<T> | 判断 | boolean test(T t) | 接受一个T类型参数,返回一个boolean类型值。 | 
 
  
2.2 获取 CompletableFuture 结果
2.2.1 自动获取结果
 
  
   
   | 获取结果方法 | 解释 | 
 
  
  
   
   | public T get() | 返回计算的结果,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。 | 
 
   
   | public T get(long timeout, TimeUnit unit) | 返回计算的结果,设置超时时间,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理。 | 
 
   
   | public T getNow(T valueIfAbsent) | 如果已完成则返回结果或者抛出异常,否则返回给定的valueIfAbsent的值。 | 
 
   
   | public T join() | 返回计算的结果或者抛出一个unchecked异常,CancellationException,CompletionException 无需要用户手动处理。 | 
 
  
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class UnitTest {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(15, // 核心线程池大小
            20, // 最大线程池大小
            10, // 线程最大空闲时间
            TimeUnit.MILLISECONDS, // 线程最大空闲时间单位
            new ArrayBlockingQueue<>(100), // 线程等待队列
            Executors.defaultThreadFactory(), // 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
    @Test
    public void testCompletableFuture() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }, executor).thenApply(x -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x + "b";
        }).thenCompose(e -> CompletableFuture.supplyAsync(e::toUpperCase));
        System.out.println(future.get());
        System.out.println(future.join());
        System.out.println(future.getNow("C"));
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
AB
AB
AB
AB
2.2.2 主动获取结果
 
  
   
   | 获取结果方法 | 解释 | 
 
  
  
   
   | public boolean complete(T value) | 当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且get()获取当前设置的value。 | 
 
   
   | public boolean completeExceptionally(Throwable ex) | 当调用CompletableFuture.get() 被阻塞的时候,那么这个方法就是结束阻塞,并且抛出异常。 | 
 
  
future.complete("直接使用这个结果")
future.completeExceptionally(new Throwable("运行超时"))
2.3 CompletableFuture 异步回调
2.3.1 thenAccept…
 
  
   
   | 异步回调 | 参数 | 返回值 | 线程池 | 功能 | 
 
  
  
   
   | public CompletionStage<Void> thenAccept(Consumer<? super T> action) | 上一个任务的结果 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 | 
 
   
   | public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) | 上一个任务的结果 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 | 
 
   
   | public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) | 上一个任务的结果 | 无 | 自定义线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,无返回结果。 | 
 
  
上一个任务需要有返回值
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).thenAcceptAsync(x -> {
        System.out.println("thenRun");
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}
2.3.2 thenRun…
 
  
   
   | 异步回调 | 参数 | 返回值 | 线程池 | 功能 | 
 
  
  
   
   | public CompletionStage<Void> thenRun(Runnable action) | 无 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 | 
 
   
   | public CompletionStage<Void> thenRunAsync(Runnable action) | 无 | 无 | 采用内部forkjoin线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 | 
 
   
   | public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor) | 无 | 无 | 自定义线程池 | 上一个任务执行完成后执行,不需要参数,无返回结果。 | 
 
  
上一个任务有无返回值均可
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).thenRun(() -> {
        System.out.println("thenRun");
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}
2.3.3 thenApply…
 
  
   
   | 异步回调 | 参数 | 返回值 | 线程池 | 功能 | 
 
  
  
   
   | public <U> CompletableFuture<U> henApply(Function<? super T,? extends U> fn) | 上一个任务的结果 | 有 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 | 
 
   
   | public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) | 上一个任务的结果 | 有 | 采用内部forkjoin线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 | 
 
   
   | public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 上一个任务的结果 | 有 | 自定义线程池 | 上一个任务执行完成执行,上一个任务的结果作为参数,有返回结果。 | 
 
  
上一个任务需要有返回值
 @Test
 public void testCompletableFuture() throws Exception {
     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "a";
     }).thenApplyAsync(x -> {
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return x + "c";
     }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
     System.out.println(future.join());
 }
2.3.4 handle
 
  
   
   | 异步回调 | 参数 | 返回值 | 线程池 | 功能 | 
 
  
  
   
   | public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) | CompletableFuture 全部完成后的结果 | 有 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
   
   | public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) | CompletableFuture 全部完成后的结果 | 有 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
   
   | public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) | CompletableFuture 全部完成后的结果 | 有 | 自定义线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
  
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).handleAsync((a, b) -> {
        System.out.println(a + "handle" + b);
        return "K";
    });
    System.out.println(future.join());
}
2.3.5 whenComplete
 
  
   
   | 异步回调 | 参数 | 返回值 | 线程池 | 功能 | 
 
  
  
   
   | public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action) | CompletableFuture 全部完成后的结果 | 无 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
   
   | public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | CompletableFuture 全部完成后的结果 | 无 | 采用内部forkjoin线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
   
   | public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | CompletableFuture 全部完成后的结果 | 无 | 自定义线程池 | CompletableFuture 全部完成或者抛出异常后,处理结果。 | 
 
  
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).whenComplete((a, b) -> {
        System.out.println(a + "handle" + b);
    });
    System.out.println(future.join());
}
2.3.6 exceptionally
exceptionally 方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中。
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}
2.4 CompletableFuture 组合处理
2.4.1 thenCompose
thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,空则返回null,否则返回新的CompletableFuture实例。
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    }).exceptionally((thread) -> {
        System.out.println(thread.getMessage());
        return "ERROR";
    }).thenApplyAsync(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return x + "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    System.out.println(future.join());
}
2.4.2 thenCombine / thenAcceptBoth / runAfterBoth
 
  
   
   | 组合处理 | 参数 | 返回值 | 前置条件 | 功能 | 
 
  
  
   
   | public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 前两个的 CompletableFuture 的结果 | 有返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。 | 
 
   
   | public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 前两个的 CompletableFuture 的结果 | 无返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的结果。 | 
 
   
   | public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) | 无参数 | 无返回值 | 前两个的 CompletableFuture 都完成后 | 两个CompletableFuture 组合起来,处理前两个 CompletableFuture 完成后的后修操作。 | 
 
  
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "b";
    });
    // thenCombine
    CompletableFuture<String> future3 = future1.thenCombine(future2, (a, b) -> {
        return a + b + "c";
    });
    // thenAcceptBoth
    CompletableFuture<Void> future4 = future1.thenAcceptBoth(future2, (a, b) -> {
        System.out.println(a + b + "c");
    });
    // runAfterBoth
    CompletableFuture<Void> future5 = future1.runAfterBoth(future2, () -> {
        System.out.println("c");
    });
}
2.4.3 applyToEither / acceptEither / runAfterEither
 
  
   
   | 组合处理 | 参数 | 返回值 | 前置条件 | 功能 | 
 
  
  
   
   | public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | 前两个的 CompletableFuture 中的一个完成的结果 | 有返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。 | 
 
   
   | public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | 前两个的 CompletableFuture 中的一个完成的结果 | 无返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的结果。 | 
 
   
   | public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) | 无 | 无返回值 | 前两个的 CompletableFuture 有一个完成后 | 两个CompletableFuture 组合起来,处理前两个的 CompletableFuture 中的一个完成后的逻辑。 | 
 
  
2.4.4 allOf / anyOf
 
  
   
   | 组合处理 | 参数 | 返回值 | 前置条件 | 能获取完成结果 | 功能 | 
 
  
  
   
   | public static CompletableFuture<Void> allOf(CompletableFuture<?>… cfs) | CompletableFuture 任务数组 | 无 | CompletableFuture 任务数组 全部完成或者异常 | 不能 | 处理 CompletableFuture 任务数组 全部完成或者异常 | 
 
   
   | public static CompletableFuture<Object> anyOf(CompletableFuture<?>… cfs) | CompletableFuture 任务数组 | 无 | CompletableFuture 任务数组 任一完成或者异常 | 能 | 处理 CompletableFuture 任务数组 全部完成或者异常 | 
 
  
@Test
public void testCompletableFuture() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "a";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "b";
    });
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "c";
    }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
    // allOf
    CompletableFuture future4 = CompletableFuture.allOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
        if (null != b) {
            System.out.println("出现异常");
        } else {
            System.out.println("全部 CompletableFuture 都完成");
        }
    });
    // anyOf
    CompletableFuture future5 = CompletableFuture.anyOf(future1, future2, future3).whenCompleteAsync((a, b) -> {
        if (null != b) {
            System.out.println("出现异常");
        } else {
            System.out.println("最先完成的 CompletableFuture 任务的结果为 " + a);
        }
    });
}
3 CompletableFuture 多任务
package com.xu.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TaskTest {
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10,// 核心线程池大小
            20,// 最大线程池大小
            10,// 线程最大空闲时间
            TimeUnit.MILLISECONDS,// 时间单位
            new ArrayBlockingQueue<>(1000),// 线程等待队列
            Executors.defaultThreadFactory(),// 线程创建工厂
            new ThreadPoolExecutor.AbortPolicy());// 拒绝策略
    private AtomicInteger atomic = new AtomicInteger();
    public static void main(String[] args) throws Exception {
        TaskTest task = new TaskTest();
        task.run().get().forEach(System.out::println);
        executor.shutdown();
    }
    public CompletableFuture<List<String>> run() {
        List<CompletableFuture<List<String>>> future = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            future.add(task());
        }
        return CompletableFuture.supplyAsync(() -> {
            List<String> list = new ArrayList<>();
            future.stream().parallel().forEach(task1 -> {
                list.addAll(task1.join());
            });
            return list;
        });
    }
    public CompletableFuture<List<String>> task() {
        return CompletableFuture.supplyAsync(() -> {
            // TODO:任务
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new ArrayList<String>() {{
                add(atomic.getAndIncrement() + "->");
            }};
        }, executor);
    }
}
"C:\Program Files\Java\jdk-11.0.8\bin\java.exe" "-javaagent:D:\IDE\IntelliJ IDEA 2020.3.1\lib\idea_rt.jar=12191:D:\IDE\IntelliJ IDEA 2020.3.1\bin" -Dfile.encoding=UTF-8 -classpath E:\SourceCode\Idea-2020.2\OpenCV\out\production\OpenCV com.xu.task.CompletableFutureTest
1->
0->
7->
2->
9->
6->
3->
8->
Process finished with exit code 0
4 Future 多任务
package com.xu.task;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
public class FutureTest {
    ExecutorService executor = Executors.newCachedThreadPool();
    AtomicInteger atomic = new AtomicInteger();
    public static void main(String[] args) {
        FutureTest test = new FutureTest();
        long t1 = System.currentTimeMillis();
        test.task().stream().forEach(e -> {
            try {
                System.out.println(e.get());
            } catch (Exception e1) {
            }
        });
        long t2 = System.currentTimeMillis();
        System.out.println(t2 - t1);
        test.executor.shutdown();
    }
    public List<Future<String>> task() {
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            futures.add(executor.submit(() -> {
                Thread.sleep(1000);
                return atomic.getAndIncrement() + "";
            }));
        }
        return futures;
    }
}
"C:\Program Files\Java\jdk-11.0.8\bin\java.exe" "-javaagent:D:\IDE\IntelliJ IDEA 2020.3.1\lib\idea_rt.jar=12239:D:\IDE\IntelliJ IDEA 2020.3.1\bin" -Dfile.encoding=UTF-8 -classpath E:\SourceCode\Idea-2020.2\OpenCV\out\production\OpenCV com.xu.task.FutureTest
0
2
1
6
9
8
7
5
4
3
1030
Process finished with exit code 0
         
        
评论(0)