兄弟们,鹏磊今天来聊聊结构化并发和作用域值这两个特性的组合使用,这玩意儿组合起来用,效果贼好。结构化并发让多线程编程更简单、更可靠,作用域值让上下文传递更安全、更高效,把它们组合起来,可以解决很多实际开发中的问题,比如并发任务共享用户上下文、请求追踪、错误处理、日志记录啥的。
结构化并发和作用域值都是 Java 22 的预览特性,它们可以很好地配合使用。结构化并发把一组相关的并发任务当成一个工作单元来管理,作用域值可以在作用域内共享不可变数据,组合起来就是:在结构化并发的任务中,子任务可以自动继承父作用域的作用域值,不用手动传参数,代码更简洁,逻辑更清晰。这玩意儿特别适合需要并发执行多个任务,而且这些任务需要共享上下文信息的场景,比如用户请求处理、数据聚合、批量操作啥的。
为什么需要组合使用
先说说单独使用的问题。虽然结构化并发和作用域值单独用都挺好,但组合起来用效果更好:
// 单独使用结构化并发:需要手动传参数
public String fetchUserData(String userId) throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// 需要手动传 userId 给每个任务
var userInfoTask = scope.fork(() -> fetchUserInfo(userId)); // 传参数
var userOrdersTask = scope.fork(() -> fetchUserOrders(userId)); // 传参数
var userPreferencesTask = scope.fork(() -> fetchUserPreferences(userId)); // 传参数
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 获取结果
return combineResults(
userInfoTask.get(), // 获取结果
userOrdersTask.get(),
userPreferencesTask.get()
);
}
}
// 问题:如果参数很多,每个任务都要传,代码很啰嗦
// 问题:如果需要在任务中记录日志,还得传用户上下文
// 问题:如果需要在任务中追踪请求,还得传追踪ID
// 单独使用作用域值:无法利用并发优势
private static final ScopedValue<String> userId = ScopedValue.newInstance(); // 定义作用域值
public String fetchUserData(String id) {
ScopedValue.where(userId, id).run(() -> { // 绑定用户ID
// 只能串行执行,无法并发
String userInfo = fetchUserInfo(userId.get()); // 串行执行
String userOrders = fetchUserOrders(userId.get()); // 串行执行
String userPreferences = fetchUserPreferences(userId.get()); // 串行执行
return combineResults(userInfo, userOrders, userPreferences); // 组合结果
});
}
// 问题:无法并发执行,性能差
// 问题:如果任务很多,执行时间会很长
组合使用的好处是,既可以利用结构化并发提高性能,又可以利用作用域值自动传递上下文,代码更简洁,逻辑更清晰。
基本组合用法
在结构化并发中使用作用域值
最基础的用法就是在结构化并发的任务中使用作用域值:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义作用域值,用于存储用户ID
private static final ScopedValue<String> currentUserId = ScopedValue.newInstance(); // 创建作用域值
// 在结构化并发中使用作用域值
public String fetchUserData(String userId) throws InterruptedException {
// 绑定用户ID到作用域值
return ScopedValue.where(currentUserId, userId).call(() -> { // 绑定用户ID,然后执行
// 在结构化并发作用域内,子任务可以自动继承作用域值
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建结构化任务作用域
// Fork 子任务,这些任务会自动继承 currentUserId 的值
var userInfoTask = scope.fork(() -> fetchUserInfo()); // 不需要传参数,直接使用作用域值
var userOrdersTask = scope.fork(() -> fetchUserOrders()); // 不需要传参数
var userPreferencesTask = scope.fork(() -> fetchUserPreferences()); // 不需要传参数
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 获取结果
return combineResults(
userInfoTask.get(), // 获取用户信息
userOrdersTask.get(), // 获取订单信息
userPreferencesTask.get() // 获取偏好设置
);
}
});
}
// 子任务方法,直接使用作用域值,不需要参数
String fetchUserInfo() {
String userId = currentUserId.get(); // 获取用户ID
// 获取用户信息...
return "UserInfo for " + userId; // 返回用户信息
}
String fetchUserOrders() {
String userId = currentUserId.get(); // 获取用户ID
// 获取订单信息...
return "Orders for " + userId; // 返回订单信息
}
String fetchUserPreferences() {
String userId = currentUserId.get(); // 获取用户ID
// 获取偏好设置...
return "Preferences for " + userId; // 返回偏好设置
}
这玩意儿的好处是,子任务可以自动继承父作用域的作用域值,不用手动传参数,代码更简洁,而且作用域值是不可变的,保证了数据安全。
多值组合使用
可以定义多个作用域值,在结构化并发中组合使用:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义多个作用域值
private static final ScopedValue<String> username = ScopedValue.newInstance(); // 用户名
private static final ScopedValue<String> traceId = ScopedValue.newInstance(); // 追踪ID
private static final ScopedValue<String> requestId = ScopedValue.newInstance(); // 请求ID
// 在结构化并发中组合使用多个作用域值
public void processRequest(String user, String reqId) throws InterruptedException {
// 生成追踪ID
String trace = "trace-" + System.currentTimeMillis(); // 生成追踪ID
// 绑定多个作用域值
ScopedValue.where(username, user) // 绑定用户名
.where(traceId, trace) // 绑定追踪ID
.where(requestId, reqId) // 绑定请求ID
.run(() -> { // 执行代码块
// 在结构化并发作用域内,子任务可以自动继承所有作用域值
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务,这些任务会自动继承所有作用域值
var validateTask = scope.fork(() -> validateRequest()); // 验证请求
var processTask = scope.fork(() -> processData()); // 处理数据
var logTask = scope.fork(() -> logActivity()); // 记录活动
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 所有任务都完成了
System.out.println("请求处理完成"); // 输出完成信息
}
});
}
// 验证请求的方法
String validateRequest() {
String user = username.get(); // 获取用户名
String trace = traceId.get(); // 获取追踪ID
String req = requestId.get(); // 获取请求ID
// 记录日志,使用作用域值
System.out.println(STR."验证请求: 用户=\{user}, 追踪=\{trace}, 请求=\{req}"); // 记录日志
return "validated"; // 返回验证结果
}
// 处理数据的方法
String processData() {
String user = username.get(); // 获取用户名
String trace = traceId.get(); // 获取追踪ID
// 记录日志
System.out.println(STR."处理数据: 用户=\{user}, 追踪=\{trace}"); // 记录日志
return "processed"; // 返回处理结果
}
// 记录活动的方法
String logActivity() {
String user = username.get(); // 获取用户名
String trace = traceId.get(); // 获取追踪ID
String req = requestId.get(); // 获取请求ID
// 记录日志
System.out.println(STR."记录活动: 用户=\{user}, 追踪=\{trace}, 请求=\{req}"); // 记录日志
return "logged"; // 返回记录结果
}
多值组合的好处是,可以同时传递多个上下文信息,在并发任务中灵活使用,生成各种格式的日志和消息。
实际应用场景
场景1:并发用户数据聚合
最常见的场景就是并发获取用户的各种数据,然后聚合:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义用户信息记录
record UserInfo(String userId, String username, String role) {} // 用户信息记录
// 定义作用域值
private static final ScopedValue<UserInfo> userContext = ScopedValue.newInstance(); // 用户上下文
// 并发获取用户数据并聚合
public UserData aggregateUserData(UserInfo user) throws InterruptedException {
// 绑定用户上下文
return ScopedValue.where(userContext, user).call(() -> { // 绑定用户信息
// 在结构化并发作用域内,并发获取各种数据
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务,并发获取数据
var profileTask = scope.fork(() -> fetchUserProfile()); // 获取用户资料
var ordersTask = scope.fork(() -> fetchUserOrders()); // 获取用户订单
var preferencesTask = scope.fork(() -> fetchUserPreferences()); // 获取用户偏好
var activityTask = scope.fork(() -> fetchUserActivity()); // 获取用户活动
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 获取所有结果
return new UserData(
profileTask.get(), // 用户资料
ordersTask.get(), // 用户订单
preferencesTask.get(), // 用户偏好
activityTask.get() // 用户活动
);
}
});
}
// 获取用户资料的方法
String fetchUserProfile() {
UserInfo user = userContext.get(); // 获取用户信息
// 记录日志,使用作用域值
System.out.println(STR."获取用户资料: \{user.username()} (ID: \{user.userId()})"); // 记录日志
// 实际获取逻辑...
return "Profile for " + user.username(); // 返回用户资料
}
// 获取用户订单的方法
String fetchUserOrders() {
UserInfo user = userContext.get(); // 获取用户信息
System.out.println(STR."获取用户订单: \{user.username()}"); // 记录日志
// 实际获取逻辑...
return "Orders for " + user.username(); // 返回用户订单
}
// 获取用户偏好的方法
String fetchUserPreferences() {
UserInfo user = userContext.get(); // 获取用户信息
System.out.println(STR."获取用户偏好: \{user.username()}"); // 记录日志
// 实际获取逻辑...
return "Preferences for " + user.username(); // 返回用户偏好
}
// 获取用户活动的方法
String fetchUserActivity() {
UserInfo user = userContext.get(); // 获取用户信息
System.out.println(STR."获取用户活动: \{user.username()}"); // 记录日志
// 实际获取逻辑...
return "Activity for " + user.username(); // 返回用户活动
}
并发用户数据聚合的好处是,可以并发获取各种数据,提高性能,而且所有任务都能自动访问用户上下文,不用手动传参数。
场景2:并发请求处理与追踪
另一个常见场景是并发处理多个请求,每个请求都有追踪信息:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义作用域值
private static final ScopedValue<String> traceId = ScopedValue.newInstance(); // 追踪ID
private static final ScopedValue<String> requestId = ScopedValue.newInstance(); // 请求ID
private static final ScopedValue<String> userId = ScopedValue.newInstance(); // 用户ID
// 并发处理请求
public void processConcurrentRequests(String user, String reqId) throws InterruptedException {
// 生成追踪ID
String trace = "trace-" + System.currentTimeMillis(); // 生成追踪ID
// 绑定作用域值
ScopedValue.where(traceId, trace) // 绑定追踪ID
.where(requestId, reqId) // 绑定请求ID
.where(userId, user) // 绑定用户ID
.run(() -> { // 执行代码块
// 在结构化并发作用域内,并发处理多个步骤
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// Fork 多个子任务,并发处理
var validateTask = scope.fork(() -> validateRequest()); // 验证请求
var authorizeTask = scope.fork(() -> authorizeUser()); // 授权用户
var processTask = scope.fork(() -> processRequest()); // 处理请求
var logTask = scope.fork(() -> logRequest()); // 记录请求
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 所有任务都完成了
System.out.println("请求处理完成"); // 输出完成信息
}
});
}
// 验证请求的方法
String validateRequest() {
String trace = traceId.get(); // 获取追踪ID
String req = requestId.get(); // 获取请求ID
// 记录日志
System.out.println(STR."验证请求: 追踪=\{trace}, 请求=\{req}"); // 记录日志
// 验证逻辑...
return "validated"; // 返回验证结果
}
// 授权用户的方法
String authorizeUser() {
String trace = traceId.get(); // 获取追踪ID
String user = userId.get(); // 获取用户ID
// 记录日志
System.out.println(STR."授权用户: 追踪=\{trace}, 用户=\{user}"); // 记录日志
// 授权逻辑...
return "authorized"; // 返回授权结果
}
// 处理请求的方法
String processRequest() {
String trace = traceId.get(); // 获取追踪ID
String req = requestId.get(); // 获取请求ID
// 记录日志
System.out.println(STR."处理请求: 追踪=\{trace}, 请求=\{req}"); // 记录日志
// 处理逻辑...
return "processed"; // 返回处理结果
}
// 记录请求的方法
String logRequest() {
String trace = traceId.get(); // 获取追踪ID
String req = requestId.get(); // 获取请求ID
String user = userId.get(); // 获取用户ID
// 记录日志
System.out.println(STR."记录请求: 追踪=\{trace}, 请求=\{req}, 用户=\{user}"); // 记录日志
return "logged"; // 返回记录结果
}
并发请求处理与追踪的好处是,可以并发处理多个步骤,提高性能,而且所有任务都能自动访问追踪信息,方便日志追踪和调试。
场景3:错误处理与日志记录
错误处理和日志记录也是常见场景,可以在并发任务中统一处理:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义作用域值
private static final ScopedValue<String> traceId = ScopedValue.newInstance(); // 追踪ID
private static final ScopedValue<String> operation = ScopedValue.newInstance(); // 操作
// 并发执行操作,统一错误处理和日志记录
public void executeOperation(String op) throws InterruptedException {
// 生成追踪ID
String trace = "trace-" + System.currentTimeMillis(); // 生成追踪ID
// 绑定作用域值
ScopedValue.where(traceId, trace) // 绑定追踪ID
.where(operation, op) // 绑定操作
.run(() -> { // 执行代码块
// 在结构化并发作用域内,并发执行多个步骤
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域,任何一个失败就关闭
// Fork 多个子任务
var step1Task = scope.fork(() -> executeStep1()); // 执行步骤1
var step2Task = scope.fork(() -> executeStep2()); // 执行步骤2
var step3Task = scope.fork(() -> executeStep3()); // 执行步骤3
scope.join(); // 等待所有任务完成
scope.throwIfFailed(e -> { // 检查失败,统一处理
// 记录错误日志
logError("操作失败", e); // 记录错误
return new OperationException("操作失败", e); // 抛出业务异常
});
// 所有任务都成功了
logInfo("操作完成"); // 记录成功日志
}
});
}
// 执行步骤1的方法
String executeStep1() {
String trace = traceId.get(); // 获取追踪ID
String op = operation.get(); // 获取操作
// 记录日志
logInfo(STR."执行步骤1: 操作=\{op}, 追踪=\{trace}"); // 记录日志
// 执行逻辑...
return "step1 completed"; // 返回结果
}
// 执行步骤2的方法
String executeStep2() {
String trace = traceId.get(); // 获取追踪ID
String op = operation.get(); // 获取操作
// 记录日志
logInfo(STR."执行步骤2: 操作=\{op}, 追踪=\{trace}"); // 记录日志
// 执行逻辑...
return "step2 completed"; // 返回结果
}
// 执行步骤3的方法
String executeStep3() {
String trace = traceId.get(); // 获取追踪ID
String op = operation.get(); // 获取操作
// 记录日志
logInfo(STR."执行步骤3: 操作=\{op}, 追踪=\{trace}"); // 记录日志
// 执行逻辑...
return "step3 completed"; // 返回结果
}
// 记录信息日志的方法
void logInfo(String message) {
String trace = traceId.get(); // 获取追踪ID
System.out.println(STR."[INFO] 追踪=\{trace}: \{message}"); // 输出日志
}
// 记录错误日志的方法
void logError(String message, Throwable error) {
String trace = traceId.get(); // 获取追踪ID
System.err.println(STR."[ERROR] 追踪=\{trace}: \{message} - \{error.getMessage()}"); // 输出错误日志
}
错误处理与日志记录的好处是,可以统一处理错误和日志,所有任务都能自动访问追踪信息,方便调试和排查问题。
高级用法
嵌套结构化并发
可以嵌套使用结构化并发,在不同层级传递不同的上下文:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
// 定义作用域值
private static final ScopedValue<String> requestId = ScopedValue.newInstance(); // 请求ID
private static final ScopedValue<String> operationId = ScopedValue.newInstance(); // 操作ID
private static final ScopedValue<String> stepId = ScopedValue.newInstance(); // 步骤ID
// 嵌套结构化并发
public void handleNestedRequests(String reqId) throws InterruptedException {
// 外层作用域:请求级别
ScopedValue.where(requestId, reqId).run(() -> { // 绑定请求ID
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建外层作用域
// Fork 多个操作任务
var op1Task = scope.fork(() -> handleOperation("op-001")); // 处理操作1
var op2Task = scope.fork(() -> handleOperation("op-002")); // 处理操作2
scope.join(); // 等待所有操作完成
scope.throwIfFailed(); // 检查失败
}
});
}
// 处理操作的方法
String handleOperation(String opId) throws InterruptedException {
// 内层作用域:操作级别
return ScopedValue.where(operationId, opId).call(() -> { // 绑定操作ID
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建内层作用域
// Fork 多个步骤任务
var step1Task = scope.fork(() -> executeStep("step-001")); // 执行步骤1
var step2Task = scope.fork(() -> executeStep("step-002")); // 执行步骤2
scope.join(); // 等待所有步骤完成
scope.throwIfFailed(); // 检查失败
// 组合步骤结果
return STR."操作=\{opId}: \{step1Task.get()}, \{step2Task.get()}"; // 组合结果
}
});
}
// 执行步骤的方法
String executeStep(String stepId) {
// 更内层作用域:步骤级别
return ScopedValue.where(ScopedConcurrencyExample.stepId, stepId).call(() -> { // 绑定步骤ID
String req = requestId.get(); // 获取请求ID
String op = operationId.get(); // 获取操作ID
String step = ScopedConcurrencyExample.stepId.get(); // 获取步骤ID
// 记录日志,包含所有层级信息
System.out.println(STR."执行步骤: 请求=\{req}, 操作=\{op}, 步骤=\{step}"); // 记录日志
return "completed"; // 返回结果
});
}
嵌套结构化并发的好处是,可以在不同层级传递不同的上下文,在日志中灵活组合,生成更详细的追踪信息。
与虚拟线程结合
结构化并发和作用域值特别适合与虚拟线程结合使用:
import java.util.concurrent.StructuredTaskScope; // 导入结构化任务作用域类
import java.util.concurrent.ScopedValue; // 导入作用域值类
import java.util.concurrent.ThreadFactory; // 导入线程工厂接口
// 定义作用域值
private static final ScopedValue<String> userId = ScopedValue.newInstance(); // 用户ID
// 使用虚拟线程执行结构化并发任务
public void processWithVirtualThreads(String user) throws InterruptedException {
// 创建虚拟线程工厂
ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory(); // 创建虚拟线程工厂
// 绑定用户ID
ScopedValue.where(userId, user).run(() -> { // 绑定用户ID
// 使用虚拟线程执行结构化并发任务
try (var scope = new StructuredTaskScope<>("VirtualThreadScope", virtualThreadFactory)) { // 创建作用域,使用虚拟线程
// Fork 多个子任务,这些任务会在虚拟线程中执行
var task1 = scope.fork(() -> performTask("Task1")); // 执行任务1
var task2 = scope.fork(() -> performTask("Task2")); // 执行任务2
var task3 = scope.fork(() -> performTask("Task3")); // 执行任务3
scope.join(); // 等待所有任务完成
scope.throwIfFailed(); // 检查失败
// 所有任务都完成了
System.out.println("所有任务完成"); // 输出完成信息
}
});
}
// 执行任务的方法
String performTask(String taskName) {
String user = userId.get(); // 获取用户ID
System.out.println(STR."执行任务: \{taskName}, 用户: \{user}"); // 记录日志
// 执行逻辑...
return taskName + " completed"; // 返回结果
}
与虚拟线程结合的好处是,可以创建大量轻量级线程,每个线程都能自动继承作用域值,性能好,资源占用少。
注意事项和最佳实践
1. 作用域值要在作用域内使用
作用域值只能在绑定的作用域内使用,在结构化并发的子任务中会自动继承:
// 正确示例:在结构化并发中使用作用域值
ScopedValue.where(userId, "user123").run(() -> { // 绑定用户ID
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 创建作用域
// 子任务可以自动继承 userId 的值
var task = scope.fork(() -> {
String user = userId.get(); // 正确!可以获取值
return "Task for " + user; // 返回结果
});
scope.join(); // 等待完成
}
});
// 错误示例:在作用域外使用
// String user = userId.get(); // 错误!会抛出异常
2. 结构化并发是预览特性
结构化并发是预览特性,需要启用预览特性:
# 编译时启用预览特性
javac --enable-preview --release 22 Main.java
# 运行时启用预览特性
java --enable-preview Main
3. 作用域值是不可变的
作用域值一旦绑定就不能修改,这保证了数据安全:
// 作用域值是不可变的
ScopedValue.where(userId, "user123").run(() -> { // 绑定用户ID
// 不能修改绑定的值
// userId.set("user456"); // 错误!没有 set 方法
// 可以创建新的作用域,绑定新值
ScopedValue.where(userId, "user456").run(() -> { // 新作用域,新值
String user = userId.get(); // 使用新值
});
});
4. 性能考虑
结构化并发和作用域值的性能都很好,特别适合与虚拟线程结合使用:
// 性能考虑:
// - 结构化并发自动管理线程,性能好
// - 作用域值读取很快,跟本地变量差不多
// - 虚拟线程轻量级,可以创建大量线程
// - 组合使用性能最优
总结
结构化并发和作用域值组合使用,可以解决很多实际开发中的问题。结构化并发让多线程编程更简单、更可靠,作用域值让上下文传递更安全、更高效,组合起来就是:在结构化并发的任务中,子任务可以自动继承父作用域的作用域值,不用手动传参数,代码更简洁,逻辑更清晰。
这玩意儿特别适合需要并发执行多个任务,而且这些任务需要共享上下文信息的场景,比如用户请求处理、数据聚合、批量操作、错误处理、日志记录啥的。而且作用域值是不可变的,保证了数据安全,结构化并发自动管理线程,不用担心资源泄漏。
不过要注意,结构化并发和作用域值都是预览特性,需要启用预览特性才能用,而且可能会在未来的版本中变化。作用域值也只能在绑定的作用域内使用,否则会抛出异常。
好了,今天就聊到这里,兄弟们有啥问题可以在评论区留言,鹏磊看到会回复的。下次咱们聊聊 Java 22 的其他新特性,比如 Java 22 性能优化综合实践,这玩意儿也挺有意思的。