五分钟带你了解CompletableFuture
五分钟带你了解CompletableFuture
前言
面试官:JAVA8特性你知道哪些
我:Lambda表达式,函数式接口,Stream流
当下我正准备向面试官展示我的能力时, 面试官突然用一句简短的话回应道: 除此之外呢, 你了解CompletableFuture吗?
我:顿时脑袋一懵,啥玩意呀,只听说过Future呀,内心一万句mmp。
别慌,小编今天带带大家了解下CompletableFuture。
正文
在正式学习Future之前,小编有必要先带大家了解下JKD5中的另一个重要概念——Future接口.该接口的设计初衷是对未来某个时刻可能发生的结果进行建模,它模拟了一种非阻塞计算流程,返回一个执行运算结果的引用.当运算结束后,其他人可以通过该引用获取执行结果.值得注意的是,通过使用Future机制,我们能够将等待耗时操作的线程资源释放出来并投入更有价值的任务.在实际工作中我也曾运用过这一技术,以下是一个具体的应用场景.
List<CommissionDetailVo> bigList = new ArrayList<>();
List<CommissionDetailVo> onePage = new ArrayList<>();
List<Future> futureList = new ArrayList<>();
try {
for ( int i = 0; i < THREADS; i++) {
final int k = i;
Future list = threadPool.submit(() -> {
return esSearch(k,settlementDetailSearchVo);
});
futureList.add(list);
}
}catch (Exception e) {
e.printStackTrace();
logger.error("Es查询失败", e.getMessage());
}
for ( Future list: futureList) {
onePage =(List<CommissionDetailVo>) list.get();
bigList.addAll(onePage);
}
AI写代码
那么可能会有同学好奇:既然有了Future,为何还需要学习CompletableFuture呢?别着急,请听小编我为你娓娓道来。
在使用Future的过程中,大家会发现Future有两种方式返回结果的方式
- 阻塞方式get()
- 轮询方法isDone()
阻塞方式获取结果在某种程度上与异步编程的根本目标相互矛盾;然而,在采用轮询机制时会消耗大量的CPU资源。因此,在获取结果方面Future的表现并不尽如人意。
当希望异步任务完成后能够立即获取结果时,则Current Future已经无法满足我们的需求
1.异步任务结束后,可以自动回调某个对象的方法;
2.异步任务出错后,可以自动回调某个对象的方法;
3.主线程设置好回调后,可以不再关心异步任务的执行;
可以看下面的例子:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync( () -> feeItem.getAmount());
future.thenAccept(amount -> System.out.println(amount));
future.exceptionally(throwable -> {System.out.println("发生异常"+throwable.toString());return 111;});
AI写代码
不论是异步任务顺利运行还是出现异常情况,在我们正确配置回调机制后,在异步任务完成时会自动触发
基本API
runAsync与supplyAsync
创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
AI写代码
若未显式指定线程池,则默认采用ForkJoinPool/commonPool()以执行异步运算。其主要区别在于:runAsync操作不返回执行结果,而 supplyAsync则可提供执行结果。
实例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> feeItem.getAmount());
AI写代码
计算结果完成时的回调方法
当异步计算完成时,或者出现异常的时候可以执行指定的Action.
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
AI写代码
那whenComplete 与 whenCompleteAsync有啥区别呢
当当前任务完成后会立即执行 whenComplete 中所指定的任务。
完成后会将 whenComplete 中所提交的任务交给其他线程池进行处理。
完成后会立即传递 whenComplete 中所指定的任务给其他线程池处理。
完成后会立即把 whenComplete 事件中的任务提交给其他线程池处理。
实例:
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> feeItem.getAmount());
future3.whenComplete( (amount, throwable)-> System.out.println(amount));
AI写代码
handle
该处理方法虽然返回的是一个新的CompletableFuture实例,但其结果与直接调用原来的CompletableFuture进行计算所得的结果不同.每当原先的那个CompletableFuture完成执行或发生异常时,就会触发当前这个CompletableFuture对象进行计算,其结果由BiFunction参数来进行综合评估.由此可见,这一系列处理集成了当Complete和转换两大功能特点.
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
AI写代码
实例:
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> feeItem.getAmount());
CompletableFuture<Integer> future4 = future3.handleAsync((amount,throwable) -> amount*3);
System.out.println(future4.get());
AI写代码
转换
thenApply负责处理原有的Computable Future(CF)计算结果,并将其传递给函数fn。该函数fn会处理这个输入并将其结果作为新的Computable Future的基础进行进一步操作。其作用是将原有Computable Future转换成一个新的Computable future对象。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
AI写代码
实例:
CompletableFuture<Integer> a =CompletableFuture.supplyAsync(() -> 1).thenApply(i -> i+1).thenApply(i -> i*i).whenComplete((r,throwable) -> System.out.println(r));
System.out.println(a.get());
AI写代码
纯消费执行Action
则则接受仅作用于计算所得的结果,并不会输出新的计算值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
AI写代码
thenCombine
可以将两个CompletableFuture对象结果整合起来
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
AI写代码
thenCompose
将两个异步操作进行串联,在第一个操作完成后通过调用thenCompose方法传递一个函数对象。在第一个CompletableFuture完成执行后其结果将作为该函数的参数传递,并由该函数返回的结果将以第一个CompletableFuture 的最终结果作为输入来启动第二个CompletableFuture 对象。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
AI写代码
实例:
List<CompletableFuture<Integer>> list = feeItemList.stream()
.map(feeItems ->
CompletableFuture.supplyAsync(() -> feeItems.getAmount()))
.map(future -> future.thenApply(i -> i * 2))
.map(future -> future.thenCompose(amount -> CompletableFuture.supplyAsync( () -> amount*3 )))
.collect(Collectors.toList()); System.out.println(list.stream().map(CompletableFuture::join).collect(Collectors.toList()));
AI写代码
这里的CompletableFuture的join操作用于返回CompletableFuture计算后的结果
辅助方法allOf和anyOf
allOf: 当所有的CompletableFuture都执行完才计算
在所有CompletableFuturer执行完之前,主线程会阻塞
anyOf: 当任意一个ComplteableFuture执行完就执行计算
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
AI写代码
实例:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> feeItem.getAmount());
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> feeItem1.getAmount());
CompletableFuture<Object> f1 = CompletableFuture.anyOf(future2, future1);
CompletableFuture<Void> f2= CompletableFuture.allOf(future2, future1);
System.out.println(f1.get()+" "+f2.get());
AI写代码
CompletableFuture的方法还有很多,大概有60多个,这里就不一一赘述了。
并行流 or CompletableFuture
对集合进行并行运算有两种方法:一种是采用parallelStream()(其结果可能造成阻塞),另一种是利用CompletableFuture支持自定义线程池配置(从而避免因单个线程长时间等待I/O操作而引发的整体计算阻塞)。具体建议如下:首先可以选择parallelStream()(其结果可能造成阻塞);其次也可以选择基于自定义线程池配置的CompletableFuture实现(允许我们根据需求调节线程池规模)。这两种方法各有优劣,在实际应用中可以根据具体场景选择合适的方式。
当执行的完全是计算密集型操作且无I/O操作时
如果你的多线程作业需要等待I/O操作或网络通信,则采用CompletableFuture的灵活性更高。
使用CompletableFuture优化你的代码性能
该方案通过使用CompletableFuture来优化系统的性能指标和吞吐量。在大多数同学的系统设计中这一设计模式被广泛采用,并且已经被证明是一种有效的实现方式。

