当前位置:科学 > 正文

天天快报!CompletableFuture实现异步编排全面分析和总结

2023-03-13 15:50:13  来源:搬山道猿

一、CompletableFuture简介

✔本文的名词缩写:

CF:代表CompletableFutureCS:代表CompletionStage

二、CompletableFuture 核心接口API介绍

2.1 Future

使用Future局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;没有异常处理:Future接口中没有关于异常处理的方法;

方法


(资料图片仅供参考)

说明

描述

boolean

cancel (boolean mayInterruptIfRunning)

尝试取消执行此任务。

V

get()

如果需要等待计算完成,然后检索其结果。

V

get(long timeout, TimeUnit unit)

如果需要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。

boolean

isCancelled()

如果此任务在正常完成之前取消,则返回 true 。

boolean

isDone()

如果此任务完成,则返回 true 。

2.2 CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

JDK1.8 才新加入的一个实现类CompletableFuture,而CompletableFuture实现了两个接口(如上面代码所示):Future<T>、CompletionStage<T>,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。

Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤Stage,这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

2.3 CompletionStage

CompletionStage<T>接口提供了更多方法来更好的实现异步编排,并且大量的使用了JDK8引入的函数式编程概念。由stage执行的计算可以表示为Function,Consumer或Runnable(使用名称分别包括apply 、accept或run的方法 ),具体取决于它是否需要参数和/或产生结果。 例如:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()); 

三、使用CompletableFuture场景

3.1 应用场景

1️⃣执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度;
2️⃣使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常;
3️⃣如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个。

举个常见的案例,在APP查询首页信息的时候,一般会涉及到不同的RPC远程调用来获取很多用户相关信息数据,比如:商品banner轮播图信息、用户message消息信息、用户权益信息、用户优惠券信息等,假设每个rpc invoke()耗时是250ms,那么基于同步的方式获取到话,算下来接口的RT至少大于1s,这响应时长对于首页来说是万万不能接受的,因此,我们这种场景就可以通过多线程异步的方式去优化。

3.2 CompletableFuture依赖链分析

根据CompletableFuture依赖数量,可以分为以下几类:零依赖、单依赖、双重依赖和多重依赖

零依赖

下图Future1、Future2都是零依赖的体现:

单依赖:仅依赖于一个CompletableFuture

下图Future3、Future5都是单依赖的体现,分别依赖于Future1和Future2:

双重依赖:同时依赖于两个CompletableFuture

下图Future4即为双重依赖的体现,同时依赖于Future1和Future2:

多重依赖:同时依赖于多个CompletableFuture

下图Future6即为多重依赖的体现,同时依赖于Future3、Future4和Future5:

类似这种多重依赖的流程来说,结果依赖于三个步骤:Future3、Future4、Future5,这种多元依赖可以通过allOf()或anyOf()方法来实现,区别是当需要多个依赖全部完成时使用allOf(),当多个依赖中的任意一个完成即可时使用anyOf(),如下代码所示:

CompletableFuture<Void> Future6 = CompletableFuture.allOf(Future3, Future4, Future5);CompletableFuture<String> result = Future6.thenApply(v -> {    //这里的join并不会阻塞,因为传给thenApply的函数是在Future3、Future4、Future5全部完成时,才会执行 。    result3 = Future3.join();    result4 = Future4.join();    result5 = Future5.join();        // 返回result3、result4、result5组装后结果    return assamble(result3, result4, result5);});

四、CompletableFuture异步编排

在分析CompletableFuture异步编排之前,我跟大家理清一下CompletionStage接口下 (thenRun、thenApply、thenAccept、thenCombine、thenCompose)、(handle、whenComplete、exceptionally)相关方法的实际用法和它们之间的区别是什么? 带着你的想法往下看吧!!!

4.1 《异步编排API》

thenRun:【执行】直接开启一个异步线程执行任务,不接收任何参数,回调方法没有返回值;thenApply:【提供】可以提供返回值,接收上一个任务的执行结果,作为下一个Future的入参,回调方法是有返回值的;thenAccept:【接收】可以接收上一个任务的执行结果,作为下一个Future的入参,回调方法是没有返回值的;thenCombine:【结合】可以结合不同的Future的返回值,做为下一个Future的入参,回调方法是有返回值的;thenCompose:【组成】将上一个Future实例结果传递给下一个实例中。

✔异步回调建议使用自定义线程池

/** * 线程池配置 * * @author: austin * @since: 2023/3/12 1:32 */@Configurationpublic class ThreadPoolConfig {    /**     * @Bean中声明的value不能跟定义的实例同名     *     */    @Bean(value = "customAsyncTaskExecutor")    public ThreadPoolTaskExecutor asyncThreadPoolExecutor() {        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();        threadPoolTaskExecutor.setCorePoolSize(5);        threadPoolTaskExecutor.setMaxPoolSize(10);        threadPoolTaskExecutor.setKeepAliveSeconds(60);        threadPoolTaskExecutor.setQueueCapacity(2048);        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);        threadPoolTaskExecutor.setThreadNamePrefix("customAsyncTaskExecutor-");        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        return threadPoolTaskExecutor;    }    @Bean(value = "threadPoolExecutor")    public ThreadPoolExecutor threadPoolExecutor() {        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,                new ArrayBlockingQueue<>(10000), new ThreadPoolExecutor.CallerRunsPolicy());        return threadPoolExecutor;    }}

如果所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。通过自定义线程池customAsyncTaskExecutor,后面不同的异步编排方法,我们可以通过指定对应的线程池。

1️⃣ runAsync()、thenRun()

