百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

JUC并发—14.Future模式和异步编程分析二

cac55 2025-03-24 14:18 25 浏览 0 评论

大纲

1.FutureTask(Future/Callable)的使用例子

2.FutureTask(Future/Callable)的实现原理

3.FutureTask(Future/Callable)的源码分析

4.CompletableFuture的基本介绍

5.CompletionStage方法及作用说明

6.CompletableFuture的实现原理分析

7.CompletableFuture的核心源码分析


5.CompletionStage方法及作用说明

(1)CompletionStage示例

(2)CompletionStage的方法概述

(3)有传参但没返回值的方法

(4)有传参且有返回值的方法

(5)没传参也没返回值的方法

(6)组合起来串行执行的方法

(7)异常处理方法


(1)CompletionStage示例

CompletionStage表示任务执行的一个阶段,每个异步任务都会返回一个新的CompletionStage对象,可针对多个CompletionStage对象进行串行、并行、聚合等操作。简单来说,CompletionStage就是实现异步任务执行后的自动回调功能


下面的CompletionStage例子:首先需要调用一个远程方法获得结果,然后把返回结果保存到数据库。所以代码中先定义一个异步任务处理远程调用,并返回CompletionStage,接着调用thenAccept()方法把第一步的执行结果保存到数据库中。

public class CompletionStageExample {
    public static void main(String[] args) {
        CompletionStage cf = CompletableFuture.supplyAsync(() -> "远程调用的返回结果");
        cf.thenAccept(result -> {
            System.out.println("第一个异步任务的返回值是:" + result);
            System.out.println("把result保存到数据库");
        });
    }
}

可以看见和Future明显不一样的地方就是:thenAccept()方法传入的回调对象第一个异步任务执行完后自动触发的,不需要像Future那样阻塞当前线程等待返回结果,还可以使用thenAcceptAsync()方法让保存到数据库的任务使用独立线程池


(2)CompletionStage的方法概述

CompletionStage总共提供了38个方法来实现多个任务的串行、并行、聚合等功能,这些方法可以按功能进行如下的分类:

一.有传参但没返回值的方法

二.有传参且有返回值的方法

三.没传参也没返回值的方法

四.组合起来串行执行的方法

五.异常处理的方法


注意:Accept关键字有传参没有返回值Run关键字没传参没返回值


(3)有传参但没返回值的方法

有传参但没返回值的方法就是:用上一个异步任务的结果作为当前方法的参数进行下一步运算,并且当前方法会产生一个新的没有返回值的CompletionStage对象。有传参但没返回值的方法都包含Accept关键字


一.依赖单个CompletionStage任务完成

thenAccept()相关方法上一个任务的执行结果作为参数执行当前的action,这些方法接收的参数是一个函数式接口Consumer,表示一个待执行的任务。这些方法的返回值是CompletionStage,表示没有返回值


注意:方法以Async结尾,表示使用单独的线程池来执行action,否则使用执行当前任务的线程执行action

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed with this stage's result as the argument to the supplied action.
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage thenAccept(Consumer action);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with this stage's result as the argument to the supplied action.
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage thenAcceptAsync(Consumer action);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using the supplied Executor, 
    //with this stage's result as the argument to the supplied action.
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage thenAcceptAsync(Consumer action, Executor executor);
    ...
}

public class CompletionStageExample {
    //当cf实例的任务执行完成后,会回调传入thenAcceptAsync()方法中的回调函数
    //其中回调函数的result表示cf异步任务的返回结果
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { 
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> "thenAccept message");
        cf.thenAcceptAsync((result) -> {
            System.out.println(Thread.currentThread().getName() + "第一个异步任务的返回值:" + result);
        });
    }
}

二.依赖两个CompletionStage任务都完成

thenAcceptBoth()相关方法提供了与thenAccept()相关方法类似的功能。不同点在于thenAcceptBoth()相关方法多了一个CompletionStage参数,表示当两个CompletionStage任务都完成后,才执行后面的action。而且这个action可以接收两个参数,这两个参数分别表示两个任务的返回值thenAcceptBoth()相关方法相当于实现了两个异步任务的组合

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this and the other given stage both complete normally, 
    //is executed with the two results as arguments to the supplied action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param  the type of the other CompletionStage's result
    //@return the new CompletionStage
    public  CompletionStage thenAcceptBoth(CompletionStage other, BiConsumer action); 

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with the two results as arguments to the supplied action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param  the type of the other CompletionStage's result
    //@return the new CompletionStage
    public  CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action); 

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //is executed using the supplied executor, 
    //with the two results as arguments to the supplied function.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the type of the other CompletionStage's result
    //@return the new CompletionStage
    public  CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor); 
    ...
}

public class ThenAcceptBothExample {
    //task1和task2都执行完成后,会得到两个任务的返回值AcceptBoth和message,
    //接着开始执行thenAcceptBoth()中的action,
    //这个action会接收前面两个任务的执行结果r1和r2,并最终打印出:执行结果为"AcceptBoth+message"
    public static void main(String[] args) {
        CompletableFuture task1 = CompletableFuture.supplyAsync(() -> "AcceptBoth");
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> "message");
        task1.thenAcceptBoth(task2, (r1, r2) -> {
            System.out.println("执行结果" + r1 + "+" + r2);
        });
       
