Java的所有线程知识精华全在CompletableFuture了
今天在读项目代码的过程中发现了项目中有CompletableFuture的使用,虽然很早就知道这个类,也会使用但是从来没有探究代码的实现逻辑,今天凑着一个机会从里到外扒一扒这个类,希望能讲明白。
1、Thread,Runnable,Callable
1.1 线程的概念
先讲一讲线程,我想刚入门的同学都知道线程是什么,线程是为了提升cpu利用效率,防止阻塞的执行单位,举个例子,比如你正在做饭,发现家里没有酱油了,这时候你有两个选择,一个是停下手里的活,自己去打酱油,另外一个方式就是叫你儿子去打酱油,你儿子就是相当于一个新线程,帮你解决问题。打酱油这件事情就是一个任务。
1.2 线程的创建
所有的事情都可以从生活中发现蛛丝马迹,从上面的例子中我们可以看到,线程就是一个执行单位,可以具象理解为就是一个人,而要做的事情可以理解为一件任务,所以迁移到程序中我们可以理解线程。
Thread -> 人
Runnable->没有回复的任务
callable -> 有回复的任务
Runnable 和 callable 的不同就是有没有回复结果,比如在开发中发出命令不需要回复,当存储数据库的时候不关注结果,只是发出动作可以使用runnable,比如第一个例子的打酱油,是需要计算结果的,这个时候使用callable是合适的。
1.2.1 直接创建线程
通过继承Thread 创建线程,覆盖实现run方法,将任务代码写在run内,通过start 启动线程。
class MyThread extends Thread{ // 继承Thread类,作为线程的实现类
private String name ; // 表示线程的名称
public MyThread(String name){
this.name = name ; // 通过构造方法配置name属性
}
public void run(){ // 覆写run()方法,作为线程任务
for(int i=0;i<10;i++){
System.out.println(name + "运行,i = " + i) ;
}
}
};
public class ThreadDemo{
public static void main(String args[]){
MyThread mt1 = new MyThread("香菜A ") ; // 实例化对象
MyThread mt2 = new MyThread("香菜B ") ; // 实例化对象
mt1.start() ; // 开始执行任务吧
mt2.start() ; // 开始执行任务吧
}
}
1.2.2 实现Runnable定义任务
Runnable 是一个接口,也就是一个规范,定义了任务的基本方法run,这个在有些也叫契约(垃圾概念)
这种方式是任务和线程进行分离,在启动线程的时候告诉他执行什么任务。只定义任务,随便来个线程都可以执行
class MyThread implements Runnable{ // 实现Runnable接口
public void run(){ // 覆写run()方法
while(true){
System.out.println(Thread.currentThread().getName() + "在运行。") ;
}
}
};
public class ThreadDemo{
public static void main(String args[]){
MyThread mt = new MyThread() ; // 实例化Runnable子类对象
Thread t = new Thread(mt,"线程"); // 实例化Thread对象
t.setDaemon(true) ; // 此线程在后台运行
t.start() ; // 启动线程
}
};
1.2.3 通过callable 创建可以有返回值的任务
实现 Callable 接口, 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
使用Callable 需要有 FutureTask的支持,再次使用上面打酱油的例子,你在让你儿子打酱油的时候说,等会打酱油回来之后可以直接放在桌子的左上角,你在做饭的过程中只要发现左上角有酱油就代表任务完成了。
FutureTask 可以理解为约定的一个地方,线程执行完之后就会把结果放在FutureTask的result容器中。具体的可以参照:《多线程系列二》不理解future怎么能有future?
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
//2.接收线程运算后的结果
try {
Integer sum = result.get(); //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的
System.out.println(sum);
System.out.println("------------------------------------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
return sum;
}
}
小结:在理解多线程的时候主要映射到现实生活的需求中,Thread 就是一个人,需要做的事情就是任务,任务分为有回复的任务和无回复的任务,也就分出Runnable和Callable 了。
2、function,consumer
第二个话题其实是Java8 的新特性,在之前的工作经历中,发现还有很多人停留在Java6的语法上,不愿意学习也不愿意接受新的语法特性,究其原因是没理解,不能很好的掌握,这个我也写过一篇文章,里面有详细的说明。
一篇文章掌握lambda,function下41个类 我相信读完一定能一下掌握所有的function。
简单一句话概括,所有的function功能上和定义的函数一样,只不过定义了通用的结构,相当于通用的规则,只要填写代码就可以了,而这一组函数按不同的需求可以分为
Function 有返回值的函数,使用accept函数
Consumer 必须有参数的函数,使用apply函数
Supplier 有返回值的函数,使用get函数
3、Executor,Executors,ExecutorService
3.1 为什么要使用线程池
线程池这个大家都知道,是为了提高效率,可以类比生活,如果开个店,需要几个员工,正常的操作都是雇佣员工,而不是每天使用临时工,这样用完就解雇掉,对于店主来说招人的成本太高,还需要培训,我想正常的都不会这么做,线程池也是同样的道理,避免了创建和销毁线程的开销。
3.2Executor
java线程池中的一个顶级接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,在Executor中,可以使用Executor而不用显示地创建线程:
executor.execute(new RunnableTask()); // 异步执行 3.3 ExecutorService ExecutorService:是一个比Executor使用更广泛的子类接口,提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
3.4 Executors
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
newScheduledThreadPool 定时执行的线程池
newCachedThreadPool 缓存使用过的线程
newFixedThreadPool 固定数量的线程池
newWorkStealingPool 将大任务分解为小任务的线程池
可以看到提供了一些具体的线程池模型,可以根据自己的需求使用。
4、CompletableFuture
终于到我今天想说的了,上面的基本上都可以在我的博客中找到相关的专题,今天主要聊一下CompletableFuture。
4.1 CompletionStage
CompletionStage 的接口一般都返回新的CompletionStage,表示执行完一些逻辑后,生成新的CompletionStage,构成链式的阶段型的操作。
CompletionStage的接口方法可以从多种角度进行分类,可
以从函数的命名和函数进行分类
每个函数名可以拆分为三段,第一个单词表示触发时机,第二个表述stage之间的关系,最后一个表示执行的方式
从函数的参数可以看到主要是有返回值的function,消费型的Consumer以及runnable
4.2 CompletableFuture
先来个例子吧,可以运行下面的例子看下打酱油的全过程,并不影响你做菜。
public static void main(String[] args) throws InterruptedException {
CompletableFuture
//让儿子去打酱油
.supplyAsync(()-> {
try {
System.out.println("儿子跑步去打酱油");
TimeUnit.SECONDS.sleep(1);
System.out.println("酱油打好了");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "酱油";
})
//告诉我一声
.thenAccept(oil->{
System.out.println("做菜用酱油:" + oil);
});
System.out.println("继续做菜");
Thread.currentThread().join();
}
一个completetableFuture就代表了一个任务,看名字就知道和Future的关系。使用future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。
4.3 看下源码到底咋回事
4.3.1 线程池怎么回事
从上面的源码我们看到并没有我们之前知道的线程池相关的东西,也没使用线程池,到底是怎么做的呐?
打开源码瞧一瞧。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
Supplier 是一个有返回值的函数,在上面的例子中我们反悔了一个酱油的字符串。
在上面的return语句中我们看到使用了一个asyncPool,这是个什么东西?我们找下定义
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
从上面的代码注释中我们看到,这是一个缺省的线程池,使用ForkJoinPool.commonPool(),这下我们明白了原来是使用了缺省的线程池。这个线程池默认创建的线程数是 CPU 的核数
4.3.2 怎么串行的?
CompletionStage的作用就是为了链式编程而存在的,所以可以猜测下CompletionStage 扮演了重要的作用,看下源码吧。
//1
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//2
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
asyncSupplyStage()方法中,调用指定的线程池,并执行execute(new AsyncSupply(d,f)),这里d就是我们的“源任务”,接下来thenApply()要依赖着这个源任务进行后续逻辑操作,f是Supplier的函数式编程。
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
在run()方法里。在run()方法里,先判断d.result == null,判断该任务是否已经完成,防止并发情况下其他线程完成此任务了。f.get()就是调用的Supplier的函数式编程。
主线程会在asyncSupplyStage()方法中返回d,就是我们的“依赖任务”,而这个任务此时还处在阻塞中。接下来main线程会继续执行CompletableFuture的thenAccept(Comsumer<? super T> action)方法,然后调用CompletableFuture的uniAcceptStage()方法。
CompletableFuture中有“源任务”和“依赖任务”,“源任务”的完成能够触发“依赖任务”的执行,这里的完成可以是返回正常结果或者是异常。
总结
CompletableFuture 是一个自带缺省线程池的,并且支持链式编程的,免去线程之间关系的类,在多线程编程中可以减少代码量,减少线程的调用,推荐
码字不易,求三连,求支持,为爱发电
- 点赞
- 收藏
- 关注作者
评论(0)