【精通函数式编程】(十一) CompletableFuture、反应式编程源码解析与实战
前言📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫
🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆
🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦
本文导读
Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。
一、同步与异步
1、为什么要有异步
在Java发展的这20年,他只做了一件事不被淘汰,为了不被淘汰不断的更新jdk的版本,以便使用计算机硬件、操作系统以及新的编程概念。
Java一开始提供了 synchronized 锁、Runable,后面java5有引入了 java.util.concurrent 包,java7中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到java8中Stream流、lambda表达式的支持,这一切都是为了支持高并发。
即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用k个线程的线程池就只能并发的执行k个任务,其他任务还是回休眠或者阻塞。
这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8考虑到了,充分发挥了计算机硬件的处理能力,异步API 应运而生。
2、什么是同步?什么是异步?
同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);
异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。
下面举一个例子就说明什么是同步、什么是异步
异步编程涉及两种风格,Future风格API 和反应式风格API ,Future<Integer> fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。
二、Future异步编程
1、Future 接口
Java5中就引入了Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要for循环去累加,Future 接口使用的时候只需要封装 Callable中,再提交给ExecutorService。
2、Future 接口的使用
Future 接口的使用看下Java8之前是如何使用异步的
public static void main(String[] args) throws Exception {
List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
// 创建 ExecutorService 通过它可以向线程池提交任务
ExecutorService executorService = Executors.newCachedThreadPool();
// 异步操作的同时,可以进行其他操作
Future<BigDecimal> decimalFuture = executorService.submit(new Callable<BigDecimal>() {
@Override
public BigDecimal call() throws Exception {
return reduceAmt(orderInfos);
}
});
// Java8 写法
Future<BigDecimal> decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
System.out.println(decimalFuture.get());
}
private static BigDecimal reduceAmt(List<OrderInfo> orderInfos) {
return orderInfos.stream()
.map(OrderInfo::getOrderAmt)
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用get() 方法获取操作结果,如果操作完成会立刻返回,如果操作没有完成则回阻塞线程,直到操作完成返回。
3、Future 接口的缺陷(局限性)
Future接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。
但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。
就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当Future的完成事件发生时会收到通知,使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。
三、CompletableFuture 接口详解
1、CompletableFuture 的创建
CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值,runAsync适合创建不需要返回值的计算任务
通过该supplyAsync 函数创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端,则可以通过get或join获取最终计算结果。
同事直接new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计
import java.util.concurrent.CompletableFuture;
import org.springframework.core.task.AsyncTaskExecutor;
public class CompletableFutureImpl {
@Autowired
@Qualifier("asyncExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) {
// 添加购物车
GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
/**
* 异步刷新到购物车
*/
CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor);
// 通过supplyAsync 函数创建的
CompletableFuture<Object> uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO));
// 构造函数创建
CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
return new DefaultResponseVO(code, msg, respVO);
}
/**
* 添加购物车缓存(异步刷新到购物车)
*/
public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) {
try {
// 添加购物车缓存(异步刷新到购物车)
mallCartProcess.getCartInfo(cartItemEntity);
} catch (Exception e) {
logger.error("syncAddCache error", e);
}
}
}
2、CompletableFuture.supplyAsync 源码分析
本小节讲解CompletableFuture的底层实现
上面为java 8中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。
值得注意的是,当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被set到对应的CompletableFuture的result字段中。调用方通过join或者get就能取到该CompletableFuture的result字段的值。所以,虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。这种方式与传统的Future中结果传递方式类似(计算线程set值,使用线程get值)。
上面为java 8中 AsyncSupply 的实现源码,AsyncSupply的源码很简单。首先,它实现了Runnable接口,所以被提交到线程池中后,工作线程会执行其run()方法。通过对AsyncSupply中run方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用run方法执行,并设置到CompletableFuture的结果中。其他线程中的使用方,则可以调用该CompletableFuture的join或者get方法获取其结果。
因此,我们只需要搞清楚其run()中的实现即可。在 run() 中,程序首先检查了传入的CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是CompletableFuture与Future最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用Supplier(源码中的 f 变量)的get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到CompletableFuture中。最后,调用CompletableFuture的postComplete()方法,执行连接到当前CompletableFuture上的后置任务。
3、CompletableFuture.runAsync 源码分析
通过上面的源码可以看出,runAsync也会生成一个空的CompletableFuture,并包装在AsyncRun中提交到线程池中执行。这与supplyAsync是完全一致的。由于Runnable没有返回值,这里返回的CompletableFuture的结果值是Void类型的。
AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。 设计方面和supplyAsync一致
4、CompletableFuture API 实战
thenApply、thenAccept、thenRun
thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。
thenCombine、thenCompose
thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。
whenComplete、handle
whenComplete主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果)。handle与handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)
四、反应式编程
1、什么是反应式(resctive)编程
反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。
反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核CPU的特点,大多反应式框架(RxJava等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。
2、反应式流(Flow API)
2.1、发布订阅模式
Future 和 CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。
与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP请求,并根据请求的内容返回相应的数据。
2.2、Flow API 源码解析
Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )
反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。
反应式流(Flow API)规范可以总结为4个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和Processor(处理者)
Publisher负责生成数据,并将数据发送给 Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法 subscribe(),Subscriber可以通过该方法向 Publisher发起订阅。
一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的
Subscriber的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher调用 onSubscribe() 方法时,会将Subscription对象传递给 Subscriber。通过Subscription,Subscriber可以管理其订阅情况
Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher 发送多于 Subscriber能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。
Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的onComplete() 方法来告知 Subscriber 它已经结束
反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。有我们自己实现。
2.3、Flow API 实战
FlowImpl :创建Publisher并向其订阅TempSubscriber
public class FlowImpl {
public static void main(String[] args) {
// 创建 Publisher 并向其订阅 Subscriber
getOrderAmt("20220727123").subscribe(new SubscriberImpl());
}
// Publisher是个函数式接口
private static Flow.Publisher<OrderInfo> getOrderAmt(String orderId) {
return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId));
}
}
Subscription接口:实现向 Subscriber 发送 OrderInfo Steam
public class SubscriptionImpl implements Flow.Subscription {
private final Flow.Subscriber<? super OrderInfo> subscriber;
private final String orderId;
public SubscriptionImpl(Flow.Subscriber<? super OrderInfo> subscriber, String orderId) {
this.subscriber = subscriber;
this.orderId = orderId;
}
@Override
public void request(long n) {
// 另起一个线程向 subscriber 发送下一个元素
Executors.newSingleThreadExecutor().submit(() -> {
for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环
try {
// 将当前 订单号 发送给 Subscriber
subscriber.onNext(OrderInfo.reduceAmt(orderId));
} catch (Exception e) {
// 查询报错将这个报错信息传给 Subscriber
subscriber.onError(e);
e.printStackTrace();
break;
}
});
}
@Override
public void cancel() {
// 如果 Subscription 被取消了,向 subscriber 发送一个完成信号
subscriber.onComplete();
}
}
Subscriber接口:实现打印输出收到的 订单 数据
public class SubscriberImpl implements Flow.Subscriber<OrderInfo> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(OrderInfo orderInfo) {
System.out.println(orderInfo);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done!");
}
}
总结
Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。
- 点赞
- 收藏
- 关注作者
评论(0)