        //或者采用Fluent风格来写
        //CompletableFuture.supplyAsync(() -> "AcceptBoth").thenAcceptBoth(
        //    CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {
        //      System.out.println("执行结果:" + r1 + ", " + r2);
        //    }
        //);
    }
}

三.依赖两个CompletionStage任务中的任何一个完成

acceptEither()相关方法和thenAcceptBoth()相关方法几乎一样。它同样接收两个CompletionStage任务,但是只需要保证其中一个任务完成,就会回调acceptEither()方法中传入的action任务。这两个CompletionStage任务谁先完成就会获得谁的返回值,作为参数传给后续的action任务。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed with the corresponding result as argument to the supplied action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage acceptEither(CompletionStage other, Consumer action);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with the corresponding result as argument to the supplied action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed using the supplied executor, 
    //with the corresponding result as argument to the supplied function.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, Executor executor); 
    ...
}

(4)有传参且有返回值的方法

有传参且有返回值的方法就是:用上一个异步任务的执行结果作为当前方法的参数进行下一步计算,并且当前方法会产生一个新的有返回值的CompletionStage对象。


一.依赖单个CompletionStage任务完成

thenApply()这一组方法的功能是等上一个CompletionStage任务执行完后,就会把执行结果传递给函数fn,将函数fn作为一个新的执行任务去执行,最后返回一个新的有返回值的CompletionStage对象。


其中以Async结尾的方法表示函数fn这个任务将采用单独的线程池来执行

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed with this stage's result as the argument to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenApply(Function fn);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with this stage's result as the argument to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenApplyAsync(Function fn);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using the supplied Executor, 
    //with this stage's result as the argument to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenApplyAsync(Function fn, Executor executor); 
    ...
}

二.依赖两个CompletionStage任务都完成

thenCombine()这一组方法的功能类似于thenAcceptBoth()方法。它表示两个CompletionStage任务并行执行结束后,把这两个CompletionStage任务的执行结果传递给函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this and the other given stage both complete normally, 
    //is executed with the two results as arguments to the supplied function.
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the type of the other CompletionStage's result
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenCombine(CompletionStage other, BiFunction fn);

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with the two results as arguments to the supplied function.
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the type of the other CompletionStage's result
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn);

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //is executed using the supplied executor, 
    //with the two results as arguments to the supplied function.  
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the type of the other CompletionStage's result
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor); 
    ...
}

public class ThenCombineExample {
    public static void main(String[] args) {
        CompletableFuture task1 = CompletableFuture.supplyAsync(() -> "Combine");
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> "message");
        CompletableFuture cf = task1.thenCombineAsync(task2, (r1, r2) -> {
            System.out.println("执行结果:" + r1 + ", " + r2);
            return r1 + r2;
        });
        System.out.println(cf.get());
      
        //或者采用Fluent风格来写
        //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine").thenCombineAsync(
        //    CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {
        //        System.out.println("执行结果:" + r1 + ", " + r2);
        //        return r1 + r2;
        //    }
        //);
        //System.out.println(cf.get());
    }
}

三.依赖两个CompletionStage任务中的任何一个完成

applyToEither()方法表示两个CompletionStage任务中任意一个任务完成后,都执行传入applyToEither()方法中的函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed with the corresponding result as argument to the supplied function.
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage applyToEither(CompletionStage other, Function fn);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with the corresponding result as argument to the supplied function.
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage applyToEitherAsync(CompletionStage other, Function fn);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //is executed using the supplied executor, 
    //with the corresponding result as argument to the supplied function.
    //@param other the other CompletionStage
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage applyToEitherAsync(CompletionStage other, Function fn, Executor executor); 
    ...
}

(5)没传参也没返回值的方法

没传参也没返回值的方法就是:当前方法不依赖上一个异步任务的执行结果,只要上一个异步任务执行完成就执行当前方法,并且当前方法会产生一个新的没有返回值的CompletionStage对象,没传参也没返回值的方法都包含Run关键字


一.依赖单个CompletionStage任务完成

thenRun()方法只要上一个阶段的任务执行完成后,便立即执行指定action。thenRunAsync()表示采用ForkjoinPool.commonPool()线程池来执行action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this stage completes normally, executes the given action.
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage thenRun(Runnable action);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //executes the given action using this stage's default asynchronous execution facility.
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage thenRunAsync(Runnable action);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //executes the given action using the supplied Executor.
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage thenRunAsync(Runnable action, Executor executor);
    ...
}

二.依赖两个CompletionStage任务都完成

runAfterBoth()方法接收一个CompletionStage任务。该方法要保证两个CompletionStage任务都完成,再执行指定的action。action执行完成后会返回一个新的没有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this and the other given stage both complete normally, executes the given action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage runAfterBoth(CompletionStage other, Runnable action);

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //executes the given action using this stage's default asynchronous execution facility.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action);

    //Returns a new CompletionStage that, 
    //when this and the other given stage complete normally, 
    //executes the given action using the supplied executor.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor); 
    ...
}

