写多线程程序的时候,最烦的就是任务管理了,特别是那些需要并发执行多个子任务的场景,每个任务的生命周期、错误处理、取消操作都得自己管,一不小心就出问题。JDK 23的JEP 480整了个新活,引入了结构化并发,让多线程编程变得更有条理,错误处理也更简单了。
鹏磊我之前做项目的时候,经常需要并发调用多个服务,比如同时查询用户信息、订单信息、库存信息,然后合并结果。传统方式得用ExecutorService、Future这些,代码写起来麻烦,错误处理也复杂,有时候任务没执行完就返回了,或者异常没处理好导致资源泄漏。
现在有了结构化并发,可以把相关的并发任务组织成一个工作单元,任务的生命周期被限制在代码块里,自动管理,出错自动取消,省事多了。特别是配合虚拟线程用,性能也好,代码也清晰。
JEP 480 的核心概念
结构化并发就是把不同线程里运行的相关任务,当成一个单一的工作单元来管理。这个工作单元有明确的生命周期,子任务的生命周期被限制在父任务的代码块里,父任务可以等待所有子任务完成,监控它们的执行状态。
结构化并发的原则
结构化并发有几个核心原则:
- 生命周期绑定:子任务的生命周期被限制在父任务的代码块里,父任务退出时,所有子任务都会被取消
- 自动错误传播:如果子任务出错,错误会自动传播到父任务,不会丢失
- 自动资源管理:使用try-with-resources模式,自动管理任务的生命周期
- 结构化取消:取消操作是结构化的,取消父任务会自动取消所有子任务
// 传统方式,任务生命周期不好管理
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future1 = executor.submit(() -> fetchUserInfo());
Future<String> future2 = executor.submit(() -> fetchOrderInfo());
try {
String userInfo = future1.get(); // 可能阻塞
String orderInfo = future2.get(); // 可能阻塞
// 如果这里抛异常,future1和future2可能还在运行,资源泄漏
} catch (Exception e) {
// 错误处理
} finally {
executor.shutdown(); // 得手动关闭
}
// 结构化并发方式,生命周期自动管理
try (var scope = new StructuredTaskScope<String>()) {
var userTask = scope.fork(() -> fetchUserInfo()); // 提交任务
var orderTask = scope.fork(() -> fetchOrderInfo()); // 提交任务
scope.join(); // 等待所有任务完成
String userInfo = userTask.get(); // 获取结果
String orderInfo = orderTask.get(); // 获取结果
} // 自动关闭scope,取消未完成的任务
结构化并发让代码更清晰,资源管理也更安全。
StructuredTaskScope API
JEP 480的核心API是StructuredTaskScope,它提供了管理并发任务的框架。主要方法包括:
fork(Callable<T> task):提交一个任务并发执行,返回Subtaskjoin():等待所有提交的任务完成joinUntil(Instant deadline):等待任务完成,直到指定的时间shutdown():关闭scope,取消所有运行中的任务close():关闭scope,释放资源
// 基本用法
try (var scope = new StructuredTaskScope<String>()) {
// 提交多个任务
var task1 = scope.fork(() -> {
Thread.sleep(1000);
return "Task 1 result";
});
var task2 = scope.fork(() -> {
Thread.sleep(500);
return "Task 2 result";
});
// 等待所有任务完成
scope.join();
// 获取结果
String result1 = task1.get();
String result2 = task2.get();
println("Result 1: " + result1);
println("Result 2: " + result2);
} // 自动关闭,取消未完成的任务
错误处理策略
结构化并发提供了两种预定义的错误处理策略:ShutdownOnFailure和ShutdownOnSuccess。
ShutdownOnFailure:遇到失败就停止
ShutdownOnFailure策略是:如果任何一个子任务失败,就立即关闭scope,取消所有其他任务。这适合需要所有任务都成功的场景。
// 使用ShutdownOnFailure策略
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
// 模拟可能失败的任务
if (Math.random() < 0.5) {
throw new RuntimeException("Task 1 failed");
}
return "Task 1 success";
});
var task2 = scope.fork(() -> {
Thread.sleep(1000);
return "Task 2 success";
});
scope.join(); // 等待所有任务完成或第一个失败
// 检查是否有失败
scope.throwIfFailed(); // 如果有失败,抛出异常
// 获取结果
String result1 = task1.get();
String result2 = task2.get();
println("All tasks succeeded");
} catch (Exception e) {
println("Some task failed: " + e.getMessage());
}
如果task1失败了,scope会立即关闭,task2会被取消,不会继续执行。
ShutdownOnSuccess:第一个成功就停止
ShutdownOnSuccess策略是:如果任何一个子任务成功,就立即关闭scope,取消所有其他任务。这适合只需要一个任务成功的场景,比如多个服务提供相同功能,只要有一个成功就行。
// 使用ShutdownOnSuccess策略
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// 尝试从多个服务获取数据,只要有一个成功就行
var task1 = scope.fork(() -> {
Thread.sleep(2000); // 慢服务
return fetchFromService1();
});
var task2 = scope.fork(() -> {
Thread.sleep(500); // 快服务
return fetchFromService2();
});
var task3 = scope.fork(() -> {
Thread.sleep(1000);
return fetchFromService3();
});
scope.join(); // 等待第一个成功或全部失败
// 获取第一个成功的结果
String result = scope.result(); // 如果全部失败,会抛异常
println("Got result from one service: " + result);
} catch (Exception e) {
println("All services failed: " + e.getMessage());
}
如果task2先成功,scope会立即关闭,task1和task3会被取消,节省资源。
自定义错误处理
除了预定义的策略,也可以自定义错误处理逻辑:
// 自定义错误处理
try (var scope = new StructuredTaskScope<String>()) {
var task1 = scope.fork(() -> fetchUserInfo());
var task2 = scope.fork(() -> fetchOrderInfo());
var task3 = scope.fork(() -> fetchInventoryInfo());
scope.join(); // 等待所有任务完成
// 自定义处理逻辑
List<String> results = new ArrayList<>();
List<Exception> errors = new ArrayList<>();
for (var task : List.of(task1, task2, task3)) {
try {
if (task.state() == Subtask.State.SUCCESS) {
results.add(task.get()); // 获取成功的结果
} else if (task.state() == Subtask.State.FAILED) {
errors.add(task.exception()); // 收集错误
}
} catch (Exception e) {
errors.add(e);
}
}
// 根据业务逻辑处理结果和错误
if (results.size() >= 2) {
// 至少2个成功,可以继续
processResults(results);
} else {
// 失败太多,抛出异常
throw new RuntimeException("Too many failures", errors.get(0));
}
}
与虚拟线程结合
结构化并发特别适合和虚拟线程(JEP 444)结合使用。虚拟线程是轻量级线程,可以在一个操作系统线程上运行大量虚拟线程,性能好,资源占用少。
虚拟线程的优势
虚拟线程的优势:
- 轻量级:创建和销毁成本低,可以创建大量虚拟线程
- 阻塞友好:阻塞操作不会占用操作系统线程,可以高效处理大量并发
- 自动调度:由JVM自动调度,不需要手动管理线程池
// 使用虚拟线程和结构化并发
try (var scope = new StructuredTaskScope<String>()) {
// 每个任务在虚拟线程上运行
var tasks = new ArrayList<Subtask<String>>();
// 提交大量任务,每个任务在虚拟线程上运行
for (int i = 0; i < 1000; i++) {
final int index = i;
var task = scope.fork(() -> {
// 这个任务在虚拟线程上运行
Thread.sleep(100); // 阻塞操作,不会占用OS线程
return "Result " + index;
});
tasks.add(task);
}
scope.join(); // 等待所有任务完成
// 处理结果
for (var task : tasks) {
if (task.state() == Subtask.State.SUCCESS) {
println(task.get());
}
}
}
这样可以在一个OS线程上并发执行1000个任务,性能很好。
服务器应用场景
在服务器应用里,结构化并发和虚拟线程配合,可以高效处理大量并发请求:
// 服务器应用示例
void handleRequest(Request request) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发获取用户相关的各种信息
var userTask = scope.fork(() -> userService.getUser(request.getUserId()));
var orderTask = scope.fork(() -> orderService.getOrders(request.getUserId()));
var profileTask = scope.fork(() -> profileService.getProfile(request.getUserId()));
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 如果有失败,抛出异常
// 合并结果
User user = userTask.get();
List<Order> orders = orderTask.get();
Profile profile = profileTask.get();
return new Response(user, orders, profile);
} catch (Exception e) {
// 统一错误处理
return new ErrorResponse(e.getMessage());
}
}
每个请求可以分配一个虚拟线程,在需要并发执行子任务时,为每个子任务分配新的虚拟线程,高效处理大量并发请求。
实际应用场景
场景1:并发调用多个服务
最常见的场景就是并发调用多个服务,然后合并结果:
// 并发调用多个服务
public UserDashboard getDashboard(String userId) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发获取各种信息
var userTask = scope.fork(() -> userService.getUser(userId));
var ordersTask = scope.fork(() -> orderService.getRecentOrders(userId));
var notificationsTask = scope.fork(() -> notificationService.getUnreadCount(userId));
var preferencesTask = scope.fork(() -> preferenceService.getPreferences(userId));
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查是否有失败
// 合并结果
return new UserDashboard(
userTask.get(),
ordersTask.get(),
notificationsTask.get(),
preferencesTask.get()
);
} catch (Exception e) {
// 统一错误处理
throw new DashboardException("Failed to load dashboard", e);
}
}
这样写代码清晰,错误处理也简单。
场景2:快速失败策略
有些场景需要快速失败,只要有一个任务失败就立即停止:
// 快速失败策略
public ValidationResult validateData(Data data) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并发执行多个验证规则
var rule1Task = scope.fork(() -> validateRule1(data));
var rule2Task = scope.fork(() -> validateRule2(data));
var rule3Task = scope.fork(() -> validateRule3(data));
var rule4Task = scope.fork(() -> validateRule4(data));
scope.join();
scope.throwIfFailed(); // 如果有任何验证失败,立即抛出异常
// 所有验证都通过
return new ValidationResult(true, "All validations passed");
} catch (Exception e) {
return new ValidationResult(false, e.getMessage());
}
}
如果任何一个验证规则失败,其他验证会被立即取消,节省资源。
场景3:第一个成功策略
有些场景只需要一个任务成功就行,比如从多个服务获取相同的数据:
// 第一个成功策略
public String fetchData(String key) {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// 尝试从多个数据源获取数据
scope.fork(() -> cacheService.get(key)); // 最快,但可能没有
scope.fork(() -> databaseService.get(key)); // 较慢,但可靠
scope.fork(() -> remoteService.get(key)); // 最慢,作为备份
scope.join(); // 等待第一个成功
return scope.result(); // 返回第一个成功的结果
} catch (Exception e) {
// 所有数据源都失败
throw new DataFetchException("All data sources failed", e);
}
}
如果cacheService先返回结果,其他服务会被取消,节省资源。
场景4:超时控制
可以结合超时控制,避免任务执行时间过长:
// 超时控制
public Result processWithTimeout(List<Task> tasks, Duration timeout) {
try (var scope = new StructuredTaskScope<Result>()) {
// 提交所有任务
var subtasks = tasks.stream()
.map(task -> scope.fork(() -> executeTask(task)))
.toList();
// 等待任务完成,但有超时限制
Instant deadline = Instant.now().plus(timeout);
scope.joinUntil(deadline);
// 检查哪些任务完成了
List<Result> results = new ArrayList<>();
for (var subtask : subtasks) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
} else if (subtask.state() == Subtask.State.FAILED) {
println("Task failed: " + subtask.exception().getMessage());
} else {
println("Task cancelled or still running");
}
}
return new Result(results);
}
}
如果超时了,还在运行的任务会被取消。
与传统方式的对比
代码复杂度对比
传统方式用ExecutorService,代码比较复杂:
// 传统方式
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> futures = new ArrayList<>();
try {
futures.add(executor.submit(() -> fetchData1()));
futures.add(executor.submit(() -> fetchData2()));
futures.add(executor.submit(() -> fetchData3()));
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
try {
results.add(future.get()); // 可能阻塞
} catch (ExecutionException e) {
// 错误处理复杂
executor.shutdownNow(); // 得手动取消其他任务
throw new RuntimeException(e.getCause());
}
}
return results;
} finally {
executor.shutdown(); // 得手动关闭
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
结构化并发方式,代码简洁多了:
// 结构化并发方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> fetchData1());
var task2 = scope.fork(() -> fetchData2());
var task3 = scope.fork(() -> fetchData3());
scope.join();
scope.throwIfFailed();
return List.of(task1.get(), task2.get(), task3.get());
}
代码量少了很多,逻辑也更清晰。
错误处理对比
传统方式的错误处理比较复杂,得手动管理:
// 传统方式,错误处理复杂
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> futures = new ArrayList<>();
boolean hasError = false;
try {
futures.add(executor.submit(() -> fetchData1()));
futures.add(executor.submit(() -> fetchData2()));
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
try {
results.add(future.get());
} catch (ExecutionException e) {
hasError = true;
// 得手动取消其他任务
for (Future<String> f : futures) {
f.cancel(true);
}
throw new RuntimeException(e.getCause());
}
}
return results;
} finally {
executor.shutdown();
// 复杂的关闭逻辑...
}
结构化并发方式,错误处理自动管理:
// 结构化并发方式,错误处理自动
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> fetchData1());
var task2 = scope.fork(() -> fetchData2());
scope.join();
scope.throwIfFailed(); // 自动处理错误,自动取消其他任务
return List.of(task1.get(), task2.get());
} // 自动关闭,自动取消未完成的任务
错误处理简单多了,资源管理也安全。
启用预览特性
JEP 480是预览特性,得先启用才能用:
# 编译时启用预览特性
javac --enable-preview --release 23 MyClass.java
# 运行时启用预览特性
java --enable-preview MyClass
Maven配置
如果用Maven,可以在pom.xml里配置:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<release>23</release>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
模块依赖
如果项目是模块化的,需要在module-info.java里声明依赖:
module com.example.myapp {
requires java.base;
// StructuredTaskScope在java.base模块里(JDK 23)
}
最佳实践
1. 使用try-with-resources
一定要用try-with-resources模式,确保scope自动关闭:
// 好的做法:使用try-with-resources
try (var scope = new StructuredTaskScope<String>()) {
var task = scope.fork(() -> doWork());
scope.join();
return task.get();
} // 自动关闭
// 不好的做法:手动管理
var scope = new StructuredTaskScope<String>();
try {
var task = scope.fork(() -> doWork());
scope.join();
return task.get();
} finally {
scope.close(); // 容易忘记
}
2. 选择合适的错误处理策略
根据业务需求选择合适的策略:
// 需要所有任务都成功
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// ...
}
// 只需要一个任务成功
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// ...
}
// 自定义错误处理
try (var scope = new StructuredTaskScope<String>()) {
// 自己处理错误
}
3. 配合虚拟线程使用
结构化并发和虚拟线程是绝配,配合使用效果更好:
// 配合虚拟线程使用
try (var scope = new StructuredTaskScope<String>()) {
// 每个任务在虚拟线程上运行
for (int i = 0; i < 100; i++) {
scope.fork(() -> doWork()); // 在虚拟线程上运行
}
scope.join();
}
4. 避免在scope外使用Subtask
Subtask只能在创建它的scope内使用,不要在scope外使用:
// 错误:在scope外使用Subtask
StructuredTaskScope.Subtask<String> task;
try (var scope = new StructuredTaskScope<String>()) {
task = scope.fork(() -> doWork());
scope.join();
}
String result = task.get(); // 错误!scope已经关闭了
// 正确:在scope内使用
try (var scope = new StructuredTaskScope<String>()) {
var task = scope.fork(() -> doWork());
scope.join();
String result = task.get(); // 正确
return result;
}
总结
JEP 480引入的结构化并发,确实让多线程编程变得更有条理了。特别是错误处理和资源管理,自动处理,不用再手动管理,代码也简洁多了。
鹏磊我觉得这个特性还是挺有用的,特别是那些需要并发执行多个任务的场景,用结构化并发代码清晰,错误处理也简单。配合虚拟线程用,性能也好,资源占用也少。
总的来说,结构化并发是Java并发编程的一个很好的补充,让并发编程变得更安全、更简单。虽然现在还是预览特性,但是未来应该会稳定下来,成为Java并发编程的常用工具。