这样会有什么问题呢
1.系统的吞吐量不高
2.接口调用次数频繁,时间长
我们就可以进行如下改进:

可将大量请求先加入阻塞队列中,并不必每次都直接访问查询接口;之后可以通过定时任务每隔一段时间自动处理队列中的请求。
接收请求并执行批量查询操作。一旦批量查询完成并返回结果数据,则按照批次信息将结果传递给相应的线程组。
下面是小编写的一个例子:
public class AsyncServie {
@Autowired
private OrderService orderService;
public AtomicInteger atomicInteger = new AtomicInteger(0);
class Request {
String orderCode;
String serialNo;
CompletableFuture<Map<String,Object>> future;
}
LinkedBlockingDeque<Request> blockingDeque = new LinkedBlockingDeque<>();
public Map<String, Object> queryOrderInfo(String orderCode) throws InterruptedException, ExecutionException {
Request request = new Request();
request.orderCode = orderCode;
request.serialNo = UUID.randomUUID().toString();
CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
request.future = future;
blockingDeque.add(request);
atomicInteger.getAndIncrement();
System.out.println(atomicInteger);
//监听有没有返回值 一直阻塞
return future.get();
}
@PostConstruct
public void init() {
//每隔10ms去队列获取请求,批量发起
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
executorService.scheduleAtFixedRate( () -> {
int size = blockingDeque.size();
if (0 == size) {
return;
}
List<Map<String, String>> params = Lists.newArrayList();
List<Request> requestList = Lists.newArrayList();
for (int i=0; i< size; i++){
Request request = blockingDeque.poll();
Map<String ,String> map = Maps.newHashMap();
map.put("orderCode", request.orderCode);
map.put("serialNo", request.serialNo);
params.add(map);
requestList.add(request);
}
System.out.println("批量处理的size"+size);
System.out.println(Thread.currentThread().getName());
List<Map<String, Object>> orderInfo = orderService.getOrderInfo(params);
//匹配对应的serialNo
Optional.ofNullable(requestList).ifPresent(requests -> requests.forEach(request -> {
String serialNo = request.serialNo;
Optional.ofNullable(orderInfo).ifPresent(orderInfos -> orderInfos.forEach(response -> {
String serial = response.get("serialNo").toString();
if (Objects.equals(serialNo, serial)) {
request.future.complete(response);
}
}));
}));
}, 0, 10, TimeUnit.MILLISECONDS);
}
}
AI写代码
好了今天就介绍到这了。
