10、JDK 23 新特性:结构化并发(JEP 480):多线程编程的结构化设计与错误处理

写多线程程序的时候,最烦的就是任务管理了,特别是那些需要并发执行多个子任务的场景,每个任务的生命周期、错误处理、取消操作都得自己管,一不小心就出问题。JDK 23的JEP 480整了个新活,引入了结构化并发,让多线程编程变得更有条理,错误处理也更简单了。

鹏磊我之前做项目的时候,经常需要并发调用多个服务,比如同时查询用户信息、订单信息、库存信息,然后合并结果。传统方式得用ExecutorService、Future这些,代码写起来麻烦,错误处理也复杂,有时候任务没执行完就返回了,或者异常没处理好导致资源泄漏。

现在有了结构化并发,可以把相关的并发任务组织成一个工作单元,任务的生命周期被限制在代码块里,自动管理,出错自动取消,省事多了。特别是配合虚拟线程用,性能也好,代码也清晰。

JEP 480 的核心概念

结构化并发就是把不同线程里运行的相关任务,当成一个单一的工作单元来管理。这个工作单元有明确的生命周期,子任务的生命周期被限制在父任务的代码块里,父任务可以等待所有子任务完成,监控它们的执行状态。

结构化并发的原则

结构化并发有几个核心原则:

  1. 生命周期绑定:子任务的生命周期被限制在父任务的代码块里,父任务退出时,所有子任务都会被取消
  2. 自动错误传播:如果子任务出错,错误会自动传播到父任务,不会丢失
  3. 自动资源管理:使用try-with-resources模式,自动管理任务的生命周期
  4. 结构化取消:取消操作是结构化的,取消父任务会自动取消所有子任务
// 传统方式,任务生命周期不好管理
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future1 = executor.submit(() -> fetchUserInfo());
Future<String> future2 = executor.submit(() -> fetchOrderInfo());
try {
    String userInfo = future1.get();  // 可能阻塞
    String orderInfo = future2.get();  // 可能阻塞
    // 如果这里抛异常,future1和future2可能还在运行,资源泄漏
} catch (Exception e) {
    // 错误处理
} finally {
    executor.shutdown();  // 得手动关闭
}

// 结构化并发方式,生命周期自动管理
try (var scope = new StructuredTaskScope<String>()) {
    var userTask = scope.fork(() -> fetchUserInfo());  // 提交任务
    var orderTask = scope.fork(() -> fetchOrderInfo());  // 提交任务
    
    scope.join();  // 等待所有任务完成
    
    String userInfo = userTask.get();  // 获取结果
    String orderInfo = orderTask.get();  // 获取结果
}  // 自动关闭scope,取消未完成的任务

结构化并发让代码更清晰,资源管理也更安全。

StructuredTaskScope API

JEP 480的核心API是StructuredTaskScope,它提供了管理并发任务的框架。主要方法包括:

  • fork(Callable<T> task):提交一个任务并发执行,返回Subtask
  • join():等待所有提交的任务完成
  • joinUntil(Instant deadline):等待任务完成,直到指定的时间
  • shutdown():关闭scope,取消所有运行中的任务
  • close():关闭scope,释放资源
// 基本用法
try (var scope = new StructuredTaskScope<String>()) {
    // 提交多个任务
    var task1 = scope.fork(() -> {
        Thread.sleep(1000);
        return "Task 1 result";
    });
    
    var task2 = scope.fork(() -> {
        Thread.sleep(500);
        return "Task 2 result";
    });
    
    // 等待所有任务完成
    scope.join();
    
    // 获取结果
    String result1 = task1.get();
    String result2 = task2.get();
    
    println("Result 1: " + result1);
    println("Result 2: " + result2);
}  // 自动关闭,取消未完成的任务

错误处理策略

结构化并发提供了两种预定义的错误处理策略:ShutdownOnFailureShutdownOnSuccess

ShutdownOnFailure:遇到失败就停止

ShutdownOnFailure策略是:如果任何一个子任务失败,就立即关闭scope,取消所有其他任务。这适合需要所有任务都成功的场景。

// 使用ShutdownOnFailure策略
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> {
        // 模拟可能失败的任务
        if (Math.random() < 0.5) {
            throw new RuntimeException("Task 1 failed");
        }
        return "Task 1 success";
    });
    
    var task2 = scope.fork(() -> {
        Thread.sleep(1000);
        return "Task 2 success";
    });
    
    scope.join();  // 等待所有任务完成或第一个失败
    
    // 检查是否有失败
    scope.throwIfFailed();  // 如果有失败,抛出异常
    
    // 获取结果
    String result1 = task1.get();
    String result2 = task2.get();
    
    println("All tasks succeeded");
} catch (Exception e) {
    println("Some task failed: " + e.getMessage());
}