三.依赖两个CompletionStage任务中的任何一个完成

runAfterEither()方法接收一个CompletionStage任务。它只需要保证两个任务中任意一个任务执行完成,即可执行指定的action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, executes the given action.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage runAfterEither(CompletionStage other, Runnable action);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //executes the given action using this stage's default asynchronous execution facility.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@return the new CompletionStage
    public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action);

    //Returns a new CompletionStage that, 
    //when either this or the other given stage complete normally, 
    //executes the given action using the supplied executor.
    //@param other the other CompletionStage
    //@param action the action to perform before completing the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor); 
    ...
}

(6)组合起来串行执行的方法

thenCompose()是多任务组合方法,它的作用是把两个CompletionStage任务进行组合达到串行执行的目的,也就是把第一个任务的执行结果作为参数传递给第二个任务执行。


thenCompose()方法有点类似于thenCombine()方法,但thenCompose()方法中的两个任务存在先后关系,而thenCombine()方法中的两个任务是并行执行的。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed with this stage as the argument to the supplied function.
    //@param fn the function returning a new CompletionStage
    //@param  the type of the returned CompletionStage's result
    //@return the CompletionStage
    public  CompletionStage thenCompose(Function<? super T, ? extends CompletionStage> fn);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using this stage's default asynchronous execution facility,
    //with this stage as the argument to the supplied function.
    //@param fn the function returning a new CompletionStage
    //@param  the type of the returned CompletionStage's result
    //@return the CompletionStage
    public  CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn);

    //Returns a new CompletionStage that, when this stage completes normally, 
    //is executed using the supplied Executor, 
    //with this stage's result as the argument to the supplied function.
    //@param fn the function returning a new CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the type of the returned CompletionStage's result
    //@return the CompletionStage
    public  CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor); 
    ...
}

public class ThenComposeExample {
    //下面使用supplyAsync()方法构建了一个异步带返回值的任务,返回值为"Compose Message";
    //接着使用thenCompose()方法组合另外一个任务,并把前面任务的返回值r作为参数传递给第二个任务
    //在第二个任务中同样使用supplyAsync()方法构建了一个新的任务将参数r转为大写
    //最后thenCompose()方法返回一个新的没有返回值的CompletionStage对象
    public static void main(String[] args) {
        CompletableFuture task1 = CompletableFuture.supplyAsync(() -> "Compose Message");
        CompletableFuture cf = task1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase()));
        System.out.println(cf.get());
      
        //或者采用Fluent风格来写
        //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Compose Message")
        //     .thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase())
        //);
        //System.out.println(cf.get());
    }
}

(7)异常处理方法

上述介绍的方法都是CompletionStage任务正常执行时的处理方法。如果依赖的前一个任务出现异常,那么会导致后续的任务无法正常执行。比如下述代码,如果前置任务cf出现异常,那么会影响后置任务的执行。

public class RunAfterBothExample {
    public static void main(String[] args) {
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Exception");
        }).runAfterBoth(CompletableFuture.supplyAsync(() -> "Message"), () -> {
            System.out.println("Done");
        });
        System.out.println(cf.get());
    }
}

CompletionStage提供了3类异常处理的方法。


一.whenComplete()方法

whenComplete()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都能触发执行指定action,最后返回一个没返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage with the same result or exception as this stage, 
    //that executes the given action when this stage completes.
    //@param action the action to perform
    //@return the new CompletionStage
    public CompletionStage whenComplete(BiConsumer action);

    //Returns a new CompletionStage with the same result or exception as this stage, 
    //that executes the given action using this stage's default asynchronous execution facility when this stage completes.
    //@param action the action to perform
    //@return the new CompletionStage
    public CompletionStage whenCompleteAsync(BiConsumer action);

    //Returns a new CompletionStage with the same result or exception as this stage, 
    //that executes the given action using the supplied Executor when this stage completes.
    //@param action the action to perform
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletionStage
    public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor); 
    ...
}

二.handle()方法

handle()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都会执行其中的函数fn,最后返回一个有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this stage completes either normally or exceptionally, 
    //is executed with this stage's result and exception as arguments to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage handle(BiFunction fn);

    //Returns a new CompletionStage that, 
    //when this stage completes either normally or exceptionally, 
    //is executed using this stage's default asynchronous execution facility, 
    //with this stage's result and exception as arguments to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage handleAsync(BiFunction fn);

    //Returns a new CompletionStage that, 
    //when this stage completes either normally or exceptionally, 
    //is executed using the supplied executor, 
    //with this stage's result and exception as arguments to the supplied function.
    //@param fn the function to use to compute the value of the returned CompletionStage
    //@param executor the executor to use for asynchronous execution
    //@param  the function's return type
    //@return the new CompletionStage
    public  CompletionStage handleAsync(BiFunction fn, Executor executor); 
    ...
}

public class HandleExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Exception");
        }).handleAsync((r, th) -> {
           return th != null ? "出现异常" : "正常执行";
        });
        System.out.println(cf.get());
    }
}

三.exceptionally()方法

exceptionally()方法接收一个函数fn,当上一个CompletionStage任务出现异常时,会把该异常作为参数传递给fn,最后返回一个有返回值的CompletionStage对象。

