Java的所有线程知识精华全在CompletableFuture了

香菜聊游戏 发表于 2022/06/26 21:58:28 2022/06/26
【摘要】 今天在读项目代码的过程中发现了项目中有CompletableFuture的使用,虽然很早就知道这个类,也会使用但是从来没有探究代码的实现逻辑,今天凑着一个机会从里到外扒一扒这个类,希望能讲明白。1、Thread,Runnable,Callable1.1 线程的概念先讲一讲线程,我想刚入门的同学都知道线程是什么,线程是为了提升cpu利用效率,防止阻塞的执行单位,举个例子,比如你正在做饭,发现家...

今天在读项目代码的过程中发现了项目中有CompletableFuture的使用,虽然很早就知道这个类,也会使用但是从来没有探究代码的实现逻辑,今天凑着一个机会从里到外扒一扒这个类,希望能讲明白。

1、Thread,Runnable,Callable

1.1 线程的概念

先讲一讲线程,我想刚入门的同学都知道线程是什么,线程是为了提升cpu利用效率,防止阻塞的执行单位,举个例子,比如你正在做饭,发现家里没有酱油了,这时候你有两个选择,一个是停下手里的活,自己去打酱油,另外一个方式就是叫你儿子去打酱油,你儿子就是相当于一个新线程,帮你解决问题。打酱油这件事情就是一个任务。

1.2 线程的创建

所有的事情都可以从生活中发现蛛丝马迹,从上面的例子中我们可以看到,线程就是一个执行单位,可以具象理解为就是一个人,而要做的事情可以理解为一件任务,所以迁移到程序中我们可以理解线程。

Thread -> 人

Runnable->没有回复的任务

callable -> 有回复的任务

Runnable 和 callable 的不同就是有没有回复结果,比如在开发中发出命令不需要回复,当存储数据库的时候不关注结果,只是发出动作可以使用runnable,比如第一个例子的打酱油,是需要计算结果的,这个时候使用callable是合适的。

1.2.1 直接创建线程

通过继承Thread 创建线程,覆盖实现run方法,将任务代码写在run内,通过start 启动线程。

class MyThread extends Thread{  // 继承Thread类,作为线程的实现类
    private String name ;       // 表示线程的名称
    public MyThread(String name){
        this.name = name ;      // 通过构造方法配置name属性
    }
    public void run(){  // 覆写run()方法,作为线程任务
        for(int i=0;i<10;i++){
            System.out.println(name + "运行,i = " + i) ;
        }
    }
};
public class ThreadDemo{
    public static void main(String args[]){
        MyThread mt1 = new MyThread("香菜A ") ;    // 实例化对象
        MyThread mt2 = new MyThread("香菜B ") ;    // 实例化对象
        mt1.start() ;   // 开始执行任务吧
        mt2.start() ;   // 开始执行任务吧
    }
}

1.2.2 实现Runnable定义任务

Runnable 是一个接口,也就是一个规范,定义了任务的基本方法run,这个在有些也叫契约(垃圾概念)

这种方式是任务和线程进行分离,在启动线程的时候告诉他执行什么任务。只定义任务,随便来个线程都可以执行

class MyThread implements Runnable{ // 实现Runnable接口
    public void run(){  // 覆写run()方法
        while(true){
            System.out.println(Thread.currentThread().getName() + "在运行。") ;
        }
    }
};
public class ThreadDemo{
    public static void main(String args[]){
        MyThread mt = new MyThread() ;  // 实例化Runnable子类对象
        Thread t = new Thread(mt,"线程");     // 实例化Thread对象
        t.setDaemon(true) ; // 此线程在后台运行
        t.start() ; // 启动线程
    }
};

1.2.3 通过callable 创建可以有返回值的任务

实现 Callable 接口, 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。

使用Callable 需要有 FutureTask的支持,再次使用上面打酱油的例子,你在让你儿子打酱油的时候说,等会打酱油回来之后可以直接放在桌子的左上角,你在做饭的过程中只要发现左上角有酱油就代表任务完成了。

FutureTask 可以理解为约定的一个地方,线程执行完之后就会把结果放在FutureTask的result容器中。具体的可以参照:《多线程系列二》不理解future怎么能有future?

public class TestCallable {
   public static void main(String[] args) {
       ThreadDemo td = new ThreadDemo();
       //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
       FutureTask<Integer> result = new FutureTask<>(td);
       new Thread(result).start();
       //2.接收线程运算后的结果
       try {
           Integer sum = result.get();  //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的
           System.out.println(sum);
           System.out.println("------------------------------------");
       } catch (InterruptedException | ExecutionException e) {
           e.printStackTrace();
       }
   }

}

class ThreadDemo implements Callable<Integer> {
   @Override
   public Integer call() throws Exception {
       int sum = 0;

       for (int i = 0; i <= 100000; i++) {
           sum += i;
       }
       return sum;
   }
}

小结:在理解多线程的时候主要映射到现实生活的需求中,Thread 就是一个人,需要做的事情就是任务,任务分为有回复的任务和无回复的任务,也就分出Runnable和Callable 了。

2、function,consumer

第二个话题其实是Java8 的新特性,在之前的工作经历中,发现还有很多人停留在Java6的语法上,不愿意学习也不愿意接受新的语法特性,究其原因是没理解,不能很好的掌握,这个我也写过一篇文章,里面有详细的说明。

