Kanade's Dev Journal

多线程工具 CompletableFuture 使用指南

Published on
Published on
/6 mins read/---

CompletableFuture 简介

CompletableFuturejava.util.concurrent 包中的一个类,用于支持异步计算。它扩展了 Future,并新增了大量方法,使得异步操作更具可控性。

传统 Future 的局限性:

  • Future.get() 是阻塞的,不能进行回调。
  • Future 不能组合多个异步任务。
  • Future 不能手动完成(手动设置结果)。

CompletableFuture 解决方案:

  • 提供 thenApply()、thenAccept() 等方法实现链式回调。
  • 允许多个 CompletableFuture 组合,例如 thenCombine()、allOf() 等。
  • 提供 complete() 方法手动完成任务。

CompletableFuture 的基本使用

创建CompletableFuture

CompletableFuture 提供了多种创建异步任务的方式:

  • 使用 runAsync 执行无返回值的任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 异步执行的任务
    System.out.println("Running in a separate thread");
});
  • 使用 supplyAsync 执行有返回值的任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步执行的任务
    return "Result of the asynchronous computation";
});

获取任务结果

CompletableFuture 提供了 get() 方法来获取任务的结果,但这个方法会阻塞当前线程直到任务完成。为了避免阻塞,可以使用 join() 方法,它也会等待任务完成,但在任务未完成时不会抛出 InterruptedException。

String result = future.get(); // 阻塞直到任务完成
String result = future.join(); // 阻塞直到任务完成,但不抛出InterruptedException

任务处理完成后的操作

CompletableFuture 提供了多种方法来处理任务完成后的操作,比如 thenApplythenAcceptthenRun 等。

  • thenApply:处理任务结果并返回新的结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World");
 
System.out.println(future.join()); // 输出 "Hello World"
  • thenAccept:消费任务结果但不返回新结果
CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println(s + " World")); // 输出 "Hello World"
  • thenRun:在任务完成后执行一个操作,不关心任务结果
CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("Task completed")); // 输出 "Task completed"

异常处理

  • exceptionally:捕获异常并返回默认值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Error occurred");
    return "Hello";
}).exceptionally(ex -> {
    System.out.println("Exception: " + ex.getMessage());
    return "Default Value";
});
 
System.out.println(future.join()); // 输出 "Default Value"
  • handle:无论任务是否成功,都会执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Error occurred");
    return "Hello";
}).handle((result, ex) -> {
    if (ex != null) {
        System.out.println("Exception: " + ex.getMessage());
        return "Default Value";
    }
    return result;
});
 
System.out.println(future.join()); // 输出 "Default Value"

注意事项

线程池的使用

默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 来执行任务。如果任务较多或任务执行时间较长,可能会导致线程池资源耗尽。因此,建议根据实际情况自定义线程池。

ExecutorService executor = Executors.newFixedThreadPool(10);
 
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步执行的任务
    return "Result";
}, executor);

异常处理

CompletableFuture 的异常处理非常重要,尤其是在链式调用中。如果没有正确处理异常,可能会导致任务链中断,甚至出现未捕获的异常。

避免阻塞

CompletableFuture 的设计初衷是异步编程,因此尽量避免使用 get() 方法阻塞当前线程。可以使用 thenApplythenAccept 等方法来处理任务结果。

示例

在日常开发中,我们经常会遇到需要并发下发同一接口的场景。例如,在分布式系统中,一次配置请求可能需要同时下发到多个节点的设备接口,并等待所有节点执行完毕后返回最终结果。借助 CompletableFuture 的链式调用,我们可以优雅地实现并发请求,显著提高执行效率。

private static final List<String> NODE_LIST = Arrays.asList("江苏", "浙江", "上海");
private static final int THREAD_POOL_SIZE = 10;
private static final long CONFIG_DELAY_MS = 3000;
 
private static boolean dispatchConfig() {
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    List<CompletableFuture<Boolean>> futures = NODE_LIST.stream()
            .map(node -> CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(CONFIG_DELAY_MS);
                    log.info("[{}] 设备下发配置成功", node);
                    return true;
                } catch (Exception e) {
                    log.error("[{}] 设备下发配置异常", node, e);
                    return false;
                }
            }, executor).exceptionally(e -> {
                log.error("[{}] 设备下发配置异常", node, e);
                return false;
            }))
            .collect(Collectors.toList());
 
    CompletableFuture<Boolean> resultFuture = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream().allMatch(CompletableFuture::join));
 
    executor.shutdown();
    return resultFuture.join();
}
 
public static void main(String[] args) {
    log.info("开始下发设备配置");
    log.info("全部设备下发完成,结果[{}]" ,dispatchConfig());
}
 

Happy coding!