public interface CompletionStage {
    ...
    //Returns a new CompletionStage that, 
    //when this stage completes exceptionally, 
    //is executed with this stage's exception as the argument to the supplied function.
    //Otherwise, if this stage completes normally, 
    //then the returned stage also completes normally with the same value.
    //@param fn the function to use to compute the value of the returned CompletionStage if this CompletionStage completed exceptionally
    //@return the new CompletionStage
    public CompletionStage exceptionally(Function fn);
    ...
}

public class ExceptionallyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Exception");
        }).exceptionally(e -> {
            log.error(e);
            return "ExceptionallyExample";
        });
        System.out.println(cf.get());
    }
}


6.CompletableFuture的实现原理分析

(1)CompletableFuture实现回调的例子

(2)CompletableFuture如何存储任务

(3)Completion的几个实现类

(4)Completion的栈结构存储回调任务

(5)Completion中的回调任务的执行和总结


(1)CompletableFuture实现回调的例子

CompletableFuture实现了Future接口CompletionStage接口CompletionStage接口为CompletableFuture提供了丰富的异步回调接口,CompletableFuture可以使用这些接口来实现复杂的异步计算工作。


下面是一个使用CompletableFuture回调的例子。其中构建了两个CompletionStage任务,第一个任务是返回"thenAccept message"字符串,第二个任务是打印第一个任务的返回值。注意:Accept关键字有传参没有返回值Run关键字没传参没返回值


这两个任务建立了串行执行的关系,第二个任务相当于第一个任务执行结束后的异步回调,并且多个CompletionStage任务可以使用链式风格串联。

public class CompletionStageExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> "thenAccept message")
            .thenAcceptAsync((result) -> {
                System.out.println("第一个异步任务的返回值:" + result);
            });
        cf.get();
    }
}

(2)CompletableFuture如何存储任务

一.CompletableFuture的成员变量

CompletableFuture的成员变量只有两个:resultstack

public class CompletableFuture implements Future, CompletionStage {
    ...
    //表示CompletionStage任务的返回结果或者一个异常的封装对象AltResult
    volatile Object result;//Either the result or boxed AltResult
    
    //表示依赖操作栈的栈顶,链式调用中传递的任务都会被压入这个stack中
    volatile Completion stack;//Top of Treiber stack of dependent actions
    ...
}

二.表示具体执行任务的Completion

成员变量stack是一个存储Completion对象Treiber Stack结构Treiber Stack是一种基于CAS机制实现的无锁并发栈


Completion表示一个具体的执行任务。每个回调任务都会封装成Completion对象,然后放入Treiber Stack中。Completion中的成员变量next保存了栈中的下一个回调任务

public class CompletableFuture implements Future, CompletionStage {
    ...
    abstract static class Completion extends ForkJoinTask implements Runnable, AsynchronousCompletionTask {
        volatile Completion next;//Treiber stack link

        //Performs completion action if triggered, returning a dependent that may need propagation, if one exists.
        //@param mode SYNC, ASYNC, or NESTED
        abstract CompletableFuture tryFire(int mode);

        //Returns true if possibly still triggerable. Used by cleanStack.
        abstract boolean isLive();

        public final void run() {
            tryFire(ASYNC);
        }

        public final boolean exec() {
            tryFire(ASYNC);
            return true;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
      
        }
    }
    ...
}

(3)Completion的几个实现类

一.UniCompletion

当使用如thenRun()thenApply()等方法处理单个任务的,那么这些任务就会封装成UniCompletion对象


二.CoCompletion

当使用如thenCombine()applyToEither()等方法处理两个任务的,那么这些任务会封装成CoCompletion对象


三.Signaller

当使用如get()join()等方法处理任务时,那么调用方也会作为任务被封装成Signaller对象


(4)Completion的栈结构存储回调任务

以如下创建一个CompletableFuture任务为例,说明在CompletableFuture中是如何存储这些回调任务的。注意:该例子在Debug中没有发现baseFuture的成员变量stack的变化

