我们先给出同步和异步的概念
查询商品详情页的逻辑非常复杂,此时数据的获取涉及到多个RPC远程调用,那么必然需要花费更多的时间。假如商品详情页的查询。
Java在JDK1.5之后引入了JUC包,里面包含了一个接口:Future,这算是Java中实现异步编程的开山鼻祖, 然而,Future
的局限性在于它的功能相对简单,无法很好地处理复杂的异步任务链。Future 的局限性在这里给出,有兴趣的朋友可以具体去了解,这里不重点说Future。
CompletableFuture
随着 Java 版本的演进,**CompletableFuture**
** **在 JDK 8 中引入,提供了更强大和灵活的异步编程支持。它不仅可以用来表示异步计算的结果,还提供了许多方便的方法来处理异步任务的执行和结果处理。以下是 CompletableFuture
的一些主要用途:
CompletableFuture
可以表示一个异步计算的结果,允许在计算完成后继续执行其他操作。例如,CompletableFuture.supplyAsync
可以异步执行一个任务并返回结果。Future
需要使用 get
方法阻塞等待结果不同,CompletableFuture
提供了非阻塞的方法,例如 thenApply
、thenAccept
和 thenRun
,允许在计算完成后执行回调函数。CompletableFuture
提供了方法来组合多个异步任务,例如 thenCombine
、thenCompose
和 allOf
等,允许对多个异步任务进行组合操作。CompletableFuture
提供了方法来处理异步计算中的异常,例如 exceptionally
和 handle
。CompletableFuture
允许构建复杂的异步流,简化异步编程模型,提高代码的可读性和可维护性。CompletableFuture学习
下面我们在具体学习一下CompletableFuture的使用
:runAsync
方法用于启动一个没有返回值的异步任务。该方法通常用于那些不需要返回结果的任务,例如记录日志、发送通知等。supplyAsync
方法用于启动一个有返回值的异步任务。该方法通常用于需要返回结果的任务,例如计算结果、获取数据等。java 代码解读复制代码public class Demo01 {
public static void main(String[] args) throws Exception {
supplyAsync();
System.out.println("main来了");
SleepUtils.sleep(3);
}
//发起一个异步请求
public static void runAsync() {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"你好runAsync");
SleepUtils.sleep(10);
}
});
}
//发起一个异步请求
public static void supplyAsync() throws Exception {
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
SleepUtils.sleep(2);
return "java0518";
}
});
//这是一个阻塞方法
System.out.println(Thread.currentThread().getName()+"--->:"+supplyFuture.get());
}
}
CompletableFuture
的 get()
方法是一个阻塞方法,因为它会等待异步任务的完成并返回结果。如果异步任务尚未完成,调用 get()
的线程会被阻塞,直到任务完成或抛出异常。
whenComplete
then
方法。Void
)和异常(如果没有异常则为 null
)。exceptionally
catch
方法。 代码解读复制代码public class Demo02 {
public static void main(String[] args) throws Exception {
runAsync();
System.out.println("main来了");
SleepUtils.sleep(3);
}
//发起一个异步请求
public static void runAsync() {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"你好runAsync");
//int a =10/0;
}
}).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void acceptVal, Throwable throwable) {
System.out.println("执行异步之后whenComplete接受值"+acceptVal);
System.out.println("执行异步之后whenComplete异常值"+throwable);
}
}).exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable throwable) {
System.out.println("执行异步之后exceptionally异常值"+throwable);
return null;
}
});
}
}
whenComplete(同步)
:
whenCompleteAsync(异步)
:
代码解读复制代码public class Demo03 {
public static void main(String[] args) throws Exception {
//runAsync1();
runAsync2();
System.out.println("main来了");
SleepUtils.sleep(3);
}
//发起一个异步请求
public static void runAsync1() {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"你好runAsync");
}
}).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void acceptVal, Throwable throwable) {
System.out.println(Thread.currentThread().getName()+"异步之后接受值"+acceptVal);
}
});
}
//发起一个异步请求
public static void runAsync2() {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"你好runAsync");
}
}).whenCompleteAsync(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void acceptVal, Throwable throwable) {
System.out.println(Thread.currentThread().getName()+"执行异步之后async接受值"+acceptVal);
}
});
}
}
thenAccept
:用于处理异步任务的结果,但不返回新的值。适合用于打印日志、执行操作等场景。thenApply
:用于处理异步任务的结果,并返回新的值。适合用于转换数据、链式处理等场景。java 代码解读复制代码public class Demo04 {
public static void main(String[] args) throws Exception {
supplyAsync();
System.out.println("main来了");
SleepUtils.sleep(8);
}
//发起一个异步请求
public static void supplyAsync() throws Exception {
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
SleepUtils.sleep(2);
return "java0518";
}
});
supplyFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName()+"第一个accept接受到的值"+acceptVal);
}
});
supplyFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName()+"第二个accept接受到的值"+acceptVal);
}
});
}
}java
代码解读复制代码public class Demo05 {
public static void main(String[] args) throws Exception {
supplyAsync();
System.out.println("main来了");
SleepUtils.sleep(8);
}
//发起一个异步请求
public static void supplyAsync() throws Exception {
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
SleepUtils.sleep(2);
return "java0518";
}
});
CompletableFuture<String> thenApply1 = supplyFuture.thenApply(new Function<String, String>() {
@Override
public String apply(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
return "apply1" + acceptVal;
}
});
CompletableFuture<String> thenApply2 = supplyFuture.thenApply(new Function<String, String>() {
@Override
public String apply(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
return "apply2" + acceptVal;
}
});
System.out.println(thenApply1.get());
System.out.println(thenApply2.get());
}
}
thenApply
:
CompletableFuture
的同一个线程或完成该 CompletableFuture
的线程来执行处理函数。thenApplyAsync
:
使用 ForkJoinPool.commonPool() 或者自定义的线程池来异步执行处理函数。
总是异步地执行处理函数,不管前面的阶段是否已经完成。
适用于需要异步执行、处理时间较长或需要非阻塞执行的任务。
执行上下文:thenApply
在同一个线程或调用线程中同步执行,而 thenApplyAsync
总是异步地执行,使用公共线程池或自定义的线程池。
适用场景:thenApply
适用于对执行时间要求不严格的短任务,thenApplyAsync
适用于需要非阻塞异步执行的长任务或需要使用特定线程池的任务。java
代码解读复制代码public class Demo06 {
public static void main(String[] args) throws Exception {
supplyAsync();
System.out.println("main来了");
SleepUtils.sleep(8);
}
//发起一个异步请求
public static void supplyAsync() throws Exception {
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
SleepUtils.sleep(2);
return "java0518";
}
});
CompletableFuture<String> thenApply1 = supplyFuture.thenApplyAsync(new Function<String, String>() {
@Override
public String apply(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
return "apply1" + acceptVal;
}
});
CompletableFuture<String> thenApply2 = supplyFuture.thenApplyAsync(new Function<String, String>() {
@Override
public String apply(String acceptVal) {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
return "apply2" + acceptVal;
}
});
System.out.println(thenApply1.get());
System.out.println(thenApply2.get());
}
}
runAsync
。supplyAsync
。whenComplete
或 whenCompleteAsync
。exceptionally
。thenAccept
。thenApply
或 thenApplyAsync
。根据是否需要异步执行选择同步或异步版本。带Async代表异步,异步就是启动另外线程去执行。有xxpply的就代表有返回值java
代码解读复制代码public class Demo07 {
public static void main(String[] args) throws Exception {
supplyAsync();
System.out.println("main来了");
SleepUtils.sleep(8);
}
//发起一个异步请求
public static void supplyAsync() throws Exception {
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
SleepUtils.sleep(2);
return "java0518";
});
CompletableFuture<String> thenApply1 = supplyFuture.thenApplyAsync(acceptVal->{
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
return "apply1" + acceptVal;
});
CompletableFuture<String> thenApply2 = supplyFuture.thenApplyAsync(acceptVal -> {
SleepUtils.sleep(2);
System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
return "apply2" + acceptVal;
});
System.out.println(thenApply1.get());
System.out.println(thenApply2.get());
}
}
我们对开头的例子运用线程池+CompletableFuture进行模拟编程在这里假设
代码解读复制代码import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AlbumDetailsDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
// 开始时间
long startTime = System.currentTimeMillis();
// 1. 获取商品的基本信息
CompletableFuture<Void> basicInfoFuture = CompletableFuture.runAsync(() -> {
sleep(1.5);//模拟远程调用feign
}, executor);
// 2. 获取商品统计信息
CompletableFuture<String> statsInfoFuture = CompletableFuture.supplyAsync(() -> {
sleep(0.5);//模拟远程调用feign
return "商品统计信息";
}, executor);
// 3. 获取商品分类信息 依赖于2的查询结果
CompletableFuture<Void> categoryInfoFuture = statsInfoFuture.thenAcceptAsync(statsInfoFutureInfo -> {
System.out.println("拿到商品统计信息"+statsInfoFutureInfo);
sleep(1.0);//模拟远程调用feign
}, executor);
// 4. 获取商品基本信息 依赖于2的查询结果
CompletableFuture<Void> userInfoFuture = statsInfoFuture.thenAcceptAsync(statsInfoFutureInfo -> {
System.out.println("拿到商品统计信息"+statsInfoFutureInfo);
sleep(0.5);//模拟远程调用feign
}, executor);
// 等待所有任务完成
CompletableFuture.allOf(basicInfoFuture, statsInfoFuture, categoryInfoFuture, userInfoFuture).join();
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("总耗时: " + (endTime - startTime) / 1000.0 + "秒");
// 关闭线程池
executor.shutdown();
}
// 模拟任务执行时间
private static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
我们可以看到只需1.578秒就可以完成业务,比3.5秒的同步运行进行了大大优化。
在微服务架构中,一个请求可能需要调用多个微服务来获取数据并进行处理。例如,在电商平台中,获取商品详情页信息时需要调用库存服务、价格服务、评论服务等。
在后台管理系统中,可能需要批量处理大量数据,例如批量导入用户数据,进行数据清洗和校验,并将结果写入数据库。
在一个内容聚合平台中,需要从多个数据源获取数据并进行合并返回给用户。例如获取新闻、社交媒体帖子和博客文章。
在一些复杂的业务流程中,需要依次执行多个步骤,每个步骤可能是异步的。例如在订单处理流程中,需要检查库存、扣减库存、创建订单、处理支付等。