CompletableFuture异步编排
@toc
1、CompletableFuture异步编排
1.1 为什么需要异步编排
问题:查询商品详情页的逻辑非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。
目前我业务中商品详情页包含如下7个方法:
获取sku的基本详情和图片列表
获取实时价格
获取三级分类
获取销售属性和选中状态
获取商品切换数据
获取海报信息
获取平台信息
上面查询过程都是用OpenFeign
实现的,假设每个远程调用需要1s时间,那么全部执行完需要7s,这对用户来说是难以接受的。
那如果有多个线程同时执行这7步操作呢,时间是不是就更短了。
1.2 CompletableFuture介绍
Future
是Java 5
添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture
,提供了非常强大的Future
的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture
的方法。
CompletableFuture
类实现了Future
接口,所以你还是可以像以前一样通过get
方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture
和FutureTask
同属于Future
接口的实现类,都可以获取线程的执行结果。
1.3 创建异步对象
CompletableFuture
提供了四个静态方法来创建一个异步操作。
没有指定Executor的方法会使用ForkJoinPool.commonPool()
作为它的线程池执行异步代码。
runAsync
方法不支持返回值。supplyAsync
可以支持返回值。
whenComplete
可以处理正常或异常的计算结果,exceptionally
处理异常情况。BiConsumer<? super T,? super Throwable>
可以定义处理业务
whenComplete
和 whenCompleteAsync
的区别:
whenComplete
:是执行当前任务的线程执行继续执行 whenComplete
的任务。
whenCompleteAsync
:是执行把 whenCompleteAsync
这个任务继续提交给线程池来进行执行。
方法不以Async
结尾,意味着Action
使用相同的线程执行,而Async
可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
代码演示:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个没有返回值的异步对象
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("没有返回值结果");
});
System.out.println(future.get());
//创建一个有返回值的异步对象
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int a=1/0;
return 404;
}
}).whenComplete(new BiConsumer<Integer, Throwable>() {
/**
*whenComplete 和异步对象使用用一个线程
* @param integer 异步对象执行后的返回值结果
* @param throwable 异常对象
*/
@Override
public void accept(Integer integer, Throwable throwable) {
System.out.println("whenComplete:"+integer);
System.out.println("whenComplete:"+throwable);
}
}).exceptionally(new Function<Throwable, Integer>() {
/**
* 只处理异常的回调
* @param throwable
* @return
*/
@Override
public Integer apply(Throwable throwable) {
return null;
}
}).whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
/**
* whenCompleteAsync跟异步对象有可能不适用同一个线程,由线程池重新分配
* @param integer
* @param throwable
*/
@Override
public void accept(Integer integer, Throwable throwable) {
}
});
}
}
1.4 线程串行化与并行化方法
thenApply
方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept
方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun
方法:只要上面的任务执行完成,就开始执行thenRun
,只是处理完任务后,执行 thenRun
的后续操作
带有Async
默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
代码演示:
public class CompletableFutureDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
50,
500,
30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000)
);
//创建一个异步任务对象A
CompletableFuture<Object> futureA = CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
return "404";
}
},threadPoolExecutor);
//创建一个B
futureA.thenAcceptAsync(new Consumer<Object>() {
@SneakyThrows
@Override
public void accept(Object o) {
Thread.sleep(500);
System.out.println("我是B");
}
},threadPoolExecutor);
//创建一个C
futureA.thenAcceptAsync(new Consumer<Object>() {
@Override
public void accept(Object o) {
System.out.println("我是C");
}
},threadPoolExecutor);
}
}
这里是测试看是否是并行化,我们让B休眠一会,可以看到先输出C再输出B,说明是并行化。
因为如果是串行化的化,那么即使B休眠一会,那么C也会一直等着,输出顺序为B、C
1.5 多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
allOf
:等待所有任务完成。
anyOf
:只要有一个任务完成。
1.6 优化商品详情页(业务代码)
1.6.1 未优化之前的代码
@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
@Autowired
private ProductFeignClient productFeignClient;
//获取商品详情数据
@Override
public HashMap<String, Object> getItem(Long skuId) {
HashMap<String, Object> resultMap=new HashMap<>();
//获取sku的基本详情和图片列表
SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
//获取实时价格
BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
//判断
if(skuInfo!=null){
//获取三级分类
BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
//获取销售属性和选中状态
List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
//获取商品切换数据
Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
//获取海报信息
List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
resultMap.put("categoryView",categoryView);
resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
resultMap.put("spuPosterList",spuPosterBySpuId);
}
//获取平台信息
List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
//处理数据符合要求 List Obj key attrName value attrValue
List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
Map<String, String> map = new HashMap<>();
map.put("attrName", baseAttrInfo.getAttrName());
map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
return map;
}).collect(Collectors.toList());
//存储数据
resultMap.put("skuInfo",skuInfo);
resultMap.put("price",skuPrice);
resultMap.put("skuAttrList",spuAttrList);
return resultMap;
}
}
1.6.2 使用CompletableFuture异步编排
配置线程池:
@Configuration
public class ThreadPoolConfig {
/**
* 核心线程数
* 最大线程数
* 空闲存活时间
* 时间单位
* 阻塞队列
* 默认:
* 线程工厂
* 拒绝策略
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(){
return new ThreadPoolExecutor(
50,
500,
30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000)
);
}
}
实现类改造:
@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
@Autowired
private ProductFeignClient productFeignClient;
@Autowired
private ThreadPoolExecutor executor;
//获取商品详情数据
@Override
public HashMap<String, Object> getItem(Long skuId) {
HashMap<String, Object> resultMap=new HashMap<>();
CompletableFuture<SkuInfo> skuInfoCompletableFuture = CompletableFuture.supplyAsync(new Supplier<SkuInfo>() {
@Override
public SkuInfo get() {
//获取sku的基本详情和图片列表
SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
resultMap.put("skuInfo", skuInfo);
return skuInfo;
}
}, executor);
CompletableFuture<Void> skuPriceCompletableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
//获取实时价格
BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
resultMap.put("price", skuPrice);
}
}, executor);
//判断
CompletableFuture<Void> categoryViewCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
@Override
public void accept(SkuInfo skuInfo) {
//获取三级分类
BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
resultMap.put("categoryView",categoryView);
}
}, executor);
CompletableFuture<Void> spuSaleAttrListCheckBySkuCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
@Override
public void accept(SkuInfo skuInfo) {
//获取销售属性和选中状态
List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
}
}, executor);
CompletableFuture<Void> skuValueIdsMapCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
@Override
public void accept(SkuInfo skuInfo) {
//获取商品切换数据
Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
}
}, executor);
CompletableFuture<Void> findSpuPosterBySpuIdCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
@Override
public void accept(SkuInfo skuInfo) {
//获取海报信息
List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
resultMap.put("spuPosterList",spuPosterBySpuId);
}
}, executor);
CompletableFuture<Void> attrListCompletableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
//获取平台信息
List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
//处理数据符合要求 List Obj key attrName value attrValue
List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
Map<String, String> map = new HashMap<>();
map.put("attrName", baseAttrInfo.getAttrName());
map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
return map;
}).collect(Collectors.toList());
//存储数据
resultMap.put("skuAttrList", spuAttrList);
}
}, executor);
//多任务组合 -- 所有的异步任务执行完成才是完成
CompletableFuture.allOf(
skuInfoCompletableFuture,
skuPriceCompletableFuture,
categoryViewCompletableFuture,
spuSaleAttrListCheckBySkuCompletableFuture,
skuValueIdsMapCompletableFuture,
findSpuPosterBySpuIdCompletableFuture,
attrListCompletableFuture
).join();
return resultMap;
}
}
根据是否有返回值决定调用哪个API,然后看有没有依赖关系,有好几个都依赖SkuInfo,所以要用skuInfoCompletableFuture去创建。
我们需要等待每个任务执行完毕之后在返回,所以最后使用
allOf
方法进行多任务组合。
1.6.3 测试功能是否正常
这种异步效果其实在高并发下环境下测比较好,我们这里验证功能是否正常就行。
访问商品详情页:
查看Redis中的数据
可以看到,有6个key被缓存,由于我们的价格是实时价格,所以一直查的是数据库,千万别用缓存。
- 点赞
- 收藏
- 关注作者
评论(0)