public class CompletionStackExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //创建一个CompletableFuture任务对象
        CompletableFuture baseFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("开始执行入栈baseFuture的第一个异步任务");
                Thread.sleep(5000);
                System.out.println("第一个异步任务执行完毕");
            } catch (Exception e) {
            }
            return "BaseFuture";
        });
        System.out.println("主线程第一次打印");

        baseFuture.thenApply(r -> {
            System.out.println("开始执行入栈baseFuture的第二个异步任务");
            try {
                Thread.sleep(5000);
                System.out.println("第二个异步任务执行完毕");
            } catch (Exception e) {
            }
            return "Then Apply";
        });//输出结果中没有"Then Apply",因为没有任务使用"Then Apply"这个返回值
        System.out.println("主线程第二次打印");

        baseFuture.thenAccept(r -> {
            System.out.println("开始执行入栈baseFuture的第三个异步任务");
            try {
                Thread.sleep(5000);
                System.out.println("第三个异步任务执行完毕: " + r);
            } catch (Exception e) {
            }
        }).thenAccept(Void -> {
            System.out.println("baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务");
            try {
                Thread.sleep(5000);
                System.out.println("第三个异步任务的子任务执行完毕");
            } catch (Exception e) {
            }
        });
        System.out.println("主线程第三次打印");

        baseFuture.thenApply(r -> {
            System.out.println("开始执行入栈baseFuture的第四个异步任务");
            try {
                Thread.sleep(5000);
                System.out.println("第四个异步任务执行完毕");
            } catch (Exception e) {
            }
            return "Apply Message";
        }).thenAccept(r -> {
            System.out.println("baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务");
            try {
                Thread.sleep(5000);
                System.out.println("第四个异步任务的子任务执行完毕: " + r);
            } catch (Exception e) {
            }
        });

        System.out.println("主线程第四次打印");
        System.out.println("finish: " + baseFuture.get());
    }
    //输出的结果如下:
    //主线程第一次打印
    //开始执行入栈baseFuture的第一个异步任务
    //主线程第二次打印
    //主线程第三次打印
    //主线程第四次打印
    //第一个异步任务执行完毕
    //开始执行入栈baseFuture的第四个异步任务
    //开始执行入栈baseFuture的第三个异步任务
    //第四个异步任务执行完毕
    //第三个异步任务执行完毕: BaseFuture
    //baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务
    //baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务
    //第三个异步任务的子任务执行完毕
    //第四个异步任务的子任务执行完毕: Apply Message
    //开始执行入栈baseFuture的第二个异步任务
    //第二个异步任务执行完毕
    //finish: BaseFuture
}

一.第一阶段的Completion Stack结构

主线程第一次打印和第二次打印执行完成后,会创建如下图所示的结构。此时Completion类型是UniCompletion,因为thenApply()方法只接收一个任务。

二.第二阶段的Completion Stack结构

主线程第三次打印执行完成后,就会创建如下图所示的结构。


首先使用baseFuture.thenAccept()方法在baseFuture上增加一个回调,此时会把这个回调对应的Completion压入baseFuture的stack的栈顶


然后会产生一个新的CompletableFuture对象实例继续执行thenAccept(),由于这个新的CompletableFuture对象实例是在栈顶的Completion中产生的,因此在栈顶的Completion中会有一个dep属性指向这个新的对象实例


新的CompletableFuture对象中又调用thenAccept()构建一个回调任务,所以又会有一个新的Completion Stack结构

三.第三阶段的Completion Stack结构

主线程第四次打印执行完成后,就会创建如下图所示的结构。


首先是在baseFuture上使用thenApply()方法创建一个带有返回值的回调,这个回调对应的Completion同样会压入baseFuture的stack的栈顶。然后同样会创建一个新的CompletableFuture对象实例。接着在这个新的对象实例中继续使用thenAccept()方法添加另外一个回调,这个回调对应的Completion会压入新的CompletableFuture的stack的栈顶

(5)Completion中的回调任务的执行和总结

从Completion Stack的栈顶中逐个出栈来执行。


如果当前出栈的Completion存在一个子Completion Stack,那么就优先执行这一条链路的Completion任务。


CompletableFuture中的回调任务,是基于Completion来实现的。针对CompletionStage中不同类型的方法,Completion有不同的子类处理。


Completion表示一个具体的回调任务,这些Completion采用了一种Treiber Stack结构来存储。由于每个Completion都可能会产生新的CompletableFuture,所以整个结构看起来像一棵很深的树。


7.CompletableFuture的核心源码分析

(1)CompletableFuture的核心源码

(2)CompletableFuture对象的创建

(3)Completion Stack的构建

(4)Completion任务的执行流程

(5)Completion任务的执行结果获取

(6)总结


(1)CompletableFuture的核心源码

CompletableFuture的源码主要分四部分:

一.CompletableFuture对象的创建

二.Completion Stack的构建

三.get()方法获取任务处理结果时阻塞和唤醒线程

四.当前置任务执行完成后,Completion Stack的执行流程


(2)CompletableFuture对象的创建

假设使用supplyAsync()方法来创建一个CompletableFuture对象。那么在执行supplyAsync()方法时触发调用的asyncSupplyStage()方法中,便会使用线程池来执行一个由AsyncSupply()构造方法构建的任务,这个线程池默认情况下是由ForkJoinPool的commonPool()方法返回的。


线程池执行由AsyncSupply()构造方法构建的任务时,会调用AsyncSupply的run()方法来执行具体的任务。


在AsyncSupply的run()方法中:首先会使用f.get()获得Supplier这个函数式接口执行结果,然后通过执行CompletableFuture的completeValue()方法,把执行结果通过CAS设置到CompletableFuture的成员变量result中。最后调用CompletableFuture的postComplete()方法表示执行完成,该postComplete()方法会执行Completion Stack中的所有回调任务

//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().
//@param  the type of results supplied by this supplier
@FunctionalInterface
public interface Supplier {
    //Gets a result.
    //@return a result
    T get();
}