@RestControllerpublic class CompletableFutureCompose {    @Resource    private ThreadPoolTaskExecutor customAsyncTaskExecutor;    @RequestMapping(value = "/thenRun")    public void thenRun() {        CompletableFuture.runAsync(() -> {            System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");        }, customAsyncTaskExecutor).thenRun(() -> {            System.out.println("thread name:" + Thread.currentThread().getName() + " second step...");        }).thenRunAsync(() -> {            System.out.println("thread name:" + Thread.currentThread().getName() + " third step...");        });    }}

接口输出结果:

thread name:customAsyncTaskExecutor-1 first step...thread name:customAsyncTaskExecutor-1 second step...thread name:ForkJoinPool.commonPool-worker-3 third step...

2️⃣ thenApply()

@RequestMapping(value = "/thenApply")public void thenApply() {    CompletableFuture.supplyAsync(() -> {        System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");        return "hello";    }, customAsyncTaskExecutor).thenApply((result1) -> {        String targetResult = result1 + " austin";        System.out.println("first step result: " + result1);        System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);        return targetResult;    });}

接口输出结果:

thread name:customAsyncTaskExecutor-2 first step...first step result: hello// thenApply虽然没有指定线程池,但是默认是复用它上一个任务的线程池的thread name:customAsyncTaskExecutor-2 second step..., targetResult: hello austin

3️⃣ thenAccept()

@RequestMapping(value = "/thenAccept")public void thenAccept() {    CompletableFuture.supplyAsync(() -> {        System.out.println("thread name:" + Thread.currentThread().getName() + " first step...");        return "hello";    }, customAsyncTaskExecutor).thenAccept((result1) -> {        String targetResult = result1 + " austin";        System.out.println("first step result: " + result1);        System.out.println("thread name:" + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult);    });}

接口输出结果:

thread name:customAsyncTaskExecutor-3 first step...first step result: hello// thenAccept在没有指定线程池的情况下,并未复用它上一个任务的线程池thread name:http-nio-10032-exec-9 second step..., targetResult: hello austin

thenAccept()和thenApply()的用法实际上基本上一致,区别在于thenAccept()回调方法是没有返回值的,而thenApply()回调的带返回值的。

细心的朋友可能会发现,上面thenApply()和thenAccept()请求线程池在不指定的情况下,两者的不同表现,thenApply()在不指定线程池的情况下,会沿用上一个Future指定的线程池customAsyncTaskExecutor,而thenAccept()在不指定线程池的情况,并没有复用上一个Future设置的线程池,而是重新创建了新的线程来实现异步调用。

4️⃣ thenCombine()

@RequestMapping(value = "/thenCombine")public void thenCombine() {    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {        System.out.println("执行future1开始...");        return "Hello";    }, asyncThreadPoolExecutor);    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {        System.out.println("执行future2开始...");        return "World";    }, asyncThreadPoolExecutor);    future1.thenCombine(future2, (result1, result2) -> {        String result = result1 + " " + result2;        System.out.println("获取到future1、future2聚合结果:" + result);        return result;    }).thenAccept(result -> System.out.println(result));}

接口访问,打印结果:

thread name:customAsyncTaskExecutor-4 执行future1开始...thread name:customAsyncTaskExecutor-5 执行future2开始...thread name:http-nio-10032-exec-8 获取到future1、future2聚合结果:Hello WorldHello World复制代码

5️⃣ thenCompose()

我们先有future1,然后和future2组成一个链:future1 -> future2,然后又组合了future3,形成链:future1 -> future2 -> future3。这里有个隐藏的点:future1、future2、future3它们完全没有数据依赖关系,我们只不过是聚合了它们的结果。

@RequestMapping(value = "/thenCompose")public void thenCompose() {    CompletableFuture.supplyAsync(() -> {        // 第一个Future实例结果        System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future1开始...");        return "Hello";    }, customAsyncTaskExecutor).thenCompose(result1 -> CompletableFuture.supplyAsync(() -> {        // 将上一个Future实例结果传到这里        System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future2开始..., 第一个实例结果:" + result1);        return result1 + " World";    })).thenCompose(result12 -> CompletableFuture.supplyAsync(() -> {        // 将第一个和第二个实例结果传到这里        System.out.println("thread name:" + Thread.currentThread().getName() + " 执行future3开始..., 第一第二个实现聚合结果:" + result12);        String targetResult = result12 + ", I am austin!";        System.out.println("最终输出结果:" + targetResult);        return targetResult;    }));}

接口访问,打印结果:

thread name:customAsyncTaskExecutor-1 执行future1开始...thread name:ForkJoinPool.commonPool-worker-3 执行future2开始..., 第一个实例结果:Hellothread name:ForkJoinPool.commonPool-worker-3 执行future3开始..., 第一第二个实现聚合结果:Hello World最终输出结果:Hello World, I am austin!

4.2 《CompletableFuture实例化创建》

// 返回一个新的CompletableFuture,由线程池ForkJoinPool.commonPool()中运行的任务异步完成,不会返回结果。public static CompletableFuture<Void> runAsync(Runnable runnable);// 返回一个新的CompletableFuture,运行任务时可以指定自定义线程池来实现异步,不会返回结果。public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);// 返回由线程池ForkJoinPool.commonPool()中运行的任务异步完成的新CompletableFuture,可以返回异步线程执行之后的结果。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

CompletableFuture有两种方式实现异步,一种是supply开头的方法,一种是run开头的方法:

supply开头:该方法可以返回异步线程执行之后的结果;run开头:该方法不会返回结果,就只是执行线程任务。

4.3 《获取CompletableFuture结果》

public  T   get()public  T   get(long timeout, TimeUnit unit)public  T   getNow(T valueIfAbsent)public  T   join()public CompletableFuture<Object> allOf()public CompletableFuture<Object> anyOf()

使用方式,演示:

CompletableFuture<Integer> future = new CompletableFuture<>();Integer integer = future.get();
get()阻塞式获取执行结果,如果任务还没有完成则会阻塞等待知直到任务执行完成get(long timeout, TimeUnit unit):带超时的阻塞式获取执行结果getNow():如果已完成,立刻返回执行结果,否则返回给定的valueIfAbsentjoin():该方法和get()方法作用一样, 不抛异常的阻塞式获取异步执行结果allOf():当给定的所有CompletableFuture都完成时,返回一个新的CompletableFutureanyOf():当给定的其中一个CompletableFuture完成时,返回一个新的CompletableFuture

4.4 《结果处理》

当使用CompletableFuture异步调用计算结果完成、或者是抛出异常的时候,我们可以执行特定的Action做进一步处理,比如:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

4.5 《异常处理》

使用CompletableFuture编写代码时,异常处理很重要,CompletableFuture提供了三种方法来处理它们:handle()、whenComplete() 和 exceptionly()

handle:返回一个新的CompletionStage,当该阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行,不会将内部异常抛出whenComplete:返回与此阶段具有相同结果或异常的新CompletionStage,该阶段在此阶段完成时执行给定操作。与方法handle不同,会将内部异常往外抛出exceptionally:返回一个新的CompletableFuture,CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try/catch。
@Autowiredprivate RemoteDictService remoteDictService;public CompletableFuture<Dict> getDictDataAsync(long dictId) {    CompletableFuture<DictResult> resultFuture = remoteDictService.findDictById(dictId);    // 业务方法,内部会发起异步rpc调用    return resultFuture            .exceptionally(error -> {                //通过exceptionally捕获异常,打印日志并返回默认值                log.error("RemoteDictService.getDictDataAsync Exception dictId = {}", dictId, error);                return null;            });}

handle() VS whenComplete(), 两者之间的区别

核心区别在于whenComplete不消费异常,而handle消费异常

翻译过来就是:

两种方法形式支持处理触发阶段是否 正常完成 或 异常完成:

whenComplete:可以访问当前CompletableFuture的 结果异常作为参数,使用它们并执行您想要的操作。此方法并不能转换完成的结果,会内部抛出异常handle:当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数来执行。当此阶段完成时,以 该阶段的结果该阶段的异常作为参数调用给定函数,并且函数的结果用于完成返回的阶段,不会把异常外抛出来

这里我通过代码演示一下:

public class CompletableFutureExceptionHandler {    public static CompletableFuture handle(int a, int b) {        return CompletableFuture.supplyAsync(() -> a / b)                .handle((result, ex) -> {                    if (null != ex) {                        System.out.println("handle error: " + ex.getMessage());                        return 0;                    } else {                        return result;                    }                });    }    public static CompletableFuture whenComplete(int a, int b) {        return CompletableFuture.supplyAsync(() -> a / b)                .whenComplete((result, ex) -> {                    if (null != ex) {                        System.out.println("whenComplete error: " + ex.getMessage());                    }                });    }    public static void main(String[] args) {        try {            System.out.println("success: " + handle(10, 5).get());            System.out.println("fail: " + handle(10, 0).get());        } catch (Exception e) {            System.out.println("catch exception= " + e.getMessage());        }        System.out.println("------------------------------------------------------------------");        try {            System.out.println("success: " + whenComplete(10, 5).get());            System.out.println("fail: " + whenComplete(10, 0).get());        } catch (Exception e) {            System.out.println("catch exception=" + e.getMessage());        }    }}

运行结果如下显示

success: 2handle error: java.lang.ArithmeticException: / by zerofail: 0------------------------------------------------------------------success: 2whenComplete error: java.lang.ArithmeticException: / by zerocatch exception=java.lang.ArithmeticException: / by zero

✔可以看到,handle处理,当程序发生异常的时候,即便是catch获取异常期望输出,但是并未跟实际预想那样,原因是handle不会把内部异常外抛出来,而whenComplete会将内部异常抛出。

五、CompletableFuture线程池须知

六、基于CompletableFuture实现接口异步revoke

案例实现Controller层

@RestController@RequestMapping("/index")public class IndexWebController {    @Resource    private ThreadPoolExecutor asyncThreadPoolExecutor;    @RequestMapping(value = "/homeIndex", method = {RequestMethod.POST, RequestMethod.GET})    public String homeIndex(@RequestParam(required = false) String userId, @RequestParam(value = "lang") String lang) {        ResultData<HomeVO> result = new ResultData<>();        // 获取Banner轮播图信息        CompletableFuture<List<BannerVO>> future1 = CompletableFuture.supplyAsync(() -> this.buildBanners(userId, lang), asyncThreadPoolExecutor);        // 获取用户message通知信息        CompletableFuture<NotificationVO> future2 = CompletableFuture.supplyAsync(() -> this.buildNotifications(userId, lang), asyncThreadPoolExecutor);        // 获取用户权益信息        CompletableFuture<List<BenefitVO>> future3 = CompletableFuture.supplyAsync(() -> this.buildBenefits(userId, lang), asyncThreadPoolExecutor);         // 获取优惠券信息        CompletableFuture<List<CouponVO>> future4 = CompletableFuture.supplyAsync(() -> this.buildCoupons(userId), asyncThreadPoolExecutor);                CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futrue1, futrue2, futrue3, future4);        HomeVo finalHomeVO = homeVO;        CompletableFuture<HomeVO> resultFuture = allOfFuture.thenApply(v -> {            try {                finalHomeVo.setBanners(future1.get());                finalHomeVo.setNotifications(future2.get());                finalHomeVo.setBenefits(future3.get());                finalHomeVo.setCoupons(future4.get());                return finalHomeVO;            } catch (Exception e) {                logger.error("[Error] assemble homeVO data error: {}", e);                throw new RuntimeException(e);            }        });        homeVO = resultFuture.join();        result.setData(homeVO);        return writeJson(result);    }}

Service层

@SneakyThrowspublic List<BannerVO> buildBanners(String userId, String lang) {    // 模拟请求耗时0.5秒    Thread.sleep(500);    return new List<BannerVO>();}@SneakyThrowspublic List<NotificationVO> buildNotifications(String userId, String lang) {    // 模拟请求耗时0.5秒    Thread.sleep(500);    return new List<NotificationVO>();}@SneakyThrowspublic List<BenefitVO> buildBenefits(String userId, String lang) {    // 模拟请求耗时0.5秒    Thread.sleep(500);    return new List<BenefitVO>();}@SneakyThrowspublic List<CouponVO> buildCoupons(String userId) {    // 模拟请求耗时0.5秒    Thread.sleep(500);    return new List<CouponVO>();}

六、异步化带来的性能提升

通过异步化改造,原本同步获取数据的API性能得到明显提升,大大减少了接口的响应时长(RT)。接口的吞吐量大幅度提升。

七、总结

关键词:

推荐阅读

北京上空现三个太阳 古代幻日现象预兆什么?

北京上空现三个太阳北京上空现三个太阳 专家释疑今日登上热搜,主要是在12月29日有网友拍到北京上空出现了三个太阳。对于这一现象气象专家 【详细】

十大名车车标 世界十大名车车标简介

十大名车车标 世界十大名车车标简介很多爱车人士对于车标是十分熟悉的,基本可以做到看一眼就知道是哪个品牌的车,世界名车更是如此,许多 【详细】

塑料袋属于什么 四种垃圾分类简介

塑料袋属于什么塑料袋是干垃圾。湿垃圾是指易腐烂的垃圾,通常是厨房垃圾。塑料袋不容易腐烂降解,是干垃圾。就是我们常说的白色污染,所以 【详细】

特斯拉的最低价是多少? 其他车型的最低价格是多少?

特斯拉作为一个豪华电动车品牌,你知道特斯拉价格多少钱一辆吗?目前特斯拉销售的主要Model S、Model X以及国产Model 3,那么,特拉斯最 【详细】

通用设备介绍 通用设备包括什么?

通用设备介绍一、通用设备。办公和商务通用设备,包括文化办公机械、消防设备、电机、变压器、锅炉、空调设备、清洁卫生设备、通讯设备、视 【详细】

关于我们  |  联系方式  |  免责条款  |  招聘信息  |  广告服务  |  帮助中心

联系我们:85 572 98@qq.com备案号:粤ICP备18023326号-40

科技资讯网 版权所有