跳至主要內容

CompletableFuture

xw大约 4 分钟JAVACompletableFuture

概述

CompletableFuture可以显式完成的 Future(设置其值和状态),并且可以用作 CompletionStage,支持在其完成时触发的相关函数和操作。

类结构

CompletableFuture的UML图如下所示:

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。
  • Future代表异步计算的结果。

CompletionStage

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:

  • Function。有输入、有输出。
  • Runnable。无输入、无输出
  • Consumer。有输入、无输出

示例

创建子任务

CompletionStage子任务的创建是通过CompletableFuture完成的,

    //子任务包装一个supplier实例,并使用ForkJoinPool.commonPool线程池调用
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(ASYNC_POOL, supplier);
    }

     //子任务包装一个supplier实例,并使用指定的线程池调用
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    //子任务包装一个Runnable实例,并使用ForkJoinPool.commonPool线程池调用
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(ASYNC_POOL, runnable);
    }

    //子任务包装一个Runnable实例,并使用指定的线程池调用
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

基础示例如下所示:

// result is xw say:hello world
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello world";
        }).thenApply((x) ->
        {
            return "xw say:" + x;
        });
        String s = stringCompletableFuture.get();
        System.out.println(s);

    }

子任务回调钩子

  • java.util.concurrent.CompletableFuture#exceptionally,异常回调钩子函数
  • java.util.concurrent.CompletableFuture#whenComplete
 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello world";
        }).thenApply((x) ->
        {
                int i = 19/0;
                return "xw say:" + x;
        });
        stringCompletableFuture.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                  System.out.println("运行结果: "+ s );
            }
        });

        stringCompletableFuture.exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
               System.out.println("执行异常:"+ throwable.getMessage());
               return "未知异常";
            }
        });
    }
}

handle处理异常和结果

  • java.util.concurrent.CompletableFuture#handle,在执行任务的统一线程处理异常和结果。
  • java.util.concurrent.CompletableFuture#handleAsync(java.util.function.BiFunction<? super T,java.lang.Throwable,? extends U>),可能不在执行任务的统一线程中处理异常和结果。
  • java.util.concurrent.CompletableFuture#handleAsync(java.util.function.BiFunction<? super T,java.lang.Throwable,? extends U>, java.util.concurrent.Executor),在指定线程池处理异常和结果。
  public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello world";
        }).thenApply((x) ->
        {
            return "xw say:" + x;
        });
        stringCompletableFuture.handle(new BiFunction<String, Throwable, Void>() {
            @Override
            public Void apply(String s, Throwable throwable) {
                System.out.println("结果 "+ s);
                return null;
            }
        });
    }

thenApply()thenRun()thenAccept()这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>RunnableConsumer<? super T>类型。但是,thenCompose()方法与thenApply()方法有本质的不同:

  • thenCompose()的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。具体来说,thenCompose()返回的是包装了普通异步方法的CompletionStage任务实例,通过该实例还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用。

  • thenApply()的返回值则简单多了,直接就是第二个任务的普通异步方法的执行结果,它的返回类型与第二步执行的普通异步方法的返回类型相同,通过thenApply()所返回的值不能进行下一轮CompletionStage链式(或者流式)调用。

异步任务合并执行

  • thenCombine()会在两个CompletionStage任务都执行完成后,把两个任务的结果一起交给thenCombine()来处理。

    // future1跟future2是同时执行的,結果是12   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                return "1";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                return "2";
            });
    
            CompletableFuture<Object> objectCompletableFuture = future1.thenCombine(future2, new BiFunction<String, String, Object>() {
                @Override
                public String apply(String s, String s2) {
                    return s + s2;
                }
            });
    
            System.out.println(objectCompletableFuture.get());
        }
    
  • runAfterBoth方法与thenCombine类似,但是runAfterBoth方法都不接收参数。

     public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future1");
            });
    
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future2");
            });
    
            CompletableFuture<Void> voidCompletableFuture = future1.runAfterBoth(future2, new Runnable() {
                @Override
                public void run() {
                    System.out.println("1111111111");
                }
            });
    
            voidCompletableFuture.get();
        }
    
  • thenAcceptBoth接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                return "1";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                return "2";
            });
    
            CompletableFuture<Void> voidCompletableFuture = future1.thenAcceptBoth(future2, new BiConsumer<String, String>() {
                @Override
                public void accept(String s, String s2) {
                    System.out.println(s + s2);
                }
            });
    
    
            System.out.println(voidCompletableFuture.get());
        }
    
    
  • allOf()等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()。

  • applyToEither()方法。两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。

  • runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的

  • acceptEither()方法对applyToEither()方法和runAfterEither()方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()就用那个最快的CompletionStage的结果作为下一步的输入,但是第三步没有输出。