public class CompletableFuture implements Future, CompletionStage {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    
    //Returns a new CompletableFuture that is asynchronously completed by a task 
    //running in the ForkJoinPool#commonPool() with the value obtained by calling the given Supplier.
    //@param supplier a function returning the value to be used to complete the returned CompletableFuture
    //@param  the function's return type
    //@return the new CompletableFuture
    public static  CompletableFuture supplyAsync(Supplier supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    static  CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        //使用线程池来执行一个由AsyncSupply()方法构建的任务
        e.execute(new AsyncSupply(d, f));
        //返回一个新的CompletableFuture对象
        return d;
    }
    
    static final class AsyncSupply extends ForkJoinTask implements Runnable, AsynchronousCompletionTask {
        CompletableFuture dep;
        Supplier fn;
        AsyncSupply(CompletableFuture dep, Supplier fn) {
            this.dep = dep;
            this.fn = fn;
        }
       
        public final Void getRawResult() {
            return null;
        }
        public final void setRawResult(Void v) {
            
        }
        public final boolean exec() {
            run();
            return true;
        }
       
        public void run() {
            CompletableFuture d;
            Supplier f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        //首先使用f.get()来获得Supplier这个函数式接口中的执行结果
                        //然后通过执行CompletableFuture的completeValue()方法,
                        //把执行结果设置到CompletableFuture的成员变量result中;
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务
                d.postComplete();
            }
        }
    }
    
    //Completes with a non-exceptional result, unless already completed.
    final boolean completeValue(T t) {
        return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
    }
    
    //Pops and tries to trigger all reachable dependents. Call only when known to be done.
    final void postComplete() {
        //On each step, variable f holds current dependents to pop and run.  
        //It is extended along only one path at a time, pushing others to avoid unbounded recursion.
        CompletableFuture f = this;
        Completion h;
        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture d;
            Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
    
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long STACK;
    private static final long NEXT;
    static {
        try {
            final sun.misc.Unsafe u;
            UNSAFE = u = sun.misc.Unsafe.getUnsafe();
            Class k = CompletableFuture.class;
            RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
            STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
            NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));
        } catch (Exception x) {
            throw new Error(x);
        }
    }
    ...
}

(3)Completion Stack的构建

假设已经使用了CompletableFuture的supplyAsync()方法创建了源任务,接着需要使用CompletionStage的thenApply()等方法构建回调任务


源任务 -> Supplier接口的实现类对象(get()方法),回调任务 -> Function接口的实现类对象(apply()方法)。


CompletableFuture的thenApply()方法会触发执行uniApplyStage()方法。在uniApplyStage()方法中,首先会创建一个新的CompletableFuture对象,然后根据CompletableFuture的uniApply()方法判断源任务是否已经完成。如果源任务已经完成则不需要入栈,直接执行回调任务的apply()方法。如果源任务还没执行完成,才将回调任务封装为UniApply对象入栈


源任务还没执行完成的处理过程具体如下:首先把回调任务封装成一个UniApply对象,然后调用CompletableFuture的push()方法,把UniApply对象压入源任务所在CompletableFuture对象中的stack的栈顶,最后调用UniApply的tryFire()方法来尝试执行该回调任务。


注意:UniApply对象其实是一个Completion对象,因为UniApply类继承自UniCompletion类,而UniCompletion类又继承自Completion类。

//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
//@param  the type of the input to the function
//@param  the type of the result of the function
@FunctionalInterface
public interface Function {
    //Applies this function to the given argument.
    //@param t the function argument
    //@return the function result
    R apply(T t);
    ...
}
    
public class CompletableFuture implements Future, CompletionStage {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    
    public  CompletableFuture thenApply(Function fn) {
        return uniApplyStage(null, fn);
    }
    
    private  CompletableFuture uniApplyStage(Executor e, Function f) {
        if (f == null) throw new NullPointerException();
        //创建一个新的CompletableFuture对象
        CompletableFuture d =  new CompletableFuture();
        //根据CompletableFuture的uniApply()方法判断源任务是否已经完成
        //如果源任务已经完成,则不需要入栈
        if (e != null || !d.uniApply(this, f, null)) {
            //首先把回调任务f封装成一个UniApply对象
            UniApply c = new UniApply(e, d, this, f);
            //然后调用CompletableFuture的push()方法
            //把UniApply对象压入源任务所在的CompletableFuture对象中的stack的栈顶
            push(c);
            //最后调用UniApply的tryFire()方法来尝试执行该回调任务
            c.tryFire(SYNC);
        }
        return d;
    }
    
    final  boolean uniApply(CompletableFuture a, Function f, UniApply c) { 
        Object r;
        Throwable x;
        //如果任务还没完成(result == null),直接返回false
        if (a == null || (r = a.result) == null || f == null) {
            return false;
        }
        tryComplete: if (result == null) {
            //判断result是否为异常类型
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    //如果result是异常类型,则使用completeThrowable()方法处理,并返回true
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            //如果result不为空,任务已经执行完成,并且没有出现异常
            try {
                if (c != null && !c.claim()) {
                    return false;
                }
                //把源任务的执行结果s作为参数传给回调任务f
                //直接执行回调任务的apply()方法,并将结果设置到CompletableFuture对象的成员变量result中
                @SuppressWarnings("unchecked") S s = (S) r;
                completeValue(f.apply(s)); 
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }
    
    //Pushes the given completion (if it exists) unless done.
    final void push(UniCompletion c) {
        if (c != null) {
            while (result == null && !tryPushStack(c)) {
                lazySetNext(c, null); // clear on failure
            }
        }
    }
    
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);
    }
    