一篇文章掌握lambda,function下41个类 我相信读完一定能一下掌握所有的function。

简单一句话概括,所有的function功能上和定义的函数一样,只不过定义了通用的结构,相当于通用的规则,只要填写代码就可以了,而这一组函数按不同的需求可以分为

Function 有返回值的函数,使用accept函数

Consumer 必须有参数的函数,使用apply函数

Supplier 有返回值的函数,使用get函数

image.png

3、Executor,Executors,ExecutorService

image.png

3.1 为什么要使用线程池

线程池这个大家都知道,是为了提高效率,可以类比生活,如果开个店,需要几个员工,正常的操作都是雇佣员工,而不是每天使用临时工,这样用完就解雇掉,对于店主来说招人的成本太高,还需要培训,我想正常的都不会这么做,线程池也是同样的道理,避免了创建和销毁线程的开销。

3.2Executor

java线程池中的一个顶级接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,在Executor中,可以使用Executor而不用显示地创建线程:

executor.execute(new RunnableTask()); // 异步执行 3.3 ExecutorService ExecutorService:是一个比Executor使用更广泛的子类接口,提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

3.4 Executors

Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

image.png

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

可以看到提供了一些具体的线程池模型,可以根据自己的需求使用。

4、CompletableFuture

终于到我今天想说的了,上面的基本上都可以在我的博客中找到相关的专题,今天主要聊一下CompletableFuture。

4.1 CompletionStage

CompletionStage 的接口一般都返回新的CompletionStage,表示执行完一些逻辑后,生成新的CompletionStage,构成链式的阶段型的操作。

image.png

CompletionStage的接口方法可以从多种角度进行分类,可

以从函数的命名和函数进行分类

每个函数名可以拆分为三段,第一个单词表示触发时机,第二个表述stage之间的关系,最后一个表示执行的方式

从函数的参数可以看到主要是有返回值的function,消费型的Consumer以及runnable

4.2 CompletableFuture

先来个例子吧,可以运行下面的例子看下打酱油的全过程,并不影响你做菜。

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture
                //让儿子去打酱油
                .supplyAsync(()-> {
                    try {
                        System.out.println("儿子跑步去打酱油");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("酱油打好了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "酱油";
                })
                //告诉我一声
                .thenAccept(oil->{
                    System.out.println("做菜用酱油:" + oil);
                });
        System.out.println("继续做菜");
        Thread.currentThread().join();
    }

一个completetableFuture就代表了一个任务,看名字就知道和Future的关系。使用future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。

4.3 看下源码到底咋回事

4.3.1 线程池怎么回事

从上面的源码我们看到并没有我们之前知道的线程池相关的东西,也没使用线程池,到底是怎么做的呐?

打开源码瞧一瞧。

   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

Supplier 是一个有返回值的函数,在上面的例子中我们反悔了一个酱油的字符串。

在上面的return语句中我们看到使用了一个asyncPool,这是个什么东西?我们找下定义

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

从上面的代码注释中我们看到,这是一个缺省的线程池,使用ForkJoinPool.commonPool(),这下我们明白了原来是使用了缺省的线程池。这个线程池默认创建的线程数是 CPU 的核数

4.3.2 怎么串行的?

CompletionStage的作用就是为了链式编程而存在的,所以可以猜测下CompletionStage 扮演了重要的作用,看下源码吧。

   //1
   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    //2
    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

asyncSupplyStage()方法中,调用指定的线程池,并执行execute(new AsyncSupply(d,f)),这里d就是我们的“源任务”,接下来thenApply()要依赖着这个源任务进行后续逻辑操作,f是Supplier的函数式编程。

 static final class AsyncSupply<T> extends ForkJoinTask<Void>
           implements Runnable, AsynchronousCompletionTask {
       CompletableFuture<T> dep; Supplier<T> fn;
       AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
           this.dep = dep; this.fn = fn;
       }
​
       public final Void getRawResult() { return null; }
       public final void setRawResult(Void v) {}
       public final boolean exec() { run(); return true; }
​
       public void run() {
           CompletableFuture<T> d; Supplier<T> f;
           if ((d = dep) != null && (f = fn) != null) {
               dep = null; fn = null;
               if (d.result == null) {
                   try {
                       d.completeValue(f.get());
                   } catch (Throwable ex) {
                       d.completeThrowable(ex);
                   }
               }
               d.postComplete();
           }
       }
   }

在run()方法里。在run()方法里,先判断d.result == null,判断该任务是否已经完成,防止并发情况下其他线程完成此任务了。f.get()就是调用的Supplier的函数式编程。

主线程会在asyncSupplyStage()方法中返回d,就是我们的“依赖任务”,而这个任务此时还处在阻塞中。接下来main线程会继续执行CompletableFuture的thenAccept(Comsumer<? super T> action)方法,然后调用CompletableFuture的uniAcceptStage()方法。

CompletableFuture中有“源任务”和“依赖任务”,“源任务”的完成能够触发“依赖任务”的执行,这里的完成可以是返回正常结果或者是异常。

总结

CompletableFuture 是一个自带缺省线程池的,并且支持链式编程的,免去线程之间关系的类,在多线程编程中可以减少代码量,减少线程的调用,推荐

码字不易,求三连,求支持,为爱发电



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。