多线程工具 CompletableFuture 使用指南
- Published on
- Published on
- /6 mins read/---
CompletableFuture 简介
CompletableFuture
是 java.util.concurrent
包中的一个类,用于支持异步计算。它扩展了 Future,并新增了大量方法,使得异步操作更具可控性。
传统 Future 的局限性:
- Future.get() 是阻塞的,不能进行回调。
- Future 不能组合多个异步任务。
- Future 不能手动完成(手动设置结果)。
CompletableFuture 解决方案:
- 提供 thenApply()、thenAccept() 等方法实现链式回调。
- 允许多个 CompletableFuture 组合,例如 thenCombine()、allOf() 等。
- 提供 complete() 方法手动完成任务。
CompletableFuture 的基本使用
创建CompletableFuture
CompletableFuture
提供了多种创建异步任务的方式:
- 使用 runAsync 执行无返回值的任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步执行的任务
System.out.println("Running in a separate thread");
});
- 使用 supplyAsync 执行有返回值的任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步执行的任务
return "Result of the asynchronous computation";
});
获取任务结果
CompletableFuture
提供了 get()
方法来获取任务的结果,但这个方法会阻塞当前线程直到任务完成。为了避免阻塞,可以使用 join()
方法,它也会等待任务完成,但在任务未完成时不会抛出 InterruptedException。
String result = future.get(); // 阻塞直到任务完成
String result = future.join(); // 阻塞直到任务完成,但不抛出InterruptedException
任务处理完成后的操作
CompletableFuture
提供了多种方法来处理任务完成后的操作,比如 thenApply
、thenAccept
、thenRun
等。
- thenApply:处理任务结果并返回新的结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World");
System.out.println(future.join()); // 输出 "Hello World"
- thenAccept:消费任务结果但不返回新结果
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println(s + " World")); // 输出 "Hello World"
- thenRun:在任务完成后执行一个操作,不关心任务结果
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task completed")); // 输出 "Task completed"
异常处理
- exceptionally:捕获异常并返回默认值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Error occurred");
return "Hello";
}).exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return "Default Value";
});
System.out.println(future.join()); // 输出 "Default Value"
- handle:无论任务是否成功,都会执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Error occurred");
return "Hello";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("Exception: " + ex.getMessage());
return "Default Value";
}
return result;
});
System.out.println(future.join()); // 输出 "Default Value"
注意事项
线程池的使用
默认情况下,CompletableFuture
使用 ForkJoinPool.commonPool()
来执行任务。如果任务较多或任务执行时间较长,可能会导致线程池资源耗尽。因此,建议根据实际情况自定义线程池。
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步执行的任务
return "Result";
}, executor);
异常处理
CompletableFuture
的异常处理非常重要,尤其是在链式调用中。如果没有正确处理异常,可能会导致任务链中断,甚至出现未捕获的异常。
避免阻塞
CompletableFuture
的设计初衷是异步编程,因此尽量避免使用 get()
方法阻塞当前线程。可以使用 thenApply
、thenAccept
等方法来处理任务结果。
示例
在日常开发中,我们经常会遇到需要并发下发同一接口的场景。例如,在分布式系统中,一次配置请求可能需要同时下发到多个节点的设备接口,并等待所有节点执行完毕后返回最终结果。借助 CompletableFuture 的链式调用,我们可以优雅地实现并发请求,显著提高执行效率。
private static final List<String> NODE_LIST = Arrays.asList("江苏", "浙江", "上海");
private static final int THREAD_POOL_SIZE = 10;
private static final long CONFIG_DELAY_MS = 3000;
private static boolean dispatchConfig() {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List<CompletableFuture<Boolean>> futures = NODE_LIST.stream()
.map(node -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(CONFIG_DELAY_MS);
log.info("[{}] 设备下发配置成功", node);
return true;
} catch (Exception e) {
log.error("[{}] 设备下发配置异常", node, e);
return false;
}
}, executor).exceptionally(e -> {
log.error("[{}] 设备下发配置异常", node, e);
return false;
}))
.collect(Collectors.toList());
CompletableFuture<Boolean> resultFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().allMatch(CompletableFuture::join));
executor.shutdown();
return resultFuture.join();
}
public static void main(String[] args) {
log.info("开始下发设备配置");
log.info("全部设备下发完成,结果[{}]" ,dispatchConfig());
}
Happy coding!