    static void lazySetNext(Completion c, Completion next) {
        UNSAFE.putOrderedObject(c, NEXT, next);
    }
    
    ...
    
    static final class UniApply extends UniCompletion {
        Function fn;
        UniApply(Executor executor, CompletableFuture dep, CompletableFuture src, Function fn) {
            super(executor, dep, src);
            this.fn = fn;
        }
        //尝试执行当前CompletableFuture中的Completion
        final CompletableFuture tryFire(int mode) {
            CompletableFuture d;
            CompletableFuture a;
            //执行CompletableFuture的uniApply()方法尝试执行回调任务
            if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) {
                return null;
            }
            dep = null;
            src = null;
            fn = null;
            //执行CompletableFuture的postFire()方法
            return d.postFire(a, mode);
        }
    }
    
    final CompletableFuture postFire(CompletableFuture a, int mode) {
        if (a != null && a.stack != null) {
            if (mode < 0 || a.result == null) {
                a.cleanStack();
            } else {
                a.postComplete();
            }
        }
        if (result != null && stack != null) {
            if (mode < 0) {
                return this;
            } else {
                postComplete();
            }
        }
        return null;
    }
    
    abstract static class UniCompletion extends Completion {
        Executor executor;//执行当前任务的线程池
        CompletableFuture dep;//构建当前任务的CompletableFuture对象
        CompletableFuture src;//指向源任务

        UniCompletion(Executor executor, CompletableFuture dep, CompletableFuture src) {
            this.executor = executor;
            this.dep = dep;
            this.src = src;
        }

        //判断是否使用单独的线程池来执行任务
        final boolean claim() {
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null) {
                    return true;
                }
                executor = null; // disable
                e.execute(this);
            }
            return false;
        }
        //判断任务是否存活
        final boolean isLive() {
            return dep != null;
        }
    }
}

(4)Completion任务的执行流程

一.CompletableFuture的postComplete()方法

CompletableFuture中的任务完成后即源任务完成后,会通过
CompletableFuture.postComplete()方法
来完成后置逻辑,也就是把当前CompletableFuture.stack中存储的Completion逐项出栈执行


postComplete()方法会触发stack中所有可执行的回调任务Completion,该方法会遍历整个stack,并通过Completion任务的tryFire()方法来尝试执行。

public class CompletableFuture implements Future, CompletionStage {
    volatile Object result;
    volatile Completion stack;
    
    public static  CompletableFuture supplyAsync(Supplier supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    static  CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        //使用线程池来执行一个由AsyncSupply()方法构建的任务
        e.execute(new AsyncSupply(d, f));
        //返回一个新的CompletableFuture对象
        return d;
    }
    
    static final class AsyncSupply extends ForkJoinTask implements Runnable, AsynchronousCompletionTask {
        CompletableFuture dep;
        Supplier fn;
        AsyncSupply(CompletableFuture dep, Supplier fn) {
            this.dep = dep;
            this.fn = fn;
        }
        ...
        public void run() {
            CompletableFuture d;
            Supplier f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        //首先使用f.get()来获得Supplier这个函数式接口中的执行结果
                        //然后通过执行CompletableFuture的completeValue()方法,
                        //把执行结果设置到CompletableFuture的成员变量result中;
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务
                d.postComplete();
            }
        }
    }
    
    final boolean completeValue(T t) {
        return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
    }
    
    final void postComplete() {
        CompletableFuture f = this;
        Completion h;
        //如果stack不为空,则不断循环从stack中出栈Completion任务
        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture d;
            Completion t;
            //通过CAS逐个取出stack中的Completion任务并重置stack
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    //表示h.tryFire()返回了另外一个CompleableFuture对象
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                //执行指定Completion的tryFire()方法,比如UniApply.tryFire()方法
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
    ...
}

二.Completion任务的执行流程图

(5)Completion任务的执行结果获取

可以通过get()或join()方法获取CompletableFuture的执行结果。当任务还没执行结束时(r == null),则调用waitingGet()方法进行阻塞等待。主要会先自旋256次判断执行是否结束,如果不是才挂起线程进行阻塞,从而避免直接挂起线程带来的性能开销。

public class CompletableFuture implements Future, CompletionStage {
    ...
    //Waits if necessary for this future to complete, and then returns its result.
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
    
    //Returns raw result after waiting, or null if interruptible and interrupted.
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0 spins='(Runtime.getRuntime().availableProcessors()'> 1) ? 1 << 8 : 0 else if spins> 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0) {
                    --spins;
                }
            } else if (q == null) {
                q = new Signaller(interruptible, 0L, 0L);
            } else if (!queued) {
                queued = tryPushStack(q);
            } else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            } else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible) {
                    r = null; // report interruption
                } else {
                    Thread.currentThread().interrupt();
                }
            }
        }
        postComplete();
        return r;
    }
    ...
}

(6)总结

