兄弟们,鹏磊今天来聊聊 Java 22 里的结构化并发(Structured Concurrency)这个新特性,这玩意儿是 JEP 462 引入的,专门用来简化多线程编程的。说实话,写多线程代码这么多年了,最头疼的就是线程管理、异常处理、资源清理这些破事儿,一个不小心就内存泄漏、线程泄漏,调试起来贼费劲。结构化并发就是为了解决这些问题来的,它把一组相关的并发任务当成一个工作单元来管理,这样代码更清晰、更可靠,也更容易观察和调试。
结构化并发是 Java 22 的预览特性,它提供了一种结构化的方式来组织和管理并发任务。核心思想是:如果任务 A 创建了任务 B 和任务 C,那么任务 A 必须等待 B 和 C 都完成(或者失败)才能结束。这种结构化的方式让并发代码的依赖关系更清晰,错误处理更简单,而且能自动处理线程泄漏问题。
为什么需要结构化并发
先说说传统多线程编程的问题。咱们平时写并发代码,最常用的就是 ExecutorService、Future、CompletableFuture 这些玩意儿,虽然功能强大,但确实有不少坑:
// 传统方式的并发任务执行
ExecutorService executor = Executors.newCachedThreadPool(); // 创建线程池
// 提交多个任务
Future<String> future1 = executor.submit(() -> fetchData("url1")); // 提交第一个任务
Future<String> future2 = executor.submit(() -> fetchData("url2")); // 提交第二个任务
Future<String> future3 = executor.submit(() -> fetchData("url3")); // 提交第三个任务
try {
// 获取结果
String result1 = future1.get(); // 等待第一个任务完成
String result2 = future2.get(); // 等待第二个任务完成
String result3 = future3.get(); // 等待第三个任务完成
// 处理结果
processResults(result1, result2, result3); // 处理所有结果
} catch (ExecutionException e) {
// 异常处理很麻烦,得一个个检查
System.err.println("任务执行失败: " + e.getCause()); // 输出异常信息
} finally {
// 问题1:容易忘记关闭线程池,导致资源泄漏
executor.shutdown(); // 必须手动关闭,否则线程会一直存在
}
// 问题2:如果某个任务失败,其他任务可能还在运行,浪费资源
// 问题3:异常处理复杂,得分别处理每个 Future 的异常
// 问题4:线程泄漏风险,如果忘记关闭 executor,线程会一直存在
结构化并发就是为了解决这些问题。它通过结构化的作用域来管理并发任务,确保所有任务都在同一个作用域内完成,作用域结束时自动清理资源,而且异常处理更简单。
核心概念:StructuredTaskScope
结构化并发的核心是 StructuredTaskScope 类,它位于 java.util.concurrent 包里。这个类允许你把一组并发子任务当成一个单元来协调管理。
基本用法
StructuredTaskScope 的基本用法很简单,在 try-with-resources 语句里创建作用域,然后 fork 子任务,最后 join 等待所有任务完成:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.Callable; // 导入可调用接口
import java.util.concurrent.ExecutionException; // 导入执行异常类
// 使用结构化并发执行多个任务
public String fetchUserData(String userId) throws InterruptedException {
// 在 try-with-resources 中创建作用域,作用域结束时自动关闭
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域,任何一个任务失败就关闭
// 定义子任务,每个任务返回不同的数据
Callable<String> userInfoTask = () -> fetchUserInfo(userId); // 获取用户基本信息
Callable<String> userOrdersTask = () -> fetchUserOrders(userId); // 获取用户订单
Callable<String> userPreferencesTask = () -> fetchUserPreferences(userId); // 获取用户偏好设置
// Fork 子任务,每个任务在独立的线程中运行
var userInfoFuture = scope.fork(userInfoTask); // Fork 第一个任务,返回 Future
var userOrdersFuture = scope.fork(userOrdersTask); // Fork 第二个任务
var userPreferencesFuture = scope.fork(userPreferencesTask); // Fork 第三个任务
// Join 等待所有任务完成(或者任何一个失败)
scope.join(); // 等待所有任务完成,如果任何一个失败会抛出异常
// 检查是否有任务失败
scope.throwIfFailed(); // 如果有任务失败,抛出异常
// 获取所有任务的结果
String userInfo = userInfoFuture.get(); // 获取用户信息
String userOrders = userOrdersFuture.get(); // 获取订单信息
String userPreferences = userPreferencesFuture.get(); // 获取偏好设置
// 组合结果
return combineResults(userInfo, userOrders, userPreferences); // 组合所有结果返回
}
// 作用域结束时,所有未完成的任务会自动取消,线程会自动清理,不用担心资源泄漏
}
这玩意儿的好处是,所有任务都在同一个作用域内,作用域结束时自动清理,不用担心线程泄漏。而且如果任何一个任务失败,可以立即取消其他任务,避免浪费资源。
ShutdownOnFailure:任何一个失败就关闭
ShutdownOnFailure 是 StructuredTaskScope 的一个子类,它的策略是:如果任何一个子任务失败,就立即关闭作用域,取消所有未完成的任务。这适合需要所有任务都成功的场景。
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException; // 导入执行异常类
import java.time.Instant; // 导入时间类
// 使用 ShutdownOnFailure 执行多个必须全部成功的任务
public String processOrder(String orderId) throws InterruptedException {
Instant deadline = Instant.now().plusSeconds(30); // 设置30秒超时时间
// 创建 ShutdownOnFailure 作用域,任何一个任务失败就关闭
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务
var validateTask = scope.fork(() -> validateOrder(orderId)); // 验证订单
var checkInventoryTask = scope.fork(() -> checkInventory(orderId)); // 检查库存
var calculatePriceTask = scope.fork(() -> calculatePrice(orderId)); // 计算价格
var checkPaymentTask = scope.fork(() -> checkPayment(orderId)); // 检查支付
// 等待所有任务完成,或者超时
scope.joinUntil(deadline); // 等待所有任务完成,或者到达截止时间
// 检查是否有任务失败,如果有就抛出异常
scope.throwIfFailed(e -> new OrderProcessingException("订单处理失败", e)); // 如果有失败,抛出业务异常
// 所有任务都成功了,获取结果
String validation = validateTask.get(); // 获取验证结果
String inventory = checkInventoryTask.get(); // 获取库存检查结果
String price = calculatePriceTask.get(); // 获取价格计算结果
String payment = checkPaymentTask.get(); // 获取支付检查结果
// 处理所有结果
return processOrderResults(validation, inventory, price, payment); // 处理所有结果
}
// 如果任何一个任务失败,作用域会自动取消其他任务,然后关闭
}
这个场景下,如果验证订单失败,就不需要继续检查库存、计算价格了,直接取消所有任务,节省资源。
ShutdownOnSuccess:第一个成功就关闭
ShutdownOnSuccess 是另一个子类,它的策略是:只要有一个子任务成功,就立即关闭作用域,取消其他未完成的任务。这适合只需要一个结果就够的场景,比如从多个数据源获取数据,只要有一个成功就行。
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException; // 导入执行异常类
// 使用 ShutdownOnSuccess 从多个数据源获取数据,只要一个成功就行
public String fetchDataFromMultipleSources(String query) throws InterruptedException {
// 创建 ShutdownOnSuccess 作用域,第一个任务成功就关闭
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { // 创建作用域,泛型是结果类型
// Fork 多个子任务,从不同的数据源获取数据
scope.fork(() -> fetchFromDatabase(query)); // 从数据库获取
scope.fork(() -> fetchFromCache(query)); // 从缓存获取
scope.fork(() -> fetchFromAPI(query)); // 从 API 获取
scope.fork(() -> fetchFromFile(query)); // 从文件获取
// 等待第一个任务成功
scope.join(); // 等待第一个任务成功,然后自动取消其他任务
// 获取第一个成功的结果
String result = scope.result(e -> new DataFetchException("所有数据源都失败了", e)); // 获取结果,如果都失败就抛出异常
return result; // 返回第一个成功的结果
}
// 第一个任务成功后,其他任务会自动取消,作用域关闭
}
这个场景下,如果从缓存获取数据成功了,就不需要继续从数据库、API、文件获取了,直接取消其他任务,提高响应速度。
实际应用场景
场景1:并行查询多个数据源
最常见的场景就是并行查询多个数据源,然后组合结果。比如查询用户信息,需要同时从数据库、缓存、外部 API 获取数据:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.List; // 导入列表类
import java.util.ArrayList; // 导入数组列表类
// 用户信息记录
record UserData(String basicInfo, String orders, String preferences) {} // 用户数据记录,包含基本信息、订单、偏好
// 并行查询用户数据
public UserData fetchUserDataParallel(String userId) throws InterruptedException {
// 创建 ShutdownOnFailure 作用域,需要所有数据都成功
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 三个子任务,并行查询
var basicInfoTask = scope.fork(() -> { // Fork 第一个任务
// 模拟从数据库查询用户基本信息
Thread.sleep(100); // 模拟网络延迟
return "用户ID: " + userId + ", 姓名: 张三"; // 返回基本信息
});
var ordersTask = scope.fork(() -> { // Fork 第二个任务
// 模拟从订单服务查询订单
Thread.sleep(150); // 模拟网络延迟
return "订单1, 订单2, 订单3"; // 返回订单信息
});
var preferencesTask = scope.fork(() -> { // Fork 第三个任务
// 模拟从偏好服务查询偏好设置
Thread.sleep(120); // 模拟网络延迟
return "主题: 暗色, 语言: 中文"; // 返回偏好设置
});
// 等待所有任务完成
scope.join(); // 等待所有任务完成
// 检查是否有任务失败
scope.throwIfFailed(e -> new RuntimeException("查询用户数据失败", e)); // 如果有失败,抛出异常
// 获取所有结果
String basicInfo = basicInfoTask.get(); // 获取基本信息
String orders = ordersTask.get(); // 获取订单信息
String preferences = preferencesTask.get(); // 获取偏好设置
// 组合结果
return new UserData(basicInfo, orders, preferences); // 返回组合后的用户数据
}
// 作用域结束时,所有任务都已完成或已取消,资源自动清理
}
这样写的好处是,三个查询任务并行执行,总耗时是三个任务中最慢的那个,而不是三个任务的时间相加。而且如果任何一个任务失败,其他任务会自动取消,不会浪费资源。
场景2:快速失败:第一个成功就返回
有时候我们只需要一个结果就够了,比如从多个镜像服务器下载文件,只要一个成功就行:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.io.IOException; // 导入 IO 异常类
// 从多个镜像服务器下载文件,只要一个成功就行
public byte[] downloadFileFast(String fileName) throws InterruptedException {
// 创建 ShutdownOnSuccess 作用域,第一个成功就关闭
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<byte[]>()) { // 创建作用域,结果类型是字节数组
// Fork 多个子任务,从不同的镜像服务器下载
scope.fork(() -> downloadFromMirror1(fileName)); // 从镜像1下载
scope.fork(() -> downloadFromMirror2(fileName)); // 从镜像2下载
scope.fork(() -> downloadFromMirror3(fileName)); // 从镜像3下载
scope.fork(() -> downloadFromMirror4(fileName)); // 从镜像4下载
// 等待第一个任务成功
scope.join(); // 等待第一个任务成功
// 获取第一个成功的结果
byte[] fileData = scope.result(e -> { // 获取结果
// 如果所有任务都失败,抛出异常
throw new IOException("所有镜像服务器都下载失败", e); // 抛出 IO 异常
});
return fileData; // 返回第一个成功下载的文件数据
}
// 第一个任务成功后,其他任务会自动取消,节省带宽和资源
}
// 模拟从镜像服务器下载文件
private byte[] downloadFromMirror1(String fileName) throws IOException {
// 模拟下载逻辑
Thread.sleep(200); // 模拟网络延迟
return ("文件内容 from mirror1: " + fileName).getBytes(); // 返回文件数据
}
private byte[] downloadFromMirror2(String fileName) throws IOException {
Thread.sleep(150); // 模拟网络延迟,这个更快
return ("文件内容 from mirror2: " + fileName).getBytes(); // 返回文件数据
}
private byte[] downloadFromMirror3(String fileName) throws IOException {
Thread.sleep(300); // 模拟网络延迟,这个最慢
return ("文件内容 from mirror3: " + fileName).getBytes(); // 返回文件数据
}
private byte[] downloadFromMirror4(String fileName) throws IOException {
Thread.sleep(250); // 模拟网络延迟
return ("文件内容 from mirror4: " + fileName).getBytes(); // 返回文件数据
}
这个场景下,如果镜像2最快下载成功,其他三个镜像的下载任务会自动取消,节省带宽和服务器资源。
场景3:超时控制
结构化并发还支持超时控制,可以设置一个截止时间,如果在这个时间之前任务还没完成,就取消所有任务:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.time.Instant; // 导入时间类
import java.time.Duration; // 导入时长类
// 带超时的并行任务执行
public String fetchDataWithTimeout(String query) throws InterruptedException {
// 设置5秒超时
Instant deadline = Instant.now().plus(Duration.ofSeconds(5)); // 计算截止时间
// 创建 ShutdownOnFailure 作用域
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务
var task1 = scope.fork(() -> slowQuery(query)); // Fork 第一个任务,可能很慢
var task2 = scope.fork(() -> fastQuery(query)); // Fork 第二个任务,比较快
var task3 = scope.fork(() -> mediumQuery(query)); // Fork 第三个任务,中等速度
// 等待所有任务完成,或者超时
scope.joinUntil(deadline); // 等待所有任务完成,或者到达截止时间
// 检查是否超时
if (scope.isShutdown()) { // 如果作用域已关闭(可能是超时或失败)
throw new TimeoutException("任务执行超时"); // 抛出超时异常
}
// 检查是否有任务失败
scope.throwIfFailed(e -> new RuntimeException("任务执行失败", e)); // 如果有失败,抛出异常
// 获取所有结果
String result1 = task1.get(); // 获取第一个任务的结果
String result2 = task2.get(); // 获取第二个任务的结果
String result3 = task3.get(); // 获取第三个任务的结果
// 组合结果
return result1 + " | " + result2 + " | " + result3; // 组合所有结果
}
// 如果超时,所有未完成的任务会自动取消
}
// 模拟慢查询
private String slowQuery(String query) throws InterruptedException {
Thread.sleep(6000); // 模拟6秒延迟,会超时
return "慢查询结果"; // 返回结果
}
// 模拟快查询
private String fastQuery(String query) throws InterruptedException {
Thread.sleep(1000); // 模拟1秒延迟
return "快查询结果"; // 返回结果
}
// 模拟中等速度查询
private String mediumQuery(String query) throws InterruptedException {
Thread.sleep(3000); // 模拟3秒延迟
return "中等查询结果"; // 返回结果
}
这个场景下,如果5秒内所有任务都完成了,就正常返回结果;如果5秒到了还有任务没完成,就取消所有任务,抛出超时异常。
场景4:异常处理和错误传播
结构化并发让异常处理变得更简单,所有子任务的异常都会自动传播到主任务:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException; // 导入执行异常类
// 处理异常和错误传播
public void processWithErrorHandling(String data) throws InterruptedException {
// 创建 ShutdownOnFailure 作用域
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务
var validateTask = scope.fork(() -> validateData(data)); // 验证数据
var processTask = scope.fork(() -> processData(data)); // 处理数据
var saveTask = scope.fork(() -> saveData(data)); // 保存数据
// 等待所有任务完成
scope.join(); // 等待所有任务完成
// 检查是否有任务失败,如果有就抛出异常
scope.throwIfFailed(e -> { // 如果有失败,执行这个函数
// 可以根据异常类型做不同的处理
if (e instanceof ValidationException) { // 如果是验证异常
return new BusinessException("数据验证失败", e); // 转换为业务异常
} else if (e instanceof ProcessingException) { // 如果是处理异常
return new BusinessException("数据处理失败", e); // 转换为业务异常
} else {
return new RuntimeException("未知错误", e); // 转换为运行时异常
}
});
// 所有任务都成功了,继续处理
String validated = validateTask.get(); // 获取验证结果
String processed = processTask.get(); // 获取处理结果
String saved = saveTask.get(); // 获取保存结果
// 继续后续处理
System.out.println("所有任务都成功了"); // 输出成功信息
} catch (BusinessException e) { // 捕获业务异常
// 处理业务异常
System.err.println("业务处理失败: " + e.getMessage()); // 输出错误信息
throw e; // 重新抛出异常
}
// 如果任何一个任务失败,其他任务会自动取消,作用域关闭
}
// 模拟验证数据
private String validateData(String data) {
if (data == null || data.isEmpty()) { // 如果数据为空
throw new ValidationException("数据不能为空"); // 抛出验证异常
}
return "验证通过: " + data; // 返回验证结果
}
// 模拟处理数据
private String processData(String data) {
// 模拟处理逻辑
return "处理后的数据: " + data.toUpperCase(); // 返回处理结果
}
// 模拟保存数据
private String saveData(String data) {
// 模拟保存逻辑
return "保存成功: " + data; // 返回保存结果
}
// 自定义异常类
class ValidationException extends RuntimeException {
public ValidationException(String message) {
super(message); // 调用父类构造函数
}
}
class ProcessingException extends RuntimeException {
public ProcessingException(String message) {
super(message); // 调用父类构造函数
}
}
class BusinessException extends RuntimeException {
public BusinessException(String message, Throwable cause) {
super(message, cause); // 调用父类构造函数
}
}
这个场景下,如果验证数据失败,其他任务(处理数据、保存数据)会自动取消,然后抛出业务异常,异常处理逻辑很清晰。
与虚拟线程的配合
结构化并发和虚拟线程(Virtual Threads)是绝配。虚拟线程是 Java 21 引入的,它让创建大量线程变得非常轻量级。结构化并发可以很好地管理虚拟线程,确保它们不会泄漏:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.Executor; // 导入执行器接口
import java.util.concurrent.Executors; // 导入执行器工具类
// 使用虚拟线程执行结构化并发任务
public void processWithVirtualThreads(List<String> items) throws InterruptedException {
// 创建虚拟线程执行器
try (Executor executor = Executors.newVirtualThreadPerTaskExecutor()) { // 创建虚拟线程执行器
// 创建 ShutdownOnFailure 作用域,使用虚拟线程执行器
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 大量子任务,每个任务在虚拟线程中运行
var futures = items.stream() // 将列表转为流
.map(item -> scope.fork(() -> processItem(item))) // 为每个项目创建任务
.toList(); // 转为列表
// 等待所有任务完成
scope.join(); // 等待所有任务完成
// 检查是否有任务失败
scope.throwIfFailed(e -> new RuntimeException("处理失败", e)); // 如果有失败,抛出异常
// 获取所有结果
var results = futures.stream() // 将 Future 列表转为流
.map(future -> {
try {
return future.get(); // 获取每个任务的结果
} catch (ExecutionException e) {
throw new RuntimeException(e); // 转换为运行时异常
}
})
.toList(); // 转为列表
// 处理所有结果
System.out.println("处理了 " + results.size() + " 个项目"); // 输出处理数量
}
// 作用域结束时,所有虚拟线程会自动清理,不用担心线程泄漏
}
// 执行器关闭时,所有虚拟线程都会自动清理
}
// 模拟处理单个项目
private String processItem(String item) throws InterruptedException {
Thread.sleep(100); // 模拟处理时间
return "处理完成: " + item; // 返回处理结果
}
这个场景下,即使创建了成千上万个虚拟线程,结构化并发也能确保它们都在同一个作用域内,作用域结束时自动清理,不会泄漏。
自定义 StructuredTaskScope
除了使用 ShutdownOnFailure 和 ShutdownOnSuccess,我们还可以自定义 StructuredTaskScope,实现自己的关闭策略:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.Future; // 导入 Future 接口
// 自定义 StructuredTaskScope,实现自定义关闭策略
class CustomTaskScope<T> extends StructuredTaskScope<T> { // 继承 StructuredTaskScope
private int successCount = 0; // 成功任务计数
private int failureCount = 0; // 失败任务计数
private final int requiredSuccesses; // 需要的成功数量
// 构造函数
public CustomTaskScope(int requiredSuccesses) { // 构造函数,传入需要的成功数量
this.requiredSuccesses = requiredSuccesses; // 保存需要的成功数量
}
// 重写 handleComplete 方法,实现自定义关闭策略
@Override
protected void handleComplete(Future<T> future) { // 处理任务完成
try {
future.get(); // 获取任务结果
successCount++; // 成功计数加1
// 如果达到需要的成功数量,关闭作用域
if (successCount >= requiredSuccesses) { // 如果成功数量达到要求
shutdown(); // 关闭作用域,取消其他任务
}
} catch (Exception e) { // 如果任务失败
failureCount++; // 失败计数加1
// 如果失败太多,关闭作用域
if (failureCount > 3) { // 如果失败超过3次
shutdown(); // 关闭作用域
}
}
}
// 获取成功数量
public int getSuccessCount() {
return successCount; // 返回成功数量
}
// 获取失败数量
public int getFailureCount() {
return failureCount; // 返回失败数量
}
}
// 使用自定义的 StructuredTaskScope
public void useCustomTaskScope() throws InterruptedException {
// 创建自定义作用域,需要至少2个任务成功
try (var scope = new CustomTaskScope<String>(2)) { // 创建自定义作用域
// Fork 多个子任务
scope.fork(() -> task1()); // Fork 第一个任务
scope.fork(() -> task2()); // Fork 第二个任务
scope.fork(() -> task3()); // Fork 第三个任务
scope.fork(() -> task4()); // Fork 第四个任务
// 等待作用域关闭(达到成功数量或失败太多)
scope.join(); // 等待作用域关闭
// 检查结果
System.out.println("成功: " + scope.getSuccessCount()); // 输出成功数量
System.out.println("失败: " + scope.getFailureCount()); // 输出失败数量
}
// 作用域结束时,所有任务都已完成或已取消
}
// 模拟任务
private String task1() throws InterruptedException {
Thread.sleep(100); // 模拟处理时间
return "任务1完成"; // 返回结果
}
private String task2() throws InterruptedException {
Thread.sleep(150); // 模拟处理时间
return "任务2完成"; // 返回结果
}
private String task3() throws InterruptedException {
Thread.sleep(200); // 模拟处理时间
throw new RuntimeException("任务3失败"); // 抛出异常,模拟失败
}
private String task4() throws InterruptedException {
Thread.sleep(250); // 模拟处理时间
return "任务4完成"; // 返回结果
}
这个自定义作用域的策略是:如果至少有2个任务成功,就关闭作用域;如果失败超过3次,也关闭作用域。这样可以根据业务需求灵活控制任务的执行。
最佳实践
使用结构化并发时,有几个最佳实践需要注意:
1. 总是使用 try-with-resources
StructuredTaskScope 实现了 AutoCloseable 接口,必须使用 try-with-resources 来确保资源正确清理:
// ✅ 正确:使用 try-with-resources
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 使用 try-with-resources
// 使用作用域
}
// 作用域自动关闭,资源自动清理
// ❌ 错误:手动管理资源
var scope = new StructuredTaskScope.ShutdownOnFailure(); // 手动创建
// 使用作用域
// 容易忘记关闭,导致资源泄漏
2. 合理选择关闭策略
根据业务需求选择合适的关闭策略:
// 需要所有任务都成功:使用 ShutdownOnFailure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 所有任务都必须成功
// Fork 任务
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查是否有失败
}
// 只需要一个任务成功:使用 ShutdownOnSuccess
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { // 只需要一个成功
// Fork 任务
scope.join(); // 等待第一个任务成功
String result = scope.result(); // 获取结果
}
3. 正确处理异常
使用 throwIfFailed 或 result 方法来处理异常,不要直接调用 Future.get():
// ✅ 正确:使用 throwIfFailed 处理异常
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
var future = scope.fork(() -> task()); // Fork 任务
scope.join(); // 等待任务完成
scope.throwIfFailed(e -> new BusinessException("任务失败", e)); // 统一处理异常
String result = future.get(); // 获取结果
}
// ❌ 错误:直接调用 Future.get(),异常处理复杂
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
var future = scope.fork(() -> task()); // Fork 任务
scope.join(); // 等待任务完成
try {
String result = future.get(); // 直接获取结果,异常处理复杂
} catch (ExecutionException e) { // 需要单独处理异常
// 异常处理
}
}
4. 与虚拟线程配合使用
结构化并发和虚拟线程是绝配,可以创建大量轻量级任务:
// 使用虚拟线程执行器
try (Executor executor = Executors.newVirtualThreadPerTaskExecutor()) { // 创建虚拟线程执行器
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 大量任务,每个任务在虚拟线程中运行
for (int i = 0; i < 10000; i++) { // 创建10000个任务
scope.fork(() -> processItem(i)); // Fork 任务
}
scope.join(); // 等待所有任务完成
}
}
总结
结构化并发(JEP 462)是 Java 22 引入的一个重要特性,它通过结构化的方式管理并发任务,让多线程编程变得更简单、更可靠。核心优势包括:
- 结构化管理:把一组相关的并发任务当成一个工作单元来管理,依赖关系更清晰
- 自动资源清理:作用域结束时自动清理所有资源,不用担心线程泄漏
- 简化异常处理:统一的异常处理机制,让错误处理更简单
- 灵活的关闭策略:
ShutdownOnFailure和ShutdownOnSuccess满足不同场景需求 - 与虚拟线程配合:可以创建大量轻量级任务,不用担心资源问题
虽然结构化并发还在预览阶段,但它的设计理念和 API 已经相当成熟了。如果你正在写多线程代码,特别是需要管理多个并发任务的场景,强烈建议试试结构化并发,它能让你的代码更清晰、更可靠。
兄弟们,今天就聊到这里。结构化并发这玩意儿确实能简化多线程编程,但也要注意合理使用,根据业务需求选择合适的关闭策略。有啥问题欢迎留言讨论,鹏磊会继续分享 Java 22 的其他新特性。