Java 之 CompletableFuture 使用
一、 CompletableFuture 介绍
日常开发中经常会遇到一个接口中有多个方法的调用( RPC 服务或者存储介质),我们必然会想提升接口性能,在 JDK1.8 之前大都使用线程池结合 Future 进行并行调用。Future 用于异步计算,如果要减少阻塞一般会使用回调的方式,大量的回调代码就会有 回调地狱 问题,降低了 可读性 和 可维护性 ,且会造成系统资源浪费。
JDK1.8 引入了 CompletableFuture 工具并提供了丰富的 API 支撑日常开发,使用函数式书写风格让我们在异步化组合编排上更加便利,且充分利用了系统资源。并且 CompletableFuture 也支持传入业务自定义的线程池。
CompletableFuture 中包含两个字段: result 和 stack 。result 用于存储当前 CF 的结果,stack 表示当前 CF 完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的 CF 的计算,依赖动作可以有多个(表示有多个依赖它的 CF ),以栈的形式存储,stack 表示栈顶元素,这种方式类似“观察者模式”。
二、CompletableFuture 常见 API
-
supplyAsync
单个异步计算请求 -
thenAccept
接收上一个 CompletableFuture 结果进行后续处理 -
thenAppy
接收上一个 CompletableFuture 结果进行后续处理,返回 CompletableFuture 泛型结果可以改变 -
thenCompose
接收上一个 CompletableFuture 结果,返回一个新的 CompletableFuture -
complete
可用于将方法返回值转为 CompletableFuture 类型,并添加值信息 -
completeExceptionally
可用于将方法返回值转为 CompletableFuture 类型,并添加异常信息 -
completedFuture
创建一个已处理完成的 CompletableFuture -
thenCombine
用于有两个请求依赖进行编排请求 -
allOf
和anyOf
可用于多个请求依赖的控制 -
exceptionally
接收异步计算发生的异常信息 -
get
和join
都可以阻塞获取 CompletableFuture 的返回值,但 join 是抛出的是 uncheck 异常,get 方法则需要手动处理
三、各 API 使用介绍及演示 Demo
2.1 supplyAsync、completeFuture 和 complete
ExecutorService executor = Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return "result1";
}, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
2.2 thenCombine
可用于多个 CompletableFuture 的编排,如有三个操作 step1、step2、step3 存在依赖关系,step3 依赖 step1 和 step2 的结果,则可以使用该 API 进行编排,这是由于 CompletableFuture 实现了接口 CompletionStage ,能够按步骤进行下一次 Stage 的触发,我们可以结合 thenAppy、thenCompose 等函数式编程方法使用。
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行step 1");
return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行step 2");
return "step2 result";
});
cf1.thenCombine(cf2, (result1, result2) -> {
System.out.println(result1 + " , " + result2);
System.out.println("执行step 3");
return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));
2.3 thenApply、thenAccept、thenCompose
thenApply 和 thenAccept 都可以获取前一个任务的返回值,但 thenAccept 自身不会有返回值。thenCompose 可以返回一个新的 CF 。
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
//result1为CF1的结果
//......
return "result3";
});
CompletableFuture<Void> cf4 = cf2.thenAccept(result2 -> {
//result2为CF2的结果
//......
});
CompletableFuture<Void> cf4 = cf2.thenAccept(result2 -> {
//result2为CF2的结果
//......
});
CompletableFuture<Integer> cf5 = orderUserMobileFuture.thenCompose(result4 -> CompletableFuture.supplyAsync(() -> 123));
2.4 allOf 和 anyOf
allOf 是要求所有的 CompletableFuture 都完成后才会触发后续动作, anyOf 是任意一个 CompletableFuture 完成即可。
CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf1, cf2, cf3);
CompletableFuture<String> result = cf4.thenApply(v -> {
//这里的join并不会阻塞,cf1 cf2 cf3 必须都是完成后才会进入 thenApply
result1 = cf1.join();
result2 = cf2.join();
result3 = cf3.join();
//根据result1、result2、result53组装最终result;
return "result";
});