CompletableFuture的核心在于CompletionStage,CompletionStage提供了最基础的异步回调机制。也就是主线程不需要通过阻塞方式等待异步任务的执行结果,而是当异步任务执行完成主动通知触发执行下一个任务。此外,CompletionStage全部采用了函数式接口的方式来实现,可以通过链式的方式来对多个CompletionStage进行组合。

相关推荐

用闲置电脑当软路由安装OpenWRT(小白教程)

话说软路由系统OpenWRT用起来真是香,里面的好多功能都是普通路由无法实现的,由于众所周知的原因,在这里就不细说,等安装完自己体验吧。今天就介绍用一台闲置的电脑(自带两个网口)充当软路由,安装Ope...

一招把废旧路由器改成交换机(用旧路由器做交换机)

家里面的路由器用个几年,就会WIFI变卡,新路由器买回来,旧路由器就没什么用了?我在这里教大家把老路由器变成交换机。近两年新出的路由器,基本都是2个LAN口,接网络设备还需要买交换机,淘汰下来的路由器...

如何将PC电脑变成web服务器:将内网主机映射到外网实现远程访问

我是艾西,今天跟大家分享内容还是比较多人问的一个问题:如何将PC电脑变成web服务器。内网主机作为web服务器,内容包括本地内网映射、多层内网映射解决方案、绕过电信80端口封锁、DDNS功能的实现(非...

电脑怎么改Wi-Fi密码(电脑怎么改wifi密码视频教程)

一.电脑打开“任意浏览器ie/google浏览器等”——>地址栏里输入管理ip地址然后按“回车键”打开该地址,如下图所示。二.输入正确的管理员密码——>点击“登录”即可(下图是PC版本的路...

旧路由器不要扔,可当电脑无线网卡使用,你还不知道吧!

家里有旧路由器,卖二手又不值钱,扔了又可惜。想不到路由器还有以下这些功能:扩大Wifi覆盖范围;充当电脑无线网卡;把这个技巧学起来,提升网络冲浪的幸福感!导航栏路由器恢复出厂设置(通用教程)有线桥接无...

硬件大师AIDA64 5.60.3716更新下载:“认准”Win10

著名硬件测试工具AIDA64更新至5.60.3716Beta版,本次更新修复了Win10Build版本号检测错误问题,识别更准确。另外还添加了对ITEIT8738F传感器、ASRock主板、NVI...

互联网病毒木马与盗版软件流量产业链(一)

A.相关地下产业链整体深度分析可能很多用户都有这样的经历,就是不管打开什么网站,甚至根本就没有打开浏览器,都会跳出来一堆的弹窗广告。那么,这个用户要么是中的病毒木马,或者是使用了盗版软件。不管是...

穿越火线tenparty.dat文件损坏怎么办?

很多玩家在玩火线的时候经常会因弹出错误代码,而被退出游戏。下面就教大家一些常见错误代码的解决方案。方法/步骤1SX提示码提示说明:您的电脑出现1,xxx,0(xxx代表任意数字)提示码,存在游...

办公小技巧015:如何关闭Windows Defender安全中心

WindowsDefenderWindowsDefender是Widows中自带杀毒软件,可以检测及清除潜藏在操作系统里的间谍软件及广告软件。为电脑提供最高强度的安全防护,也被誉为Windows的...

Win7/8.1/10团灭:微软发现严重漏洞

据外媒报道称,微软已经停止为Windows7发布新的安全更新了,理由是IE存在严重漏洞。存在严重漏洞的IE按照微软的说法,这个远程代码执行漏洞存在于IE浏览器处理脚本引擎对象的内存中。该漏洞可能以一...

WinCC flexible 2008 SP4 的安装步骤及系统要求

1、软件安装过程安装注意事项(必须严格遵守):软件仅支持以下操作系统(必须是微软原版的操作系统,Ghost版系统不支持,如番茄花园、雨林木风、电脑城装机版等):WinCCflexible2008...

Windows三方杀毒防护软件可能问题以及使用建议

在处理ECSWindows相关案例中,我们遇到很多奇怪的操作系统问题,例如软件安装失败,无法激活操作系统,无法访问本地磁盘,网络访问受到影响,系统蓝屏,系统Hang等,排查发现这与客户安装的各类杀...

杀毒软件被指泄露个人隐私(杀毒软件查出来一定是毒吗)

最近的多篇报道显示,你使用的杀毒软件在监视着你,而不仅仅是你计算机上的文件。2014年的一项研究使用虚拟机监视了杀毒软件产品向企业发送了什么信息。他们发现,所有测试的杀毒软件都给电脑分配了一个唯一的识...

开源杀毒软件ClamAV在推出约20年后终于到达1.0版本

ClamAV是一个开源的反病毒引擎,用于检测木马、病毒、恶意软件和其他恶意威胁。与商业Windows反恶意软件程序相比,它的检测水平相当低,但开发工作已经持续了几十年。该工具可用于所有平台,尽管它主要...

【Excel函数使用】时分秒时间怎么转换成秒?(二)

本节主要分享的函数是IFERROR和NUMBERVALUE上回我们用MID和FIND函数已经将数值提取出来,但是一些错误的返回值显示“#VALUE!”,此时我们需要检验错误返回值,并将错误值返回指定值...

取消回复欢迎 发表评论: