知识 - CompletableFuture
# Future VS CompletableFuture
# 工具类
为了便于后续更好地调试和学习,我们需要定义一个工具类辅助我们对知识的理解。
public class CommonUtils {
/**
* 根据指定路径读取文件内容
*/
public static String readFile(String pathToFile) {
try {
return new String(Files.readAllBytes(Paths.get(pathToFile)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 睡眠都是毫秒
*/
public static void sleepMillis(long millis) {
try {
TimeUnit.MICROSECONDS.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 睡眠都是秒
*/
public static void sleepSeconds(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static String getCurrentTime() {
LocalTime now = LocalTime.now();
return now.format(DateTimeFormatter.ofPattern("[HH:mm:ss.SSS]"));
}
/**
* 打印日志信息
*/
public static void printThreadLog(String message) {
// 时间戳 | 线程 ID、 线程名 | 日志信息
StringJoiner result = new StringJoiner(" | ")
.add(getCurrentTime())
.add(String.format("%2d", Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(message);
System.out.println(result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Future 局限性
需求: 替换新闻稿 news.txt
中敏感词汇,把敏感词汇替换成 *
,敏感词存储在 filter-words.txt
中。
我们在和 pom.xml
同级的目录下创建这两个文件。
news.txt
文件:
oh my god! CompletableFuture 真 TM 好用!
filter-words.txt
文件:
TM
然后创建一个 Demo,测试 Future 的使用:
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16), new ThreadPoolExecutor.AbortPolicy());
// 1:读取敏感词汇
Future<String[]> filterWordFuture = threadPoolExecutor.submit(() -> {
String content = CommonUtils.readFile("filter-words.txt");
return content.split(",");
});
// 2:读取新闻稿
Future<String> newFuture = threadPoolExecutor.submit(() -> CommonUtils.readFile("news.txt"));
// 3:替换操作
Future<String> replaceFuture = threadPoolExecutor.submit(() -> {
String[] words = filterWordFuture.get();
String news = newFuture.get();
for (String word : words) {
if (news.contains(word)) {
news = news.replace(word, "**");
}
}
return news;
});
// 4:打印替换后的新闻稿
CommonUtils.printThreadLog(replaceFuture.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
输出:
17:13:28.046 | 1 | main | oh my god! CompletableFuture 真 ** 好用!
通过上面的代码,我们会发现,Future 相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足至少表现如下:
- 在没有阻塞的情况下,无法对 Future 的结果执行进一步的操作。Future 不会告知你它什么时候完成,你如果想要得到结果,必须通过一个
get()
方法,该方法会阻塞直到结果可用为止。它不具备将回调函数附加到 Future 后并在 Future 的结果可用时自动调用回调的能力 - 无法解决任务相互依赖的问题。
filterWordFuture
和newFuture
的结果不能自动发送给replaceFuture
,需要在replaceFuture
中手动获取,所以使用 Future 不能轻而易举地创建异步工作流 - 不能将多个 Future 合并在一起。假设你有多种不同的 Future,你想在它们全部并行完成后然后再运行某个函数,Future 很难独立完成这一需要
- 没有异常处理。Future 提供的方法中没有专门的 API 应对异常处理,还是需要开发者自己手动异常处理
# CompletableFuture 优势
CompletableFuture
实现了 Future
和 CompletionStage
接口
CompletableFuture 相对于 Future 具有以下优势:
- 为快速创建、链接依赖和组合多个 Future 提供了大量的便利方法
- 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持
- 无缝衔接和亲和 Lambda 表达式和
Stream - API
- 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅
# 创建异步任务
# runAsync 方法
如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用 CompletableFuture.runAsync()
方法。它接受一个 Runnable 接口的实现类对象,方法返回 CompletableFuture<Void>
对象。
static CompletableFuture<Void> runAsync(Runnable runnable);
演示案例: 创建一个不从任务中返回任何内容的 CompletableFuture 异步任务对象
public class RunAsyncDemo1 {
public static void main(String[] args) {
CommonUtils.printThreadLog("main start");
// runAsync 启动异步任务
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
CommonUtils.printThreadLog("读取文件开始");
// 通过睡眠来模拟一个长时间的工作任务(例如读取文件、网络请求等)
CommonUtils.sleepSeconds(3);
CommonUtils.printThreadLog("读取文件结束");
}
});
CommonUtils.printThreadLog("这里阻塞,main continue");
// 等待 CompletableFuture.runAsync 执行完成,否则主线程结束后,异步任务自动销毁
CommonUtils.sleepSeconds(4);
CommonUtils.printThreadLog("main end");
/**
* CompletableFuture 中的异步任务底层通过开启线程的方式完成的
*/
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
输出:
17:13:46.028 | 1 | main | main start
17:13:46.044 | 1 | main | 这里阻塞,main continue
17:13:46.044 | 20 | ForkJoinPool.commonPool-worker-25 | 读取文件开始
17:13:49.051 | 20 | ForkJoinPool.commonPool-worker-25 | 读取文件结束
17:13:50.062 | 1 | main | main end
2
3
4
5
我们也可以以 Lambda 表达式的形式传递 Runnable 接口实现类对象
public class RunAsyncDemo1 {
public static void main(String[] args) {
CommonUtils.printThreadLog("main start");
// runAsync 启动异步任务
CompletableFuture.runAsync(() -> {
CommonUtils.printThreadLog("读取文件开始");
// 通过睡眠来模拟一个长时间的工作任务(例如读取文件、网络请求等)
CommonUtils.sleepSeconds(3);
CommonUtils.printThreadLog("读取文件结束");
});
CommonUtils.printThreadLog("这里阻塞,main continue");
// 等待 CompletableFuture.runAsync 执行完成,否则主线程结束后,异步任务自动销毁
CommonUtils.sleepSeconds(4);
CommonUtils.printThreadLog("main end");
/**
* CompletableFuture 中的异步任务底层通过开启线程的方式完成的
*/
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
需求:使用 CompletableFuture 开启异步任务读取 news.txt
文件中的新闻稿,并打印输出
public class RunAsyncDemo2 {
public static void main(String[] args) {
CommonUtils.printThreadLog("main start");
// runAsync 启动异步任务
CompletableFuture.runAsync(() -> {
CommonUtils.printThreadLog("读取文件开始");
String content = CommonUtils.readFile("news.txt");
CommonUtils.printThreadLog("读取内容:" + content);
CommonUtils.printThreadLog("读取文件结束");
});
CommonUtils.printThreadLog("这里阻塞,main continue");
// 等待 CompletableFuture.runAsync 执行完成,否则主线程结束后,异步任务自动销毁
CommonUtils.sleepSeconds(4);
CommonUtils.printThreadLog("main end");
/**
* 疑问:异步任务是并发执行还是并行执行?
* 如果是单核 CPU,那么异常任务之间就是并发指向性,如果是多核 CPU,异步任务就是并行执行
* 作为开发者,我们只需要清楚如何开启异步任务,CPU 硬件会把异步任务合理的分配给 CPU 上的核运行。
*/
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Lambda 表达式会经常被使用。
输出:
17:14:07.771 | 1 | main | main start
17:14:07.786 | 1 | main | 这里阻塞,main continue
17:14:07.786 | 20 | ForkJoinPool.commonPool-worker-25 | 读取文件开始
17:14:07.793 | 20 | ForkJoinPool.commonPool-worker-25 | 读取内容:oh my god! CompletableFuture 真 TM 好用!
17:14:07.793 | 20 | ForkJoinPool.commonPool-worker-25 | 读取文件结束
17:14:11.797 | 1 | main | main end
2
3
4
5
6
# supplyAsync 方法
CompletableFuture.runAsync
开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时 CompletableFuture.supplyAsync()
是你最好的选择了。
static CompletableFuture<U> supplyAsync(Supplier<U> supplier)
它入参一个 Supplier<U>
供给者,用于供给带返回值的异步任务并返回 CompletableFuture<U>
,其中 U
是供给者给程序供给值的类型。
需求: 开启异步任务读取 news.txt
文件中的新闻稿,返回文件中内容并在主线程打印输出。
我们使用 Java8 的 Lambda 表达式使代码更简洁。
public class SupplyAsyncDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
// supplyAsync 启动异步任务
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> CommonUtils.readFile("news.txt"));
CommonUtils.printThreadLog("这里阻塞,main continue");
System.out.println(newsFuture.get());
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
输出:
17:14:33.751 | 1 | main | main start
17:14:33.769 | 1 | main | 这里阻塞,main continue
oh my god! CompletableFuture 真 TM 好用!
17:14:33.774 | 1 | main | main end
2
3
4
如果想要获取 newsFuture
结果,可以调用 CompletableFuture.get()
方法,get
方法将阻塞,直到 newsFuture
完成。
# 异步任务的线程池
大家已经知道,runAsync()
和 supplyAsync()
方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗?不是吗!
CompletableFuture 会从全局的 ForkJoinPool.commonPool()
线程池获取线程来执行这些任务。
当然,你也可以创建一个线程池,并将其传递给 runAsync()
和 supplyAsync()
方法,以使它们在从您指定的线程池获得的线程中执行任务。
CompletableFuture API 中的所有方法都有两种变体,一种是接受传入的 Executor 参数作为指定的线程池,而另一种则使用默认的线程池 (ForkJoinPool.commonPool()
)。
// runAsync() 的重载方法
static CompletableFuture<Void> runAsync(Runnable runnabTe)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync() 的重载方法
static <U> CompletableFuture<U> supplyAsync(supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(supplier<U> supplier, Executor executor)
2
3
4
5
6
7
需求: 指定线程池,开启异步任务读取 news.txt
中的新闻,返回文件中内容并在主线程打印输出
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("异步读取文件开始");
String news = CommonUtils.readFile("news.txt");
CommonUtils.printThreadLog("异步读取文件完成");
return news;
}, executor);
2
3
4
5
6
7
8
最佳实践: 创建属于自己的业务线程池
如果所有 CompletableFuture 共享一个线程池,那么一旦有异步任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。
所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相千扰。
# 异步编程思想
综合上述,看到了吧,我们没有显式地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识吧,以上专业的说法是: 线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员,专人专事。
异步编程是可以让程序并行(也可能是并发)运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。
作为开发者,只需要有一个意识:
开发者只需要把耗时的操作交给 CompletableFuture 开启一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为 异步编程思想。
# 任务异步回调
CompletableFuture.get()
方法是阻塞的。调用时它会阻塞等待直到这个 Future 完成,并在完成后返回结果。但是,很多时候这不是我们想要的,对于构建异步系统,我们应该能够将回调附加到 CompletableFuture 上,当这个 Future 完成时,该回调应自动被调用。这样,我们就不必等待结果了,我们可以在 Future 的回调函数内部编写完成 Future 之后需要执行的逻辑。您可以使用 thenApply()
、thenAccept()
和 thenRun()
方法将回调函数附到 CompletableFuture。
# thenApply 方法
使用 thenApply()
方法可以处理和转换 CompletableFuture 的结果。它以 Funtion<T,R>
作为参数。Function<T,R>
是一个函数式接口,表示一个转换操作,它接受类型 T 的参数并产生类型 R 的结果。
CompletableFuture<R> thenApply(Function<T,R> fn)
需求:异步读取 filter_words.txt
文件中的内容,读取完成后,把内容转换成数组(敏感词数组),异步任务返回敏感词数组
public class ThenApplyDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture<String> readFilesFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
});
CompletableFuture<String[]> filterWordsFuture = readFilesFuture.thenApply(content -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
});
CommonUtils.printThreadLog("main continue");
CommonUtils.printThreadLog(Arrays.toString(filterWordsFuture.get()));
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
输出:
17:15:02.575 | 1 | main | main start
17:15:02.592 | 20 | ForkJoinPool.commonPool-worker-25 | 读取 filter-words 文件
17:15:02.592 | 1 | main | main continue
17:15:02.596 | 20 | ForkJoinPool.commonPool-worker-25 | 文件内容转换成敏感数组
17:15:02.597 | 1 | main | [TM]
17:15:02.597 | 1 | main | main end
2
3
4
5
6
你还可以通过附加一系列 thenApply()
回调方法,在 CompletableFuture 上编写一系列转换序列。一个 thenApply()
方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递。
public class ThenApplyDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenApply(content -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
});
CommonUtils.printThreadLog("main continue");
CommonUtils.printThreadLog(Arrays.toString(filterWordsFuture.get()));
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
输出结果一样。
# thenAccept 方法
如果你不想从回调函数返回结果,而只想在 Future 完成后运行一些代码,则可以使用 thenAccept()
这些方法是入参一个 Consumer<T>
,它可以对异步任务的执行结果进行消费使用,方法返回 CompletableFuture<Void>
。
CompletableFuture<Void> thenAccept(Consumer<T> action)
通常用作回调链中的最后一个回调。
需求:异步读取 filter-words.txt
文件中的内容,读取完成后,转换成敏感词数组,然后打印敏感词数组
public class ThenAcceptDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenApply(content -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
}).thenAccept(content -> {
CommonUtils.printThreadLog(Arrays.toString(content));
});
CommonUtils.printThreadLog("main continue");
CommonUtils.sleepSeconds(4);
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
输出:
17:17:37.951 | 1 | main | main start
17:17:37.970 | 20 | ForkJoinPool.commonPool-worker-25 | 读取 filter-words 文件
17:17:37.971 | 1 | main | main continue
17:17:37.974 | 20 | ForkJoinPool.commonPool-worker-25 | 文件内容转换成敏感数组
17:17:37.974 | 20 | ForkJoinPool.commonPool-worker-25 | [TM]
17:17:41.983 | 1 | main | main end
2
3
4
5
6
# thenRun 方法
前面我们已经知道,通过 thenApply(Function<T,R>)
对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果。
通过 thenAccept(Consumer<T>)
对链式操作中上一个异步任务的结果进行消费使用,不返回新结果。
如果我们只是想从 CompletableFuture
的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么 CompletableFuture.thenRun()
会是你最佳的选择,它需要一个 Runnable 并返回 CompletableFuture<Void>
。
CompletableFuture<Void> thenRun(Runnable action);
演示案例:我们仅仅想知道 filter-words.txt
的文件是否读取完成
public class ThenRunDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenRun(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件完成");
});
CommonUtils.printThreadLog("main continue");
CommonUtils.sleepSeconds(4);
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
输出:
17:20:52.647 | 1 | main | main start
17:20:52.664 | 20 | ForkJoinPool.commonPool-worker-25 | 读取 filter-words 文件
17:20:52.665 | 1 | main | main continue
17:20:52.668 | 20 | ForkJoinPool.commonPool-worker-25 | 读取 filter-words 文件完成
17:20:56.666 | 1 | main | main end
2
3
4
5
更进一步提升并行化
CompletableFuture
提供的所有回调方法都有两个异步变体
CompTetabTeFuture<U> thenApp1y(Function<T,U> fn)
// 回调方法的异步变体(异步回调)
CompTetabTeFuture<U> thenApplyAsync(Function<T,U> fn)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor)
2
3
4
注意: 这些带了 Async 的异步回调 通过在单独的线程中执行回调任务 来帮助您进一步促进并行化计算。
回顾需求: 异步读取 flter-words.txt
文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组
public class ThenApplyDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenApply(content -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
});
CommonUtils.printThreadLog("main continue");
CommonUtils.printThreadLog(Arrays.toString(filterWordsFuture.get()));
CommonUtils.printThreadLog("main end");
}
/**
* 总结
* 一般而言,commonPool 为了提高性能水
* thenApply 中回调任务和 supplyAsync 中的异步任务使用的是同一个线程
* 特殊情况:如 supplyAsync 中的任务是立即返回结果(不是耗时的任务),thenApply 回调任务也会在主线程执行
*/
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
要更好地控制执行回调任务的线程,可以使用异步回调。如果使用 thenApplyAsync()
回调,那么它将在 ForkJoinPool.commonPool()
获得的另一个线程中执行。
public class ThenApplyDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenApplyAsync(content -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
}, executorService);
CommonUtils.printThreadLog("main continue");
CommonUtils.printThreadLog(Arrays.toString(filterWordsFuture.get()));
CommonUtils.printThreadLog("main end");
}
/**
* 总结
* 一般而言,commonPool 为了提高性能水
* thenApply 中回调任务和 supplyAsync 中的异步任务使用的是同一个线程
* 特殊情况:如 supplyAsync 中的任务是立即返回结果(不是耗时的任务),thenApply 回调任务也会在主线程执行。
*/
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 异步任务编排
# thenCompose 方法
thenCompose 方法作用是编排 2 个 依赖关系 的异步任务。
回顾需求:异步读取 filter-words.txt
文件中的内容,读取完成后,转换成敏感词数组让主线程待用。
关于读取和解析内容,假设使用以下的 readFileFuture(String)
和 splitFuture(String)
方法完成。
public static CompletableFuture<String> readFileFuture(String fileName) {
return CompletableFuture.supplyAsync(() -> CommonUtils.readFile(fileName));
}
public static CompletableFuture<String[]> splitFuture(String context) {
return CompletableFuture.supplyAsync(() -> context.split(","));
}
2
3
4
5
6
7
现在,让我们先了解如果使用 thenApp1y()
结果会发生什么
CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("filter-words .txt")
.thenApply(context -> splitFuture(context));
2
回顾在之前的案例中,thenApply(FunctionT,R)
中 Function 回调会对上一步任务结果转换后得到一个简单值,但现在这种情况下,最终结果是嵌套的 CompletableFuture,所以这是不符合预期的,那怎么办呢?
我们想要的是:把上一步异步任务的结里,转成一个 CompletableFuture 对象,这个 CompletableFuture 对象中包含本次异步任务外理后的结果。也就是说,我们想组合上一步异步任务的结果到下一个新的异步任务中,结果由这个新的异步任务返回。
此时,你需要使用 thenCompose()
方法代替,我们可以把它理解为异步任务的组合。
CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> func)
所以,thencompose()
用来连接两个有依赖关系的异步任务,结果由第二个任务返回
CompletableFuture<String[]> future = readFileFuture("filter-words.txt")
.thenCompose(context -> splitFuture(context));
2
因此,这里积累了一个经验:
如果我们想连接(编排)两个依关系的异步任务(CompletableFuture 对象),请使用 thenCompose()
方法当然,thenCompose
也存在异步回调变体版本:
CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn, Executor executor)
2
3
# thenCombine 方法
thenCombine 方法作用是编排 2 个 非依赖关系 的异步任务。
我们已经知道,当其中一个 Future 依赖于另一个 Future,使用 thenCompose
用于组合两 Future,如果两 Future 之间没有依赖关系,你希望两个 Future 独立运行并在两者都完成之后执行回调操作时,则使用 thenCombine()
// T 是第一个任务的结果,U 是第二个任务的结果,V 是经 BiFunction 应用转换后的结果
CompletableFuture<v> thenCombine(CompletableFuture<u> other, BiFunction<T,U,V> func)
2
需求:替换新闻稿 news.txt
中敏感词汇,把敏感词汇替换成 *
,敏感词存储在 flter-words.txt
中
public class ThenComposeDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CommonUtils.printThreadLog("main start");
CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 filter-words 文件");
return CommonUtils.readFile("filter-words.txt");
}).thenCompose(content -> CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("文件内容转换成敏感数组");
return content.split(",");
}));
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
CommonUtils.printThreadLog("读取 news.txt 文件");
return CommonUtils.readFile("news.txt");
});
CompletableFuture<String> combineFuture = filterWordsFuture.thenCombine(newsFuture, (filterWords, newsContent) -> {
CommonUtils.printThreadLog("替换操作");
for (String filterWord : filterWords) {
if (newsContent.contains(filterWord)) {
newsContent = newsContent.replace(filterWord, "**");
}
}
return newsContent;
});
CommonUtils.printThreadLog("main continue");
CommonUtils.printThreadLog(combineFuture.get());
CommonUtils.printThreadLog("main end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
输出:
17:45:03.648 | 1 | main | main start
17:45:03.667 | 20 | ForkJoinPool.commonPool-worker-25 | 读取 filter-words 文件
17:45:03.667 | 21 | ForkJoinPool.commonPool-worker-18 | 读取 news.txt 文件
17:45:03.668 | 1 | main | main continue
17:45:03.672 | 21 | ForkJoinPool.commonPool-worker-18 | 文件内容转换成敏感数组
17:45:03.672 | 21 | ForkJoinPool.commonPool-worker-18 | 替换操作
17:45:03.673 | 1 | main | oh my god! CompletableFuture 真 ** 好用!
17:45:03.673 | 1 | main | main end
2
3
4
5
6
7
8
# 合并多个异步任务 allOf
我们使用 thenCompose()
和 thencombine()
将两个 CompletableFuture 组合和合并在一起。
如果要编排任意数量的 CompletableFuture 怎么办? 可以使用以下方法来组合任意数量的 CompletableFuture。
public static CompletableFuture<Void> alOf(CompletableFuture<?>... cfs)
CompletableFuture.allOf()
用于以下情形中: 有多人需要独立并行运行的 Future,并在所有这些 Future 都完成后执行一些操作。
需求: 统计 news1.txt
、new2.txt
、new3.txt
文件中包含 CompletableFuture 关键字的文件的个数。
new1.txt
文件:
测试 1
new2.txt
文件:
测试 2
new3.txt
文件:
测试 3
代码:
public class AllOfDemo {
public static void main(String[] args) {
// 1、创建 List 集合存储文件名
List<String> fileList = Arrays.asList("news1.txt", "news2.txt", "news3.txt");
// 2、根据文件名调用 readFileFuture 创建多个 CompletableFuture,并存入 List 集合中
List<CompletableFuture<String>> readFileFutureList = fileList.stream().map(fileName -> readFileFuture(fileName)).collect(Collectors.toList());
// 3、把 List 集合转换成数组待用,以便传人 allOf 方法中
CompletableFuture[] readFileFutureArr = readFileFutureList.toArray(new CompletableFuture[0]);
// 4、AllOf 成功回调读取结果
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileFutureArr);
// 5、当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数
CompletableFuture<Integer> sizeFuture = allOfFuture.thenApply(v -> {
readFileFutureList.forEach(file -> {
String content = file.join();
System.out.println("内容:" + content);
});
return readFileFutureList.size();
});
// 6、主线程打印输出文件个数
System.out.println("长度:" + sizeFuture.join());
/**
* aLlOf 特别适合合并多个异步任务,当所有异步任务都完成时可以进一步操
*/
}
public static CompletableFuture<String> readFileFuture(String fileName) {
return CompletableFuture.supplyAsync(() -> CommonUtils.readFile(fileName));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
输出:
内容:测试 1
内容:测试 2
内容:测试 3
长度:3
2
3
4
# 合并多个异步任务 anyOf
顾名思义,当给定的多个异步任务中的有任意 Future 一个完成时,需要执行一些操作,可以使用 anyOf
方法
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
anyOf()
返回一个新的 CompletableFuture, 新的 CompletableFuture 的结果和 cfs 中已完成的那个异步任务结果相同。
演示案例: anyOf
执行过程
public class AnyOfDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> allOfFuture1 = CompletableFuture.supplyAsync(() -> {
CommonUtils.sleepSeconds(1);
return "结果 1";
});
CompletableFuture<String> allOfFuture2 = CompletableFuture.supplyAsync(() -> {
CommonUtils.sleepSeconds(2);
return "结果 2";
});
CompletableFuture<String> allOfFuture3 = CompletableFuture.supplyAsync(() -> {
CommonUtils.sleepSeconds(3);
return "结果 3";
});
// 任意一个 异步任务 完成,则返回结果
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(allOfFuture1, allOfFuture2, allOfFuture3);
Object result = anyOfFuture.get();
// 结果 1
System.out.println(result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
输出:
结果 1
在上面的示例中,当三个 CompletableFuture 中的任意一人完成时,anyOfFuture 就完成了。由于 allOfFuture1 的睡眠时间最少,因此它将首先完成,最终结果将是 结果 1。
注意:
anyOf()
方法返回类型必须是CompletableFuture<Object>
anyOf()
的问题在于,如果您拥有返回不同类型结果的 CompletableFuture,那么您将不知道最终 CompletableFuture 的类型
# 异步任务的异常处理
在前面的章节中,我们并没有更多地关心异常处理的问题,其实,CompletableFuture 提供了优化处理异常的方式。
首先,让我们了解 异常如何在回调链中传播。
public class ExceptionChainDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result 1")
.thenApply(result -> {
int i = 1 / 0;
return result + " result 2";
})
.thenApply(result -> result + " result 3");
System.out.println(future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
如果在 supplyAsync
任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将转入异常处理。
如果在第一个 thenApply 任务中出现异常,第二个 thenApply 和最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常外理,依次类推。
# exceptionally 方法
exceptionally 用于处理回调链上的异常,回调链上出现的任何异常,回调链不继续向下执行,都在 exceptionally 中处理异常。
// Throwable 表示具体的异常对象 e
CompletableFuture<R> exceptionally(Function<Throwable,R> func)
2
Demo
public class ExceptionChainDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 如果回调链中出现任意异常,回调链不再向下执行,而是立即转入异常处理 exceptionally,并将异常处理的返回值进行返回
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result 1")
.thenApply(result -> {
int i = 1 / 0;
return result + " result 2";
})
.thenApply(result -> result + " result 3")
.exceptionally(exception -> {
System.out.println("出现异常:" + exception.getMessage());
return "UnKnown";
});
System.out.println(future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
输出:
出现异常:java.lang.ArithmeticException: / by zero
UnKnown
2
因为 exceptionally 只处理一次异常,所以常常用在回调链的末端。
# handle 方法
CompletableFuture APl 还提供了一种更通用的方法 handle()
表示从异常中恢复。
handle()
常常被用来恢复回调链中的一次特定的异常,回调链恢复后可以进一步向下传递。
public class HandleDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 异步任务不管是否发生异常,handle 方法都会执行,第一个参数是异步任务的返回值,如果出现异常,则异常信息在第二个参数里
* 所以,handle 核心作用在于对上一步异步任务进行现场恢复
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result 1")
.thenApply(result -> {
int i = 1 / 0;
return result + " result 2";
})
.thenApply(result -> result + " result 3")
.handle((result, exception) -> {
CommonUtils.printThreadLog("处理异常");
if (Objects.nonNull(exception)) {
System.out.println("出现异常:" + exception.getMessage());
return "UnKnown";
}
return result;
});
System.out.println(future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
输出:
18:04:48.231 | 1 | main | 处理异常
出现异常:java.lang.ArithmeticException: / by zero
UnKnown
2
3
如果发生异常,则 result 参数将为 null,否则 exception 参数将为 null。
需求:对回调链中的一次异常进行恢复处理
public class HandleDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 异步任务不管是否发生异常,handle 方法都会执行,第一个参数是异步任务的返回值,如果出现异常,则异常信息在第二个参数里
* 所以,handle 核心作用在于对上一步异步任务进行现场恢复
*/
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "result 1";
}).handle((result, exception) -> {
if (Objects.nonNull(exception)) {
System.out.println("出现异常:" + exception.getMessage());
return "UnKnown1";
}
return result;
}).thenApply(result -> {
int i = 10 / 0;
return result + " result 2";
}).handle((result, exception) -> {
if (Objects.nonNull(exception)) {
System.out.println("出现异常:" + exception.getMessage());
return "UnKnown2";
}
return result;
})
.thenApply(result -> result + " result 3");
System.out.println(future.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
输出:
出现异常:java.lang.ArithmeticException: / by zero
出现异常:java.lang.ArithmeticException: / by zero
UnKnown2 result 3
2
3
和以往一样,为了提升并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本
CompTetabTeFuture<R> exceptionally(Function<Throwable,R> fn)
CompTetabTeFuture<R> exceptionallyAsync(Function<Throwable,R> fn) // jdk17+
CompTetabTeFuture<R> exceptionallyAsync(Function<Throwable,R> fn, Executor executor) // jdk17+
CompTetabTeFuture<R> handle(BiFunction<T,Throwable,R> fn)
CompTetabTeFuture<R> handleAsync(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn, Executor executor)
2
3
4
5
6
7
# 异步任务的交互
异步任务交互指将异步任务对 获取结果的速度相比较,按一定的规则(先到先用)进行下一步处理。
# applyToEither 方法
applyToEither()
把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。
CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
演示案例: 使用最先完成的异步任务的结果:
public class applyEitherDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSeconds(x);
CommonUtils.printThreadLog("任务 1 耗时:" + x + " 秒");
return x;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSeconds(y);
CommonUtils.printThreadLog("任务 2 耗时:" + y + " 秒");
return y;
});
CompletableFuture<Object> future = future1.applyToEither(future2, result -> {
CommonUtils.printThreadLog("最先到达的结果:" + result);
return result;
});
CommonUtils.sleepSeconds(4);
System.out.println("最终结果:" + future.join());
/**
* 异步任务交互指两个异步任务,哪个结果先到,就使用哪个结果(先到先用)
*/
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
输出:
19:21:55.946 | 20 | ForkJoinPool.commonPool-worker-25 | 任务 1 耗时:1 秒
19:21:55.946 | 21 | ForkJoinPool.commonPool-worker-18 | 任务 2 耗时:2 秒
19:21:55.956 | 20 | ForkJoinPool.commonPool-worker-25 | 最先到达的结果:1
最终结果:1
2
3
4
速记心法:任务 1、任务 2 就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交
以下是 applyToEither 和其对应的异步回调版本
CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<t> other, Function<T,R> func, Executor executor)
2
3
# acceptEither
acceptEither()
把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作(消费使用)。
CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)
completableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action, Executor executor)
2
3
演示案例:使用最先完成的异步任务的结果
public class acceptEitherDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSeconds(x);
CommonUtils.printThreadLog("任务 1 耗时:" + x + " 秒");
return x;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSeconds(y);
CommonUtils.printThreadLog("任务 2 耗时:" + y + " 秒");
return y;
});
future1.acceptEither(future2, result -> CommonUtils.printThreadLog("最先到达的结果:" + result));
ExecutorService executorService = Executors.newFixedThreadPool(4);
future1.acceptEitherAsync(future2, result -> CommonUtils.printThreadLog("最先到达的结果(Async 调用):" + result), executorService);
executorService.shutdown();
CommonUtils.sleepSeconds(4);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
输出:
19:27:47.716 | 21 | ForkJoinPool.commonPool-worker-18 | 任务 2 耗时:2 秒
19:27:47.716 | 20 | ForkJoinPool.commonPool-worker-25 | 任务 1 耗时:2 秒
19:27:47.725 | 20 | ForkJoinPool.commonPool-worker-25 | 最先到达的结果:2
19:27:47.725 | 22 | pool-1-thread-1 | 最先到达的结果(Async 调用):2
2
3
4
# runAfterEither
如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither()
,以下是它的相关方法:
CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)
2
3
提示
异步任务交互的三个方法和之前学习的异步的回调方法
thenApply
、thenAccept
、thenRun
有异曲同工之妙。
public class RuntEitherDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(3);
CommonUtils.sleepSeconds(x);
CommonUtils.printThreadLog("任务 1 耗时:" + x + " 秒");
return x;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(3);
CommonUtils.sleepSeconds(y);
CommonUtils.printThreadLog("任务 2 耗时:" + y + " 秒");
return y;
});
// acceptEither 方法
future1.runAfterEither(future2, () -> CommonUtils.printThreadLog("有一个异步任务完成了"));
CommonUtils.sleepSeconds(4);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
输出:
19:37:01.480 | 21 | ForkJoinPool.commonPool-worker-18 | 任务 2 耗时:0 秒
19:37:01.480 | 20 | ForkJoinPool.commonPool-worker-25 | 任务 1 耗时:0 秒
19:37:01.494 | 21 | ForkJoinPool.commonPool-worker-18 | 有一个异步任务完成了
2
3
# runAfterBoth
和 runAfterEither
相反的方法是 runAfterBoth
,所有异步任务都完成了,才得到通知。
# get 和 join 区别
get()
和 join()
都是 CompletableFuture 提供的以阻塞方式。
获取结果的方法那么该如何选用呢? 请看如下案例:
public class GetOrJoinDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
String getResult = null;
// 抛出检查时异常,必须手动处理
try {
getResult = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("result = " + getResult);
// 抛出运行时异常,可以不处理
String joinResult = future.join();
System.out.println("join = " + joinResult);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
输出:
result = hello
join = hello
2
使用时,我们发现,get()
抛出检查时异常,需要程序必须处理;而 join()
方法抛出运行时异常,程序可以不处理。所以 join()
更适合用在流式编程中。
# ParallelStream VS CompletableFuture
CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?
同时,对于 Stream API 本身就提供了并行流 ParallelStream,它们有什么不同呢?
我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚 CompletableFuture 在流式操作的优势。
需求: 创建 10 个 MyTask 耗时的任务,统计它们执行完的总耗时。
定义一个 MyTask 类,来模拟耗时的长任务
public class MyTask {
private int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int doWork() {
CommonUtils.printThreadLog("doWork");
CommonUtils.sleepSeconds(duration);
return duration;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
同时,我们创建 10 个任务,每个持续 1 秒
public class SequenceDemo {
public static void main(String[] args) {
// 需求:创建 10 个 MyTask 耗时任务,统计执行完的耗时长
// 创建 10 个 MyTask 对象,每个任务持续 1s。存入 List 集合
IntStream range = IntStream.range(0, 10);
List<MyTask> tasks = range.mapToObj(item -> new MyTask(1))
.collect(Collectors.toList());
long start = System.currentTimeMillis();
List<Integer> results = tasks.stream().map(myTask -> myTask.doWork())
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.println("任务数:" + tasks.size() + ",耗时:" + (end - start) / 1000.0);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
输出:
19:47:00.753 | 1 | main | doWork
19:47:01.772 | 1 | main | doWork
19:47:02.786 | 1 | main | doWork
19:47:03.798 | 1 | main | doWork
19:47:04.806 | 1 | main | doWork
19:47:05.820 | 1 | main | doWork
19:47:06.834 | 1 | main | doWork
19:47:07.844 | 1 | main | doWork
19:47:08.857 | 1 | main | doWork
19:47:09.870 | 1 | main | doWork
任务数:10,耗时:10.144
2
3
4
5
6
7
8
9
10
11
它花费了 10 秒,因为每个任务在主线程一个接一个的执行。
# 并行流的局限性
因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 para1lelstream()
。
public class ParallelStreamDemo {
public static void main(String[] args) {
// 创建 10 个 MyTask 耗时任务,统计执行完的耗时长
// 创建 10 个 MyTask 对象,每个任务持续 1s。存入 List 集合
IntStream range = IntStream.range(0, 10);
List<MyTask> tasks = range.mapToObj(item -> new MyTask(1))
.collect(Collectors.toList());
long start = System.currentTimeMillis();
// parallelStream 并行执行,如果电脑有 10 个核,则只需要 1s 左右,每个核都能处理一个 map 循环
List<Integer> results = tasks.parallelStream().map(myTask -> myTask.doWork())
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.println("任务数:" + tasks.size() + ",耗时:" + (end - start) / 1000.0);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
输出:
19:47:43.983 | 24 | ForkJoinPool.commonPool-worker-22 | doWork
19:47:43.983 | 28 | ForkJoinPool.commonPool-worker-1 | doWork
19:47:43.983 | 25 | ForkJoinPool.commonPool-worker-29 | doWork
19:47:43.983 | 26 | ForkJoinPool.commonPool-worker-15 | doWork
19:47:43.983 | 23 | ForkJoinPool.commonPool-worker-4 | doWork
19:47:43.983 | 22 | ForkJoinPool.commonPool-worker-11 | doWork
19:47:43.983 | 21 | ForkJoinPool.commonPool-worker-18 | doWork
19:47:43.983 | 27 | ForkJoinPool.commonPool-worker-8 | doWork
19:47:43.983 | 1 | main | doWork
19:47:43.983 | 20 | ForkJoinPool.commonPool-worker-25 | doWork
任务数:10,耗时:1.034
2
3
4
5
6
7
8
9
10
11
它花费了 1 秒多,因为此次并行执行使用了 10 个线程,9 个是 ForkJoinPool 线程池中的,一个是 main 线程),需要注意是: 运行结果由自己电 脑 CPU 的核数决定。
# CompletableFuture 在流式操作的优势
让我们看看使用 CompletableFuture 是否执行的更有效率
public class CompletableFutureDemo1 {
public static void main(String[] args) {
// 需求:创建 10 个 MyTask 耗时任务,统计执行完的耗时长
// 1、创建 10 个 MyTask 对象,每个任务持续 1s。存入 List 集合
IntStream range = IntStream.range(0, 10);
List<MyTask> tasks = range.mapToObj(item -> new MyTask(1))
.collect(Collectors.toList());
// 获取电脑线程池数量
int CPU_N = Runtime.getRuntime().availableProcessors();
// 准备线程池
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(tasks.size(), CPU_N * 2));
// 2、根据 MyTask 对象创建 10 个异步任务
List<CompletableFuture<Integer>> futures = tasks.stream()
.map(myTask -> CompletableFuture.supplyAsync(() -> myTask.doWork(), executorService))
.collect(Collectors.toList());
// 3、执行异步任务,执行完成后,获取异步任务的结果,存入 List 集合中,统计总耗时
long start = System.currentTimeMillis();
List<Integer> results = futures.stream().map(future -> future.join())
.collect(Collectors.toList());
long end = System.currentTimeMillis();
System.out.println("任务数:" + tasks.size() + ",耗时:" + (end - start) / 1000.0);
// 关闭线程池
executorService.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
输出:
19:50:47.813 | 27 | pool-1-thread-8 | doWork
19:50:47.813 | 23 | pool-1-thread-4 | doWork
19:50:47.813 | 20 | pool-1-thread-1 | doWork
19:50:47.813 | 28 | pool-1-thread-9 | doWork
19:50:47.813 | 21 | pool-1-thread-2 | doWork
19:50:47.813 | 29 | pool-1-thread-10 | doWork
19:50:47.813 | 26 | pool-1-thread-7 | doWork
19:50:47.812 | 22 | pool-1-thread-3 | doWork
19:50:47.812 | 24 | pool-1-thread-5 | doWork
19:50:47.812 | 25 | pool-1-thread-6 | doWork
任务数:10,耗时:1.033
2
3
4
5
6
7
8
9
10
11
测试代码时,如果电脑配置是 10 核,当创建的线程池中线程数最少也是 10 个,每个线程负责一个任务(耗时),总体来说处理 10 个任务总共需要约 1 秒。
# 合理配置线程池中的线程数
正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能。
问题 1: 如何选用 CompletableFuture 和 ParallelStream
如果你的任务是 IO 密集型的,你应该使用 CompletableFuture。
如果你的任务是 CPU 密集型的,使用比处理器更多的线程是没有意义的,所以选择 Parallelstream ,因为它不需要创建线程池,更容易使用。
问题 2: IO 密集型任务和 CPU 密集型任务的区别
CPU 密集型也叫计算密集型,此时,系统运行时大部分的状况是 CPU 占用率近乎 100%,IO 在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU 使用率很高。比如说要计算 1 + 2 + 3 + ... + 10 万亿
、天文计算、圆周率后几十位等,都是属于 CPU 密集型程序。
CPU 密集型任务的特点: 大量计算,CPU 占用率一般都很高,IO 时间很短
IO 密集型指大部分的状况是 CPU 在等 IO(硬盘/内存)的读写操作,但 CPU 的使用率不高。
简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。
IO 密集型任务的特点: 大量网络请求,文件操作,CPU 运算少,很多时候 CPU 在等待资源才能进一步操作。
问题 3: 既然要控制线程池的数量,多少合适呢
如果是 CPU 密集型任务,就需要尽量压榨 CPU,参考值可以设为 Ncpu + 1
。
如果是 IO 密集型任务,参考值可以设置为 2 * Ncpu
,其中 Ncpu 表示 CPU 核心数。
注意的是:以上给的是参考值,具体业务具体分析。
# 大数据商品比价案例
需求描述:实现一个大数据比价服务,价格数据可以从京东、天猫、拼多多等平台去获取指定商品的价格、优惠金额,然后计算出实际付款金额(商品价格优惠金额),最终返回价格最优的平台与价格信息。
# 构建工具类和实体类
价格实体类(用到了 Lombok):
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class PriceResult {
private int price;
private int discount;
private int realPrice;
private String platform;
public PriceResult(String platform) {
this.platform = platform;
}
@Override
public String toString() {
return "PriceResult{" +
"平台='" + platform + '\'' +
", 价格=" + price +
", 折扣=" + discount +
", 最终价=" + realPrice +
'}';
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
工具类依然不变:
public class CommonUtils {
/**
* 根据指定路径读取文件内容
*/
public static String readFile(String pathToFile) {
try {
return new String(Files.readAllBytes(Paths.get(pathToFile)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 睡眠都是毫秒
*/
public static void sleepMillis(long millis) {
try {
TimeUnit.MICROSECONDS.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 睡眠都是秒
*/
public static void sleepSeconds(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static String getCurrentTime() {
LocalTime now = LocalTime.now();
return now.format(DateTimeFormatter.ofPattern("[HH:mm:ss.SSS]"));
}
/**
* 打印日志信息
*/
public static void printThreadLog(String message) {
// 时间戳 | 线程 ID、 线程名 | 日志信息
StringJoiner result = new StringJoiner(" | ")
.add(getCurrentTime())
.add(String.format("%2d", Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(message);
System.out.println(result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 构建 HttpRequest
HttpRequest 用于模拟网络请求(耗时的操作)
ublic class HttpRequest {
private static void mockCostTimeOperations() {
CommonUtils.sleepSeconds(1);
}
/**
* 过去淘宝平台的商品价格
*/
public static PriceResult getTaoBaoPrice(String productName) {
CommonUtils.printThreadLog("获取淘宝上 " + productName + " 价格");
mockCostTimeOperations();
PriceResult priceResult = new PriceResult("淘宝");
priceResult.setPrice(5199);
CommonUtils.printThreadLog("获取淘宝上 " + productName + " 价格完成:5199");
return priceResult;
}
/**
* 获取淘宝平台的优惠
*/
public static int getTaoBaoDiscount(String productName) {
CommonUtils.printThreadLog("获取淘宝上 " + productName + " 优惠");
mockCostTimeOperations();
CommonUtils.printThreadLog("获取淘宝上 " + productName + " 优惠完成:-200");
return 200;
}
/**
* 获取京东平台的商品价格
*/
public static PriceResult getJDongPrice(String productName) {
CommonUtils.printThreadLog("获取京东上 " + productName + " 价格");
mockCostTimeOperations();
PriceResult priceResult = new PriceResult("京东");
priceResult.setPrice(5299);
CommonUtils.printThreadLog("获取京东上 " + productName + " 价格完成:5299");
return priceResult;
}
/**
* 获取京东平台的优惠
*/
public static int getJDongDiscount(String productName) {
CommonUtils.printThreadLog("获取京东上 " + productName + " 优惠");
mockCostTimeOperations();
CommonUtils.printThreadLog("获取京东上 " + productName + " 优惠完成:-150");
return 150;
}
/**
* 获取拼多多平台的商品价格
*/
public static PriceResult getPDDPrice(String productName) {
CommonUtils.printThreadLog("获取拼多多上 " + productName + " 价格");
mockCostTimeOperations();
PriceResult priceResult = new PriceResult("拼多多");
priceResult.setPrice(5399);
CommonUtils.printThreadLog("获取拼多多上 " + productName + " 价格完成:5399");
return priceResult;
}
/**
* 获取拼多多平台的优惠
*/
public static int getPDDDiscount(String productName) {
CommonUtils.printThreadLog("获取拼多多上 " + productName + " 优惠");
mockCostTimeOperations();
CommonUtils.printThreadLog("获取拼多多上 " + productName + " 优惠完成:-5300");
return 5300;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 操作商品比价
下面有三种方案进行比价:
- 串行方式
- Future + 线程池方式
- CompletableFuture 方式
public class ComparePriceService {
/**
* 方案一:串行方式操作商品比价
*/
public PriceResult getCheapestPlatformPrice1(String productName) {
PriceResult priceResult;
int discount;
// 获取淘宝平台的商品价格和优惠
priceResult = HttpRequest.getTaoBaoPrice(productName);
discount = HttpRequest.getTaoBaoDiscount(productName);
PriceResult taoaoPriceResult = computeRealPrice(priceResult, discount);
// 获取京东平台的商品价格和优惠
priceResult = HttpRequest.getJDongPrice(productName);
discount = HttpRequest.getJDongDiscount(productName);
PriceResult jDongPriceResult = computeRealPrice(priceResult, discount);
// 获取拼多多平台的商品价格和优惠
priceResult = HttpRequest.getPDDPrice(productName);
discount = HttpRequest.getPDDDiscount(productName);
PriceResult pddPriceResult = computeRealPrice(priceResult, discount);
// 计算最优的平台和价格
return Stream.of(taoaoPriceResult, jDongPriceResult, pddPriceResult)
.min(Comparator.comparing(PriceResult::getRealPrice))
.get();
}
/**
* 方案二:使用 Future + 线程池并行方式操作商品比价
*/
public PriceResult getCheapestPlatformPrice2(String productName) {
// 线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 获取淘宝平台的商品价格和优惠
Future<PriceResult> taoBaoFuture = executorService.submit(() -> {
PriceResult priceResult = HttpRequest.getTaoBaoPrice(productName);
int discount = HttpRequest.getTaoBaoDiscount(productName);
return computeRealPrice(priceResult, discount);
});
// 获取京东平台的商品价格和优惠
Future<PriceResult> jDongFuture = executorService.submit(() -> {
PriceResult priceResult = HttpRequest.getJDongPrice(productName);
int discount = HttpRequest.getJDongDiscount(productName);
return computeRealPrice(priceResult, discount);
});
// 获取拼多多平台的商品价格和优惠
Future<PriceResult> pddFuture = executorService.submit(() -> {
PriceResult priceResult = HttpRequest.getPDDPrice(productName);
int discount = HttpRequest.getPDDDiscount(productName);
return computeRealPrice(priceResult, discount);
});
// 计算最优的平台和价格
return Stream.of(taoBaoFuture, jDongFuture, pddFuture)
.map(future -> {
try {
return future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
return null;
} finally {
executorService.shutdownNow();
}
})
.filter(Objects::nonNull)
.min(Comparator.comparing(PriceResult::getRealPrice))
.get();
}
/**
* 方案三:使用 CompletableFuture 方式操作商品比价
*/
public PriceResult getCheapestPlatformPrice3(String productName) {
// 获取淘宝平台的商品价格和优惠
CompletableFuture<PriceResult> taoBaoCompletableFuture = CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoDiscount(productName)), this::computeRealPrice);
// 获取京东平台的商品价格和优惠
CompletableFuture<PriceResult> jDongCompletableFuture = CompletableFuture.supplyAsync(() -> HttpRequest.getJDongPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getJDongDiscount(productName)), this::computeRealPrice);
// 获取拼多多平台的商品价格和优惠
CompletableFuture<PriceResult> pddCompletableFuture = CompletableFuture.supplyAsync(() -> HttpRequest.getPDDPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getPDDDiscount(productName)), this::computeRealPrice);
// 计算最优的平台和价格
return Stream.of(taoBaoCompletableFuture, jDongCompletableFuture, pddCompletableFuture)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.min(Comparator.comparing(PriceResult::getRealPrice))
.get();
}
/**
* 计算商品的最终价格 = 平台价格 - 优惠价
*/
public PriceResult computeRealPrice(PriceResult priceResult, int discount) {
priceResult.setRealPrice(priceResult.getPrice() - discount);
priceResult.setDiscount(discount);
CommonUtils.printThreadLog(priceResult.getPlatform() + "最终价格计算完成:" + priceResult.getRealPrice());
return priceResult;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# 测试
public class ComparePriceDemo {
public static void main(String[] args) {
// 方案一耗时 6.123s
// test1();
// 方案二耗时 2.067s
// test2();
// 方案三耗时 1.059s
// test3();
// 异步任务的批量操作
List<String> list = Arrays.asList("iPhone14 黑色", "iPhone14 白色", "iPhone14 玫瑰红");
ComparePriceService comparePriceService = new ComparePriceService();
PriceResult priceResult = comparePriceService.batchComparePrice(list);
System.out.println("priceResult = " + priceResult);
}
/**
* 方案一测试:串行方式操作商品比价
*/
public static void test1() {
ComparePriceService comparePriceService = new ComparePriceService();
long start = System.currentTimeMillis();
PriceResult priceResult = comparePriceService.getCheapestPlatformPrice1("iPhone14");
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) / 1000.0);
System.out.println("priceResult = " + priceResult);
}
/**
* 方案二测试;使用 Future + 线程池并行方式操作商品比价
*/
public static void test2() {
ComparePriceService comparePriceService = new ComparePriceService();
long start = System.currentTimeMillis();
PriceResult priceResult = comparePriceService.getCheapestPlatformPrice2("iPhone14");
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) / 1000.0);
System.out.println("priceResult = " + priceResult);
}
/**
* 方案三测试:使用 CompletableFuture 方式操作商品比价
*/
public static void test3() {
ComparePriceService comparePriceService = new ComparePriceService();
long start = System.currentTimeMillis();
PriceResult priceResult = comparePriceService.getCheapestPlatformPrice3("iPhone14");
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) / 1000.0);
System.out.println("priceResult = " + priceResult);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
最终发现 CompletableFuture 方法耗时是最短的。
耗时方案对比:串行方式(6.123s) < Future(2.067s) + 线程池方式 < CompletableFuture 方式(1.059s)。
# Stream API 操作批量商品比价
public class ComparePriceService {
public PriceResult batchComparePrice(List<String> productNames) {
// 1、遍历每个商品的名字,根据商品名称开启异步任务获取最终价,归集到 List 集合中
List<CompletableFuture<PriceResult>> completableFutures = productNames.stream()
.map(productName -> CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoPrice(productName))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequest.getTaoBaoDiscount(productName)), this::computeRealPrice)
).collect(Collectors.toList());
// 把多个商品的最终价格进行排序获取最小值
return completableFutures.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparing(PriceResult::getRealPrice))
.findFirst()
.get();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
测试
public class ComparePriceDemo {
public static void main(String[] args) {
// 异步任务的批量操作
List<String> list = Arrays.asList("iPhone14 黑色", "iPhone14 白色", "iPhone14 玫瑰红");
ComparePriceService comparePriceService = new ComparePriceService();
PriceResult priceResult = comparePriceService.batchComparePrice(list);
System.out.println("priceResult = " + priceResult);
}
}
2
3
4
5
6
7
8
9
输出:
20:07:16.317 | 20 | ForkJoinPool.commonPool-worker-25 | 获取淘宝上 iPhone14 黑色 价格
20:07:16.317 | 22 | ForkJoinPool.commonPool-worker-11 | 获取淘宝上 iPhone14 白色 价格
20:07:16.317 | 23 | ForkJoinPool.commonPool-worker-4 | 获取淘宝上 iPhone14 白色 优惠
20:07:16.317 | 21 | ForkJoinPool.commonPool-worker-18 | 获取淘宝上 iPhone14 黑色 优惠
20:07:16.317 | 24 | ForkJoinPool.commonPool-worker-22 | 获取淘宝上 iPhone14 玫瑰红 价格
20:07:16.317 | 25 | ForkJoinPool.commonPool-worker-29 | 获取淘宝上 iPhone14 玫瑰红 优惠
20:07:17.322 | 21 | ForkJoinPool.commonPool-worker-18 | 获取淘宝上 iPhone14 黑色 优惠完成:-200
20:07:17.322 | 20 | ForkJoinPool.commonPool-worker-25 | 获取淘宝上 iPhone14 黑色 价格完成:5199
20:07:17.322 | 22 | ForkJoinPool.commonPool-worker-11 | 获取淘宝上 iPhone14 白色 价格完成:5199
20:07:17.322 | 24 | ForkJoinPool.commonPool-worker-22 | 获取淘宝上 iPhone14 玫瑰红 价格完成:5199
20:07:17.322 | 25 | ForkJoinPool.commonPool-worker-29 | 获取淘宝上 iPhone14 玫瑰红 优惠完成:-200
20:07:17.322 | 23 | ForkJoinPool.commonPool-worker-4 | 获取淘宝上 iPhone14 白色 优惠完成:-200
20:07:17.322 | 25 | ForkJoinPool.commonPool-worker-29 | 淘宝最终价格计算完成:4999
20:07:17.322 | 20 | ForkJoinPool.commonPool-worker-25 | 淘宝最终价格计算完成:4999
20:07:17.322 | 23 | ForkJoinPool.commonPool-worker-4 | 淘宝最终价格计算完成:4999
priceResult = PriceResult{平台='淘宝', 价格=5199, 折扣=200, 最终价=4999}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16