如果task1失败了,scope会立即关闭,task2会被取消,不会继续执行。

ShutdownOnSuccess:第一个成功就停止

ShutdownOnSuccess策略是:如果任何一个子任务成功,就立即关闭scope,取消所有其他任务。这适合只需要一个任务成功的场景,比如多个服务提供相同功能,只要有一个成功就行。

// 使用ShutdownOnSuccess策略
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // 尝试从多个服务获取数据,只要有一个成功就行
    var task1 = scope.fork(() -> {
        Thread.sleep(2000);  // 慢服务
        return fetchFromService1();
    });
    
    var task2 = scope.fork(() -> {
        Thread.sleep(500);  // 快服务
        return fetchFromService2();
    });
    
    var task3 = scope.fork(() -> {
        Thread.sleep(1000);
        return fetchFromService3();
    });
    
    scope.join();  // 等待第一个成功或全部失败
    
    // 获取第一个成功的结果
    String result = scope.result();  // 如果全部失败,会抛异常
    
    println("Got result from one service: " + result);
} catch (Exception e) {
    println("All services failed: " + e.getMessage());
}

如果task2先成功,scope会立即关闭,task1和task3会被取消,节省资源。

自定义错误处理

除了预定义的策略,也可以自定义错误处理逻辑:

// 自定义错误处理
try (var scope = new StructuredTaskScope<String>()) {
    var task1 = scope.fork(() -> fetchUserInfo());
    var task2 = scope.fork(() -> fetchOrderInfo());
    var task3 = scope.fork(() -> fetchInventoryInfo());
    
    scope.join();  // 等待所有任务完成
    
    // 自定义处理逻辑
    List<String> results = new ArrayList<>();
    List<Exception> errors = new ArrayList<>();
    
    for (var task : List.of(task1, task2, task3)) {
        try {
            if (task.state() == Subtask.State.SUCCESS) {
                results.add(task.get());  // 获取成功的结果
            } else if (task.state() == Subtask.State.FAILED) {
                errors.add(task.exception());  // 收集错误
            }
        } catch (Exception e) {
            errors.add(e);
        }
    }
    
    // 根据业务逻辑处理结果和错误
    if (results.size() >= 2) {
        // 至少2个成功,可以继续
        processResults(results);
    } else {
        // 失败太多,抛出异常
        throw new RuntimeException("Too many failures", errors.get(0));
    }
}

与虚拟线程结合

结构化并发特别适合和虚拟线程(JEP 444)结合使用。虚拟线程是轻量级线程,可以在一个操作系统线程上运行大量虚拟线程,性能好,资源占用少。

虚拟线程的优势

虚拟线程的优势:

  1. 轻量级:创建和销毁成本低,可以创建大量虚拟线程
  2. 阻塞友好:阻塞操作不会占用操作系统线程,可以高效处理大量并发
  3. 自动调度:由JVM自动调度,不需要手动管理线程池
// 使用虚拟线程和结构化并发
try (var scope = new StructuredTaskScope<String>()) {
    // 每个任务在虚拟线程上运行
    var tasks = new ArrayList<Subtask<String>>();
    
    // 提交大量任务,每个任务在虚拟线程上运行
    for (int i = 0; i < 1000; i++) {
        final int index = i;
        var task = scope.fork(() -> {
            // 这个任务在虚拟线程上运行
            Thread.sleep(100);  // 阻塞操作,不会占用OS线程
            return "Result " + index;
        });
        tasks.add(task);
    }
    
    scope.join();  // 等待所有任务完成
    
    // 处理结果
    for (var task : tasks) {
        if (task.state() == Subtask.State.SUCCESS) {
            println(task.get());
        }
    }
}

这样可以在一个OS线程上并发执行1000个任务,性能很好。

服务器应用场景

在服务器应用里,结构化并发和虚拟线程配合,可以高效处理大量并发请求:

// 服务器应用示例
void handleRequest(Request request) {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // 并发获取用户相关的各种信息
        var userTask = scope.fork(() -> userService.getUser(request.getUserId()));
        var orderTask = scope.fork(() -> orderService.getOrders(request.getUserId()));
        var profileTask = scope.fork(() -> profileService.getProfile(request.getUserId()));
        
        scope.join();  // 等待所有任务完成
        
        scope.throwIfFailed();  // 如果有失败,抛出异常
        
        // 合并结果
        User user = userTask.get();
        List<Order> orders = orderTask.get();
        Profile profile = profileTask.get();
        
        return new Response(user, orders, profile);
    } catch (Exception e) {
        // 统一错误处理
        return new ErrorResponse(e.getMessage());
    }
}

