CompletableFuture
概述
CompletableFuture
可以显式完成的 Future
(设置其值和状态),并且可以用作 CompletionStage
,支持在其完成时触发的相关函数和操作。
类结构
CompletableFuture
的UML图如下所示:
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。
- Future代表异步计算的结果。
CompletionStage
每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:
- Function。有输入、有输出。
- Runnable。无输入、无输出
- Consumer。有输入、无输出
示例
创建子任务
CompletionStage子任务的创建是通过CompletableFuture完成的,
//子任务包装一个supplier实例,并使用ForkJoinPool.commonPool线程池调用
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
//子任务包装一个supplier实例,并使用指定的线程池调用
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
//子任务包装一个Runnable实例,并使用ForkJoinPool.commonPool线程池调用
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
//子任务包装一个Runnable实例,并使用指定的线程池调用
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
基础示例如下所示:
// result is xw say:hello world
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "hello world";
}).thenApply((x) ->
{
return "xw say:" + x;
});
String s = stringCompletableFuture.get();
System.out.println(s);
}
子任务回调钩子
java.util.concurrent.CompletableFuture#exceptionally
,异常回调钩子函数java.util.concurrent.CompletableFuture#whenComplete
,
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "hello world";
}).thenApply((x) ->
{
int i = 19/0;
return "xw say:" + x;
});
stringCompletableFuture.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println("运行结果: "+ s );
}
});
stringCompletableFuture.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println("执行异常:"+ throwable.getMessage());
return "未知异常";
}
});
}
}
handle处理异常和结果
java.util.concurrent.CompletableFuture#handle
,在执行任务的统一线程处理异常和结果。java.util.concurrent.CompletableFuture#handleAsync(java.util.function.BiFunction<? super T,java.lang.Throwable,? extends U>)
,可能不在执行任务的统一线程中处理异常和结果。java.util.concurrent.CompletableFuture#handleAsync(java.util.function.BiFunction<? super T,java.lang.Throwable,? extends U>, java.util.concurrent.Executor)
,在指定线程池处理异常和结果。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "hello world";
}).thenApply((x) ->
{
return "xw say:" + x;
});
stringCompletableFuture.handle(new BiFunction<String, Throwable, Void>() {
@Override
public Void apply(String s, Throwable throwable) {
System.out.println("结果 "+ s);
return null;
}
});
}
thenApply()
、thenRun()
、thenAccept()
这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>
、Runnable
、Consumer<? super T>
类型。但是,thenCompose()
方法与thenApply()
方法有本质的不同:
thenCompose()的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。具体来说,thenCompose()返回的是包装了普通异步方法的CompletionStage任务实例,通过该实例还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用。
thenApply()的返回值则简单多了,直接就是第二个任务的普通异步方法的执行结果,它的返回类型与第二步执行的普通异步方法的返回类型相同,通过thenApply()所返回的值不能进行下一轮CompletionStage链式(或者流式)调用。
异步任务合并执行
thenCombine()会在两个CompletionStage任务都执行完成后,把两个任务的结果一起交给thenCombine()来处理。
// future1跟future2是同时执行的,結果是12 public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { return "1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return "2"; }); CompletableFuture<Object> objectCompletableFuture = future1.thenCombine(future2, new BiFunction<String, String, Object>() { @Override public String apply(String s, String s2) { return s + s2; } }); System.out.println(objectCompletableFuture.get()); }
runAfterBoth方法与thenCombine类似,但是runAfterBoth方法都不接收参数。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("future1"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("future2"); }); CompletableFuture<Void> voidCompletableFuture = future1.runAfterBoth(future2, new Runnable() { @Override public void run() { System.out.println("1111111111"); } }); voidCompletableFuture.get(); }
thenAcceptBoth接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { return "1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return "2"; }); CompletableFuture<Void> voidCompletableFuture = future1.thenAcceptBoth(future2, new BiConsumer<String, String>() { @Override public void accept(String s, String s2) { System.out.println(s + s2); } }); System.out.println(voidCompletableFuture.get()); }
allOf()等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()。
applyToEither()方法。两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。
runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的
acceptEither()方法对applyToEither()方法和runAfterEither()方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()就用那个最快的CompletionStage的结果作为下一步的输入,但是第三步没有输出。