背景
? ? 随着订单量的持续上升,商家端提供了商家接单、配送等一系列核心功能,业务对系统吞吐量的要求也越来越高。商家端 API 服务是流量入口,所有商家端流量都会由其调度、聚合,对外面向商家提供功能接口,对内调度各个下游服务获取数据进行聚合,具有鲜明的 I/O 密集型(I/O Bound)特点。同步加载弊端逐渐显现,因此考虑改为异步加载。
为何需要并行加载
? ? ? 外卖商家端 API 服务是典型的 I/O 密集型(I/O Bound)服务。除此之外,该服务的交易业务还有两个比较大的特点:
1、服务端必须一次返回订单卡片所有内容。需要从下游三十多个服务中获取数据。在特定条件下,如第一次登录和长时间没登录的情况下,客户端会分页拉取多个订单,这样发起的远程调用会更多。
2、商家端和服务端交互频繁。商家对订单状态变化敏感,多种推拉机制保证每次变更能够触达商家,导致 App 和服务端的交互频繁,每次变更需要拉取订单最新的全部内容。
并行加载的实现方式
? ? 并行从下游获取数据,从 IO 模型上来讲分为同步模型和异步模型。
同步模型
? ? ? 从各个服务获取数据最常见的是同步调用。在同步调用的场景下,接口耗时长、性能差,接口响应时长 T > T1+T2+T3+……+Tn,这时为了缩短接口的响应时间,一般会使用线程池的方式并行获取数据,商家端订单卡片的组装正是使用了这种方式。
产生的问题
1、CPU 资源大量浪费在阻塞等待上,导致 CPU 资源利用率低。在 Java 8 之前,一般会通过回调的方式来减少阻塞,但是大量使用回调,导致代码可读性和可维护性大大降低。
2、为了增加并发度,会引入更多额外的线程池,随着 CPU 调度线程数的增加,会导致更严重的资源争用,宝贵的 CPU 资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。
3、同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。
NIO 异步模型
主要通过以下两种方式来减少线程池的调度开销和阻塞时间:
● 通过 RPC NIO 异步调用的方式可以降低线程数,从而降低调度(上下文切换)开销。
● 通过引入 CompletableFuture(下文简称 CF)对业务流程进行编排,降低依赖之间的阻塞。
CompletableFuture 使用与原理
? ? ? ? CompletableFuture 是由 Java 8 引入的,之前一般通过 Future 实现异步。
● Future 用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8 之前若要设置回调一般会使用 guava 的ListenableFuture,回调的引入又会导致臭名昭著的回调地狱(下面的例子会通过 ListenableFuture 的使用来具体进行展示)。
● CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
? ? ? ? 使用 CompletableFuture 也是构建依赖树的过程。一个 CompletableFuture 的完成会触发另外一系列依赖它的 CompletableFuture 的执行。根据 CompletableFuture 依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。
零依赖
//1、使用 runAsync 或 supplyAsync 发起异步调用
CompletableFuture<String>cf1=CompletableFuture.supplyAsync(()->{
return“result1”;
},executor);
一元依赖:依赖一个 CF
CompletableFuture<String>cf3=cf1.thenApply(result1->{
//result1 为 CF1 的结果
//......
return“result3”;
});
二元依赖:依赖两个 CF
CompletableFuture<String>cf4=cf1.thenCombine(cf2, (result1,result2)->{
//result1 和 result2 分别为 cf1 和 cf2 的结果
return“result4”;
});
多元依赖:依赖多个 CF
CompletableFuture<Void>cf6=CompletableFuture.allOf(cf3,cf4,cf5);
CompletableFuture<String>result=cf6.thenApply(v->{
// 这里的 join 并不会阻塞,因为传给 thenApply 的函数是在 CF3、CF4、CF5 全部完成时,才会执行 。
result3=cf3.join();
result4=cf4.join();
result5=cf5.join();
// 根据 result3、result4、result5 组装最终 result;
return“result”;
});
CompletableFuture 原理
? ? ? ? CompletableFuture 中包含两个字段:result 和 stack。result 用于存储当前 CF的结果,stack(Completion)表示当前 CF 完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的 CF 的计算,依赖动作可以有多个(表示有多个依赖它的 CF),以栈(Treiber stack)的形式存储,stack 表示栈顶元素。这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion 子类中。
CompletableFuture 的设计思想
? ? ? 按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply 为例,不再枚举全部回调类型。
线程阻塞问题
? ? ? CompletableFuture 实现了 CompletionStage 接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。
1、同步方法(即不带 Async 后缀的方法)有两种情况。
● 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
● 如果注册时被依赖的操作还未执行完,则由回调线程执行。
2、异步方法(即带 Async 后缀的方法):可以选择是否传递线程池参数 Executor 运行在指定线程池中;当不传递 Executor 时,会使用 ForkJoinPool 中的共用线程池CommonPool(CommonPool 的大小是 CPU 核数 -1,如果是 IO 密集的应用,线程数可能成为瓶颈)。
线程池须知
1、异步回调要传线程池
? ? ? ? 异步回调方法可以选择是否传递线程池参数 Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。当不传递线程池时,会使用 ForkJoinPool 中的公共线程池 CommonPool,这里所有调用将共用该线程池,核心线程数 = 处理器数量 -1(单核核心线程数为 1),所有异步回调都会共用该 CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。
? ? ? ? 手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。
2、线程池循环引用会导致死锁
? ? ? 若出现父子任务 ,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。
3、异步 RPC 调用注意不要阻塞 IO 线程池
? ? ? 服务异步化后很多步骤都会依赖于异步 RPC 调用的结果,这时需要特别注意一点,如果是使用基于 NIO(比如 Netty)的异步 RPC,则返回结果是由 IO 线程负责设置的,即回调方法由 IO 线程触发,CompletableFuture 同步回调(如 thenApply、thenAccept 等无 Async 后缀的方法)。
? ? ? ? 如果依赖的同步 RPC 调用的返回结果,那么这些同步回调将运行在 IO 线程上,而整个服务只有一个 IO 线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO 线程将一直被占用,影响整个服务的响应。
异常处理
? ? ? 由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过 try\catch 捕获异常。CompletableFuture提供了异常捕获回调 exceptionally,相当于同步调用中的 try\catch。
其他
若涉及相关案例代码,请从以下链接查阅https://blog.csdn.net/qq_36010886/article/details/130403824