每个请求可以分配一个虚拟线程,在需要并发执行子任务时,为每个子任务分配新的虚拟线程,高效处理大量并发请求。

实际应用场景

场景1:并发调用多个服务

最常见的场景就是并发调用多个服务,然后合并结果:

// 并发调用多个服务
public UserDashboard getDashboard(String userId) {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // 并发获取各种信息
        var userTask = scope.fork(() -> userService.getUser(userId));
        var ordersTask = scope.fork(() -> orderService.getRecentOrders(userId));
        var notificationsTask = scope.fork(() -> notificationService.getUnreadCount(userId));
        var preferencesTask = scope.fork(() -> preferenceService.getPreferences(userId));
        
        scope.join();  // 等待所有任务完成
        scope.throwIfFailed();  // 检查是否有失败
        
        // 合并结果
        return new UserDashboard(
            userTask.get(),
            ordersTask.get(),
            notificationsTask.get(),
            preferencesTask.get()
        );
    } catch (Exception e) {
        // 统一错误处理
        throw new DashboardException("Failed to load dashboard", e);
    }
}

这样写代码清晰,错误处理也简单。

场景2:快速失败策略

有些场景需要快速失败,只要有一个任务失败就立即停止:

// 快速失败策略
public ValidationResult validateData(Data data) {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // 并发执行多个验证规则
        var rule1Task = scope.fork(() -> validateRule1(data));
        var rule2Task = scope.fork(() -> validateRule2(data));
        var rule3Task = scope.fork(() -> validateRule3(data));
        var rule4Task = scope.fork(() -> validateRule4(data));
        
        scope.join();
        scope.throwIfFailed();  // 如果有任何验证失败,立即抛出异常
        
        // 所有验证都通过
        return new ValidationResult(true, "All validations passed");
    } catch (Exception e) {
        return new ValidationResult(false, e.getMessage());
    }
}

如果任何一个验证规则失败,其他验证会被立即取消,节省资源。

场景3:第一个成功策略

有些场景只需要一个任务成功就行,比如从多个服务获取相同的数据:

// 第一个成功策略
public String fetchData(String key) {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        // 尝试从多个数据源获取数据
        scope.fork(() -> cacheService.get(key));  // 最快,但可能没有
        scope.fork(() -> databaseService.get(key));  // 较慢,但可靠
        scope.fork(() -> remoteService.get(key));  // 最慢,作为备份
        
        scope.join();  // 等待第一个成功
        
        return scope.result();  // 返回第一个成功的结果
    } catch (Exception e) {
        // 所有数据源都失败
        throw new DataFetchException("All data sources failed", e);
    }
}

如果cacheService先返回结果,其他服务会被取消,节省资源。

场景4:超时控制

可以结合超时控制,避免任务执行时间过长:

// 超时控制
public Result processWithTimeout(List<Task> tasks, Duration timeout) {
    try (var scope = new StructuredTaskScope<Result>()) {
        // 提交所有任务
        var subtasks = tasks.stream()
            .map(task -> scope.fork(() -> executeTask(task)))
            .toList();
        
        // 等待任务完成,但有超时限制
        Instant deadline = Instant.now().plus(timeout);
        scope.joinUntil(deadline);
        
        // 检查哪些任务完成了
        List<Result> results = new ArrayList<>();
        for (var subtask : subtasks) {
            if (subtask.state() == Subtask.State.SUCCESS) {
                results.add(subtask.get());
            } else if (subtask.state() == Subtask.State.FAILED) {
                println("Task failed: " + subtask.exception().getMessage());
            } else {
                println("Task cancelled or still running");
            }
        }
        
        return new Result(results);
    }
}

如果超时了,还在运行的任务会被取消。

与传统方式的对比

代码复杂度对比

传统方式用ExecutorService,代码比较复杂:

// 传统方式
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> futures = new ArrayList<>();

