【JUC】异步编程

异步回调 CompletableFuture

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调的方式在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

  • Future 接口不是通过回调的方式,而是使用 get() 方法阻塞判断业务是否执行完毕,所以本质还是同步
  • CompletableFuture接口通过回调的方式,并不会阻塞等待业务执行完毕,而是在业务执行完毕后回调通知主线程运行结果,这才是真正的异步。

image-20210917164341898

CompletableFuture实现了 Future, CompletionStage接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture

  • runAsync():调用没有返回值方法,主线程调用 get()方法时会阻塞(这种方式和 Future相似,本质还是同步)
  • supplyAsync():调用有返回值方法(回调的方式得到运行结果,程序不会阻塞,真正的异步)

具体案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 异步调用和同步调用
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 本质上是同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
});
// 尽管会创建另一个线程执行任务,但是主线程仍然会阻塞在get()方法处直到任务完成
completableFuture1.get();

// 消息队列
// 异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture2");
// 模拟异常
int i = 10/0;
return 1024;
});

// 回调的方式异步执行
completableFuture2.whenComplete((t,u)->{
System.out.println("------t="+t);
System.out.println("------u="+u);
}).exceptionally(f ->
{ System.out.println("------exception="+f.getMessage);
return 4444;})
.get();
}
}

whenComplete()方法的源码为:

1
2
3
4
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

其中,t为返回结果,u为异常信息(没有异常时为null)

Future 与 CompletableFuture

对比这两种方法,一个为同步一个为异步。

Futrue在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future里面有 isDone() 方法来判断任务是否处理结束,还有 get() 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成

CompletableFuture相比, Future的缺点:

(1)不支持手动完成

我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成

(2)不支持进一步的非阻塞调用

通过 Futureget() 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future不支持回调函数,所以无法实现这个功能

(3)不支持链式调用

对于 Future的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline调用,这在 Future中是没法实现的。

(4)不支持多个 Future 合并

比如我们有 10 个 Future并行执行,我们想在所有的 Future运行完毕之后,执行某些函数,是没法通过 Future实现的。

(5)不支持异常处理

Future的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的

创建 CompletableFuture 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为线程池执行异步代码。

  • runAsync方法用于没有返回值的任务,它以 Runnable 函数式接口类型为参数,所以CompletableFuture的计算结果为空。
  • supplyAsync方法用于有返回值的任务,以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为 U

计算结果完成时的处理

CompletableFuture 的计算结果完成,或者抛出异常的时候,有如下四个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
  • exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同。

除了上述四个方法之外,一组handle方法也可用于处理计算结果。当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。因此这组方法兼有whenComplete和转换的两个功能。

1
2
3
4
5
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn);

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn);

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor);

进行转换

我们可以将操作串联起来,或者将CompletableFuture组合起来。关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。

1
2
3
4
5
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);

public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);

public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果作为新的CompletableFuture计算结果。因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>

需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。

消费

上面的方法是当计算完成的时候,会生成新的计算结果 (thenApply, handle),或者返回同样的计算结果whenComplete。我们可以在每个CompletableFuture 上注册一个操作,该操作会在 CompletableFuture 完成执行后调用它。

1
2
3
4
5
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

CompletableFuture 通过 thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值做参数,只对结果执行Action,而不返回新的计算值,因此计算值为空:

下面一组方法当计算完成的时候会执行一个Runnable,与thenAccept不同,Runnable并不使用CompletableFuture计算的结果。

1
2
3
4
5
public CompletableFuture<Void> thenRun(Runnable action);

public CompletableFuture<Void> thenRunAsync(Runnable action);

public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

组合

通常,我们会有多个需要独立运行但又有所依赖的的任务。比如先等用于的订单处理完毕然后才发送邮件通知客户。

thenCompose 方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。你可以创建两个CompletableFutures 对象,对第一个 CompletableFuture 对象调用thenCompose ,并向其传递一个函数。当第一个CompletableFuture 执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个 CompletableFuture 的返回做输入计算出的第二个 CompletableFuture 对象。

1
2
3
4
5
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor);

多任务组合

  • allOf:等待所有任务完成
  • anyOf:只要有一个任务完成就可以
1
2
3
CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);

CompletableFuture<Void> anyOf = CompletableFuture.anyOf(future01, future02, future03);

异步编排案例

本章介绍使用异步编排的具体案例,具体业务见文章 【Project】云商城

业务介绍

image-20201109080935340

需求分析:通过 skuId 查询出商品的相关信息,图片、标题、价格,属性对应版本等等。在点击商城项目中的详情页后,前端将发出请求查询指定 skuId 的各种商品信息,包括:

  1. 当前 SKU 基本信息
  2. 当前 SKU 的图片信息
  3. 当前 SKU 所属的 SPU 的所有销售属性组合,展示在界面上
  4. 当前 KUS 所属的 SPU 的介绍信息
  5. SPU 的规格参数(基本属性)信息

其中,查询 3/4/5 前需要先完成查询 1,因为三者都需要 SKU 的信息,同时三者之间是没有依赖关系的,完全可以并行查询节省时间。查询 2 则和其余的四条查询没有任何依赖关系,也可以并行查询。

首先是没有异步的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public SkuItemVo item(Long skuId) {
SkuItemVo skuItemVo = new SkuItemVo();

// 1. 当前sku的基本信息获取,从pms_sku_info表中查
SkuInfoEntity skuInfoEntity = this.getById(skuId);
skuItemVo.setInfo(skuInfoEntity);

// 准备几个关键的id在下面进行查询
Long catalogId = skuInfoEntity.getCatalogId();
Long spuId = skuInfoEntity.getSpuId();

// 2. 当前sku的图片信息,从pms_sku_images表中查
List<SkuImagesEntity> imagesEntities = imagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);

// 3. 获取当前sku所属的spu的所有销售属性组合,展示在界面上
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(spuId);
skuItemVo.setSaleAttr(saleAttrVos);

// 4. 获取当前sku所属的spu的介绍信息,从pms_spu_info_desc表中查
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId);
skuItemVo.setDesc(spuInfoDescEntity);

// 5. 获取spu的规格参数(基本属性)信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(spuId, catalogId);
skuItemVo.setGroupAttrs(attrGroupVos);

return skuItemVo;
}

该版本的所有任务都是串行的,整条查询的耗时为五个查询的耗时累计和。下面将对该版本进行优化,使用异步编排优化查询效率。

自定义线程池

首先自定义线程池,并注入到 Spring 容器中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 加上下面的注解就不需要再在ConfigProperties类上加@Component注解了(加了反而会导致报错:重复注入同一个组件)
* @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
* 如果不加,也可以在入参中自动注入
*/
@Configuration
public class MyThreadConfig {

/**
* 向容器中注入一个自定义线程池,并使用配置文件中配置的参数
* @param pool 自动注入 ThreadPoolConfigProperties,其绑定了配置文件中的相关参数
* @return
*/
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(pool.getMaxSize(), pool.getMaxSize(), pool.getKeepAliveTime(),
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}
}

配置文件绑定类:

1
2
3
4
5
6
7
8
9
// 绑定配置文件中的 yunmall.thread 前缀
@Data
@Component // 如果上面加了 @EnableConfigurationProperties(xxx),这里就不能再加 @Component,否则会重复注入该组件
@ConfigurationProperties(prefix = "yunmall.thread")
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}

配置文件:

1
2
3
4
5
6
# 自定义线程池配置
yunmall:
thread:
core-size: 50
max-size: 200
keep-alive-time: 10

开启配置文件自动提示自定义的前缀:

1
2
3
4
5
6
<!--  加了以后,配置文件中就会有自定义的提示了 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

异步编排版本

使用 CompletableFuture 进行异步编排,将查询 3/4/5 作为查询 1 的子查询任务,改造后的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();

// 任务1:获取当前sku的基本信息(从pms_sku_info表中查)
// 该任务的的执行结果需要传给后面的三个子任务,所以需要设置为supplyAsync模式,为下面的三个任务提供info信息
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
SkuInfoEntity skuInfoEntity = this.getById(skuId);
skuItemVo.setInfo(skuInfoEntity);
// 后面的三个子任务需要用到该信息,所以要返回出去
return skuInfoEntity;
}, executor);

// 下面的三个子任务必须在任务1执行完毕后执行,并且相互之间是并行执行的,
// 三者都需要任务1提供的参数,并且自身不向外提供参数,所以使用acceptAsync模式

// 1.1 获取当前sku所属的spu的所有销售属性组合,展示在界面上
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(info -> {
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(info.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);

// 1.2 获取当前sku所属的spu的介绍信息,从pms_spu_info_desc表中查
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(info -> {
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(info.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);

// 1.3 获取spu的规格参数(基本属性)信息
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(info -> {
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(info.getSpuId(), info.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);


// 任务2和上面的四个任务都没关系
// 2. 当前sku的图片信息,从pms_sku_images表中查
CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = imagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);

// 阻塞等待所有任务都执行完毕才能返回skuItemVo,所以要用allOf().get()
CompletableFuture.allOf(saleAttrFuture, descFuture, baseAttrFuture, imagesFuture).get();

return skuItemVo;
}