Java异步必备技能 | CompletableFuture:supplyAsync与runAsync
—theme: technology-style —
CompletableFuture是Java 8中引入的一个类,用于简化异步编程和并发操作。它提供了一种方便的方式来处理异步任务的结果,以及将多个异步任务组合在一起执行。CompletableFuture支持链式操作,使得异步编程更加直观和灵活。
在引入CompletableFuture之前,Java已经有了Future接口来表示异步计算的结果,但是它的功能相对有限,无法轻松实现复杂的异步操作链。CompletableFuture通过提供更丰富的方法和操作,使得异步编程变得更加便捷。
CompletableFuture实现了Future接口, CompletionStage接口,成为JDK8多任务协同场景下一个有效利器。
提交有返回值的异步任务
package com.neo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class DemoCompletableFuture {
private static Logger logger = LoggerFactory.getLogger(DemoCompletableFuture.class);
public static void main(String[] args) throws Exception {
//提交一个CompletableFuture任务
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
long start = System.currentTimeMillis();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
logger.info("@@ 打印执行耗时:" +(end - start) + " ms");
return 1;
});
logger.info("CompletableFuture.supplyAsync 开始" );
//通过get方法阻塞获取任务执行结果
logger.info("CompletableFuture.supplyAsync 执行结果: {}", task.get());
logger.info("CompletableFuture.supplyAsync 结束");
}
}
输出结果如下,可以看出CompletableFuture的get方法会阻塞主线程工作,直到得到返回值为止。
13:39:32.976 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 开始
13:39:37.985 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture - @@ 打印执行耗时:5011 ms
13:39:37.986 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 执行结果: 1
13:39:37.990 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 结束
对此我们来看看get方法是如何做到阻塞主线程并等待异步线程任务执行完成的。
从下面这段源码我们可以看到get方法的执行步骤:
/**
* Waits if necessary for this future to complete, and then
* returns its result.
*
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
reportGet函数分析
/**
* Reports result using Future.get conventions.
*/
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
这是CompletableFuture类中的一个私有静态方法reportGet,用于报告异步任务执行的结果,遵循Future.get的约定。让我们逐步分析这个方法:
参数类型
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException
这是一个泛型方法,接收一个Object类型的参数r,表示异步任务的结果。
判断结果是否为null
if (r == null)
throw new InterruptedException();
如果结果r为null,按照惯例表示任务被中断,此时抛出InterruptedException。
处理AltResult
if (r instanceof AltResult) {
// ...
}
如果结果是AltResult类型,说明异步任务执行过程中发生了异常。进入AltResult的处理逻辑。
获取异常信息并抛出相应异常
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
如果AltResult中的异常ex为null,说明异步任务被取消,返回null。
如果异常是CancellationException,抛出CancellationException。
如果异常是CompletionException,获取它的原因(cause),如果有原因就将异常替换为原因,最终抛出ExecutionException。
类型转换并返回结果
@SuppressWarnings("unchecked") T t = (T) r;
return t;
最后,将r强制类型转换为泛型类型T,然后返回。
这个方法主要负责处理异步任务执行结果中可能涉及的异常情况,并根据Future.get的约定进行适当的处理。
waitingGet函数分析
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
这是CompletableFuture
类中的一个私有方法waitingGet
,用于在异步任务完成前等待其结果。让我们逐步分析这个方法:
初始化变量
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
这里初始化了一些变量,包括一个Signaller
对象q
,一个表示是否已经将任务推入栈的标志queued
,一个用于自旋等待的计数spins
,以及用于存储异步任务结果的变量r
。
自旋等待任务完成
while ((r = result) == null) {
// 自旋等待任务完成
}
在这个循环中,不断检查result
是否为null,如果为null,说明任务还未完成,就继续等待。
自旋等待策略
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
- 如果
spins
为负值,根据当前系统的处理器数量决定是否使用自旋等待。如果有多个处理器,使用brief spin-wait。 - 如果
spins
大于0,且随机数为正,则减少spins
,继续自旋等待。
创建和推送Signaller对象
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
- 如果
q
为null,创建一个Signaller
对象。Signaller
是用于协调等待的辅助类。 - 如果
q
已创建但未推送到栈中,尝试推送到栈中。
处理中断
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
-
- 如果支持中断且
q.interruptControl
小于0,表示中断发生,清理相关状态并返回null。
- 如果支持中断且
使用ManagedBlocker进行等待
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
- 如果
q.thread
不为null,且任务未完成,使用ForkJoinPool.managedBlock
进行等待。这是一种协作式的等待方式。
处理中断和结果
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
- 清理
Signaller
对象的状态。 - 如果支持中断,根据中断控制状态设置返回值
r
为null或者中断当前线程。
完成异步任务后的处理
postComplete();
最后,调用postComplete
方法,该方法用于处理异步任务完成后的一些后续操作。
返回结果
return r;
返回异步任务的结果。
这个方法主要负责等待异步任务的完成,使用了一些自旋等待、协作式等待和中断处理的策略,确保在任务完成后能够正确返回结果。
提交无返回值的异步任务
通过runAsync提交一个无返回值的异步任务,这里我们为了实现任务执行完成再关闭主线程用了个get阻塞等待任务完成。
package com.neo;
/**
* @Author zhangt
* @create 2023/11/10
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author zhangt
* @date 2023年11月10日
*/
public class DemoCompletableFuture2 {
private static Logger logger = LoggerFactory.getLogger(DemoCompletableFuture2.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
long start = System.currentTimeMillis();
logger.info(Thread.currentThread().getName() + "开始执行时间:" + start);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + "结束总执行时间:" + (System.currentTimeMillis() - start));
});
logger.info("CompletableFuture.supplyAsync 主线程开始运行" );
//get阻塞主线程等待任务结束
logger.info("get阻塞主线程等待任务结束 :" + supplyAsync.get());
logger.info("CompletableFuture.supplyAsync 主线程运行结束");
}
}
返回结果
15:29:59.922 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9开始执行时间:1699860599920
15:29:59.922 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程开始运行
15:30:00.935 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9结束总执行时间:1015
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - get阻塞主线程等待任务结束 :null
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程运行结束
区别
CompletableFuture.supplyAsync和CompletableFuture.runAsync都是用于创建异步任务的方法,但它们在任务的类型和返回值处理上有一些区别。
CompletableFuture.supplyAsync
任务类型: 用于执行有返回值的异步任务。任务由Supplier提供,不接收任何参数,返回一个结果。
方法签名
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
示例
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 有返回值的异步任务
return "Hello, CompletableFuture!";
});
CompletableFuture.runAsync
任务类型: 用于执行没有返回值的异步任务。任务由Runnable提供,不返回任何结果。
方法签名
public static CompletableFuture<Void> runAsync(Runnable runnable)
示例
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 没有返回值的异步任务
System.out.println("Running async task");
});
区别总结
CompletableFuture.supplyAsync用于执行有返回值的异步任务,接收一个Supplier,返回一个CompletableFuture对象,可获取异步任务的结果。
CompletableFuture.runAsync用于执行没有返回值的异步任务,接收一个Runnable,返回一个CompletableFuture<Void>对象,表示异步任务执行完毕。
这两个方法都允许通过传递Executor来指定异步任务的执行线程。例如,可以使用
CompletableFuture.supplyAsync(supplier, executor)
或
CompletableFuture.runAsync(runnable, executor)
来指定特定的线程池。
- 点赞
- 收藏
- 关注作者
评论(0)