try {
    futures.add(executor.submit(() -> fetchData1()));
    futures.add(executor.submit(() -> fetchData2()));
    futures.add(executor.submit(() -> fetchData3()));
    
    List<String> results = new ArrayList<>();
    for (Future<String> future : futures) {
        try {
            results.add(future.get());  // 可能阻塞
        } catch (ExecutionException e) {
            // 错误处理复杂
            executor.shutdownNow();  // 得手动取消其他任务
            throw new RuntimeException(e.getCause());
        }
    }
    
    return results;
} finally {
    executor.shutdown();  // 得手动关闭
    try {
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();  // 强制关闭
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

结构化并发方式,代码简洁多了:

// 结构化并发方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> fetchData1());
    var task2 = scope.fork(() -> fetchData2());
    var task3 = scope.fork(() -> fetchData3());
    
    scope.join();
    scope.throwIfFailed();
    
    return List.of(task1.get(), task2.get(), task3.get());
}

代码量少了很多,逻辑也更清晰。

错误处理对比

传统方式的错误处理比较复杂,得手动管理:

// 传统方式,错误处理复杂
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> futures = new ArrayList<>();
boolean hasError = false;

try {
    futures.add(executor.submit(() -> fetchData1()));
    futures.add(executor.submit(() -> fetchData2()));
    
    List<String> results = new ArrayList<>();
    for (Future<String> future : futures) {
        try {
            results.add(future.get());
        } catch (ExecutionException e) {
            hasError = true;
            // 得手动取消其他任务
            for (Future<String> f : futures) {
                f.cancel(true);
            }
            throw new RuntimeException(e.getCause());
        }
    }
    
    return results;
} finally {
    executor.shutdown();
    // 复杂的关闭逻辑...
}

结构化并发方式,错误处理自动管理:

// 结构化并发方式,错误处理自动
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> fetchData1());
    var task2 = scope.fork(() -> fetchData2());
    
    scope.join();
    scope.throwIfFailed();  // 自动处理错误,自动取消其他任务
    
    return List.of(task1.get(), task2.get());
}  // 自动关闭,自动取消未完成的任务

错误处理简单多了,资源管理也安全。

启用预览特性

JEP 480是预览特性,得先启用才能用:

# 编译时启用预览特性
javac --enable-preview --release 23 MyClass.java

# 运行时启用预览特性
java --enable-preview MyClass

Maven配置

如果用Maven,可以在pom.xml里配置:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.11.0</version>
            <configuration>
                <release>23</release>
                <compilerArgs>
                    <arg>--enable-preview</arg>
                </compilerArgs>
            </configuration>
        </plugin>
    </plugins>
</build>

模块依赖

如果项目是模块化的,需要在module-info.java里声明依赖:

module com.example.myapp {
    requires java.base;
    // StructuredTaskScope在java.base模块里(JDK 23)
}

最佳实践

1. 使用try-with-resources

一定要用try-with-resources模式,确保scope自动关闭:

// 好的做法:使用try-with-resources
try (var scope = new StructuredTaskScope<String>()) {
    var task = scope.fork(() -> doWork());
    scope.join();
    return task.get();
}  // 自动关闭

// 不好的做法:手动管理
var scope = new StructuredTaskScope<String>();
try {
    var task = scope.fork(() -> doWork());
    scope.join();
    return task.get();
} finally {
    scope.close();  // 容易忘记
}

2. 选择合适的错误处理策略

根据业务需求选择合适的策略:

// 需要所有任务都成功
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // ...
}

// 只需要一个任务成功
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // ...
}

// 自定义错误处理
try (var scope = new StructuredTaskScope<String>()) {
    // 自己处理错误
}

3. 配合虚拟线程使用

结构化并发和虚拟线程是绝配,配合使用效果更好:

// 配合虚拟线程使用
try (var scope = new StructuredTaskScope<String>()) {
    // 每个任务在虚拟线程上运行
    for (int i = 0; i < 100; i++) {
        scope.fork(() -> doWork());  // 在虚拟线程上运行
    }
    scope.join();
}

4. 避免在scope外使用Subtask

Subtask只能在创建它的scope内使用,不要在scope外使用:

// 错误:在scope外使用Subtask
StructuredTaskScope.Subtask<String> task;
try (var scope = new StructuredTaskScope<String>()) {
    task = scope.fork(() -> doWork());
    scope.join();
}
String result = task.get();  // 错误!scope已经关闭了

// 正确:在scope内使用
try (var scope = new StructuredTaskScope<String>()) {
    var task = scope.fork(() -> doWork());
    scope.join();
    String result = task.get();  // 正确
    return result;
}

总结

JEP 480引入的结构化并发,确实让多线程编程变得更有条理了。特别是错误处理和资源管理,自动处理,不用再手动管理,代码也简洁多了。

鹏磊我觉得这个特性还是挺有用的,特别是那些需要并发执行多个任务的场景,用结构化并发代码清晰,错误处理也简单。配合虚拟线程用,性能也好,资源占用也少。

总的来说,结构化并发是Java并发编程的一个很好的补充,让并发编程变得更安全、更简单。虽然现在还是预览特性,但是未来应该会稳定下来,成为Java并发编程的常用工具。

本文章最后更新于 2025-11-28