写多线程程序的时候,经常需要在并发任务间共享上下文数据,比如用户信息、请求ID、追踪ID这些,传统方式用ThreadLocal,但是ThreadLocal在结构化并发场景下有问题,数据可能泄漏,生命周期也不好管理。现在有了结构化并发和作用域值,可以完美配合,让并发编程更安全、更高效。
鹏磊我之前做服务器应用的时候,经常要并发处理多个请求,每个请求都有自己的上下文数据,传统方式用ThreadLocal,但是用结构化并发的时候,ThreadLocal的数据可能在线程间泄漏,而且生命周期不好控制。现在有了作用域值,可以在结构化并发的子任务间安全地共享数据,数据不可变,生命周期自动管理,配合使用效果特别好。
结构化并发负责管理任务的生命周期,作用域值负责在任务间共享数据,它们配合使用,可以让并发编程变得更安全、更简洁。今天咱就好好聊聊这两个特性怎么配合使用,看看它们组合起来能发挥多大威力。
协同使用的核心优势
为什么需要协同使用
结构化并发和作用域值各自解决了不同的问题:
- 结构化并发:管理任务的生命周期,确保任务不会泄漏
- 作用域值:在任务间共享不可变数据,避免ThreadLocal的问题
它们配合使用,可以同时解决任务管理和数据共享的问题。
协同使用的优势
协同使用有几个核心优势:
- 数据安全:作用域值不可变,生命周期受限,不会泄漏
- 自动管理:结构化并发自动管理任务生命周期,作用域值自动管理数据生命周期
- 性能优化:作用域值在虚拟线程场景下性能更好,配合结构化并发使用效果更好
- 代码简洁:不需要显式传递参数,不需要手动管理ThreadLocal
// 传统方式,用ThreadLocal和ExecutorService
private static final ThreadLocal<String> USER_ID = new ThreadLocal<>();
void handleRequests(List<Request> requests) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
for (Request request : requests) {
executor.submit(() -> {
USER_ID.set(request.getUserId()); // 设置ThreadLocal
try {
processRequest(request);
} finally {
USER_ID.remove(); // 得手动清理
}
});
}
} finally {
executor.shutdown(); // 得手动关闭
}
// ThreadLocal可能在线程池复用的时候泄漏
}
// 协同使用方式,用结构化并发和作用域值
private static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
void handleRequests(List<Request> requests) {
try (var scope = new StructuredTaskScope<Void>()) {
for (Request request : requests) {
scope.fork(() -> {
ScopedValue.runWhere(USER_ID, request.getUserId(), () -> {
processRequest(request); // 在作用域内,USER_ID可用
}); // 作用域结束,自动清理
});
}
scope.join(); // 等待所有任务完成
} // 自动关闭scope,取消未完成的任务
// 作用域值自动清理,不会泄漏
}
协同使用让代码更安全,资源管理也更简单。
基础协同模式
模式1:在结构化并发中共享用户上下文
最常见的场景就是在并发任务间共享用户上下文:
// 定义作用域值
private static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();
// 在结构化并发中使用作用域值
public void processUserRequest(String userId, Request request) {
RequestContext context = createContext(userId, request);
// 在作用域值的作用域内创建结构化并发scope
ScopedValue.runWhere(CONTEXT, context, () -> {
try (var scope = new StructuredTaskScope<UserData>()) {
// 在结构化并发的子任务中,CONTEXT自动可用
var userTask = scope.fork(() -> {
RequestContext ctx = CONTEXT.get(); // 子任务可以访问作用域值
return fetchUserInfo(ctx.getUserId());
});
var ordersTask = scope.fork(() -> {
RequestContext ctx = CONTEXT.get(); // 子任务可以访问作用域值
return fetchOrders(ctx.getUserId());
});
var profileTask = scope.fork(() -> {
RequestContext ctx = CONTEXT.get(); // 子任务可以访问作用域值
return fetchProfile(ctx.getUserId());
});
scope.join(); // 等待所有任务完成
// 合并结果
UserData data = new UserData(
userTask.get(),
ordersTask.get(),
profileTask.get()
);
return data;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}); // 作用域值自动清理
}
这样写代码简洁,数据在子任务间自动共享,不需要显式传递。
模式2:在结构化并发中共享追踪ID
在追踪场景,需要在调用链中传递追踪ID:
// 定义追踪ID的作用域值
private static final ScopedValue<String> TRACE_ID = ScopedValue.newInstance();
// 在结构化并发中使用追踪ID
public void handleRequest(Request request) {
String traceId = generateTraceId();
ScopedValue.runWhere(TRACE_ID, traceId, () -> {
log("Request started: " + traceId);
try (var scope = new StructuredTaskScope<Response>()) {
// 并发调用多个服务,追踪ID自动传递
var service1Task = scope.fork(() -> {
String id = TRACE_ID.get(); // 子任务可以访问追踪ID
log("Calling service1: " + id);
return callService1();
});
var service2Task = scope.fork(() -> {
String id = TRACE_ID.get(); // 子任务可以访问追踪ID
log("Calling service2: " + id);
return callService2();
});
scope.join();
Response response = combineResponses(
service1Task.get(),
service2Task.get()
);
log("Request completed: " + traceId);
return response;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}); // 追踪ID自动清理
}
void log(String message) {
String traceId = TRACE_ID.orElse("unknown");
System.out.println("[" + traceId + "] " + message);
}
追踪ID在调用链中自动传递,不需要显式传参。
模式3:在结构化并发中共享事务上下文
在事务管理场景,需要在并发操作间共享事务上下文:
// 定义事务上下文的作用域值
private static final ScopedValue<Transaction> TX = ScopedValue.newInstance();
// 在结构化并发中使用事务上下文
public <T> T executeInTransaction(Supplier<T> operation) {
Transaction tx = beginTransaction();
return ScopedValue.callWhere(TX, tx, () -> {
try (var scope = new StructuredTaskScope<Void>()) {
// 并发执行多个数据库操作,事务上下文自动共享
scope.fork(() -> {
Transaction t = TX.get(); // 子任务可以访问事务
t.execute("INSERT INTO users ...");
});
scope.fork(() -> {
Transaction t = TX.get(); // 子任务可以访问事务
t.execute("INSERT INTO orders ...");
});
scope.fork(() -> {
Transaction t = TX.get(); // 子任务可以访问事务
t.execute("UPDATE inventory ...");
});
scope.join(); // 等待所有操作完成
// 所有操作在同一个事务中
tx.commit();
return operation.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
tx.rollback();
throw new RuntimeException(e);
} catch (Exception e) {
tx.rollback();
throw e;
}
}); // 事务上下文自动清理
}
事务上下文在并发操作间自动共享,确保所有操作在同一个事务中。
高级协同技巧
技巧1:嵌套作用域值
可以在结构化并发中嵌套使用多个作用域值:
// 定义多个作用域值
private static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();
// 嵌套使用作用域值
public void processRequest(String userId, String requestId, Request request) {
RequestContext context = createContext(userId, requestId, request);
// 嵌套绑定多个作用域值
ScopedValue.runWhere(USER_ID, userId, () -> {
ScopedValue.runWhere(REQUEST_ID, requestId, () -> {
ScopedValue.runWhere(CONTEXT, context, () -> {
try (var scope = new StructuredTaskScope<Result>()) {
// 所有子任务都可以访问所有作用域值
var task1 = scope.fork(() -> {
String uid = USER_ID.get();
String rid = REQUEST_ID.get();
RequestContext ctx = CONTEXT.get();
return processTask1(uid, rid, ctx);
});
var task2 = scope.fork(() -> {
String uid = USER_ID.get();
String rid = REQUEST_ID.get();
RequestContext ctx = CONTEXT.get();
return processTask2(uid, rid, ctx);
});
scope.join();
return combineResults(task1.get(), task2.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
});
});
}
嵌套作用域值让数据共享更灵活,可以传递多个上下文数据。
技巧2:作用域值配合错误处理策略
可以配合结构化并发的错误处理策略使用作用域值:
// 配合ShutdownOnFailure使用作用域值
public Result processWithFailureHandling(String userId, Request request) {
ScopedValue.runWhere(USER_ID, userId, () -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
String uid = USER_ID.get(); // 子任务可以访问作用域值
return fetchData1(uid);
});
var task2 = scope.fork(() -> {
String uid = USER_ID.get(); // 子任务可以访问作用域值
return fetchData2(uid);
});
scope.join();
scope.throwIfFailed(); // 如果有失败,抛出异常
return new Result(task1.get(), task2.get());
} catch (Exception e) {
// 错误处理,作用域值仍然可用
String uid = USER_ID.get();
logError(uid, e);
throw e;
}
});
}
作用域值在错误处理时仍然可用,可以用于日志记录等操作。
技巧3:作用域值配合虚拟线程
作用域值特别适合和虚拟线程配合使用:
// 作用域值配合虚拟线程
public void processManyRequests(List<Request> requests) {
try (var scope = new StructuredTaskScope<Void>()) {
for (Request request : requests) {
scope.fork(() -> {
// 每个任务在虚拟线程上运行
ScopedValue.runWhere(USER_ID, request.getUserId(), () -> {
processRequest(request); // 在虚拟线程上,作用域值可用
});
});
}
scope.join(); // 等待所有任务完成
}
// 作用域值在虚拟线程场景下性能更好
}
作用域值在虚拟线程场景下性能更好,配合结构化并发使用效果更好。
技巧4:作用域值在递归任务中的传递
作用域值可以在递归任务中自动传递:
// 作用域值在递归任务中的传递
private static final ScopedValue<String> TRACE_ID = ScopedValue.newInstance();
public void processRecursive(String traceId, List<Task> tasks) {
ScopedValue.runWhere(TRACE_ID, traceId, () -> {
try (var scope = new StructuredTaskScope<Void>()) {
for (Task task : tasks) {
scope.fork(() -> {
String id = TRACE_ID.get(); // 子任务可以访问追踪ID
log("Processing task: " + id);
if (task.hasSubtasks()) {
// 递归处理子任务,追踪ID自动传递
processRecursive(id, task.getSubtasks());
}
});
}
scope.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
作用域值在递归任务中自动传递,不需要显式传参。
实际应用场景
场景1:Web服务器的请求处理
在Web服务器中,需要并发处理多个请求,每个请求都有自己的上下文:
// Web服务器的请求处理
public class WebServer {
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
private static final ScopedValue<User> CURRENT_USER = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();
public void handleRequest(HttpRequest request) {
String requestId = generateRequestId();
User user = authenticate(request);
RequestContext context = createContext(requestId, user, request);
ScopedValue.runWhere(REQUEST_ID, requestId, () -> {
ScopedValue.runWhere(CURRENT_USER, user, () -> {
ScopedValue.runWhere(CONTEXT, context, () -> {
try (var scope = new StructuredTaskScope<Response>()) {
// 并发处理请求的各个部分
var authTask = scope.fork(() -> {
User u = CURRENT_USER.get();
return validateAuth(u);
});
var dataTask = scope.fork(() -> {
String rid = REQUEST_ID.get();
RequestContext ctx = CONTEXT.get();
return fetchData(rid, ctx);
});
var cacheTask = scope.fork(() -> {
String rid = REQUEST_ID.get();
return checkCache(rid);
});
scope.join();
Response response = buildResponse(
authTask.get(),
dataTask.get(),
cacheTask.get()
);
return response;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
});
});
}
}
这样处理请求代码简洁,上下文数据自动传递。
场景2:微服务的调用链追踪
在微服务架构中,需要在调用链中传递追踪信息:
// 微服务的调用链追踪
public class MicroserviceClient {
private static final ScopedValue<String> TRACE_ID = ScopedValue.newInstance();
private static final ScopedValue<String> SPAN_ID = ScopedValue.newInstance();
public Response callServices(String traceId, Request request) {
ScopedValue.runWhere(TRACE_ID, traceId, () -> {
try (var scope = new StructuredTaskScope<ServiceResponse>()) {
// 并发调用多个微服务,追踪信息自动传递
var service1Task = scope.fork(() -> {
String tid = TRACE_ID.get();
String sid = generateSpanId();
return ScopedValue.callWhere(SPAN_ID, sid, () -> {
return callService1(tid, sid);
});
});
var service2Task = scope.fork(() -> {
String tid = TRACE_ID.get();
String sid = generateSpanId();
return ScopedValue.callWhere(SPAN_ID, sid, () -> {
return callService2(tid, sid);
});
});
scope.join();
return combineResponses(
service1Task.get(),
service2Task.get()
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
}
追踪信息在调用链中自动传递,不需要显式传参。
场景3:批处理任务的数据共享
在批处理任务中,需要在并发处理的数据项间共享配置:
// 批处理任务的数据共享
public class BatchProcessor {
private static final ScopedValue<BatchConfig> CONFIG = ScopedValue.newInstance();
private static final ScopedValue<String> BATCH_ID = ScopedValue.newInstance();
public void processBatch(String batchId, BatchConfig config, List<DataItem> items) {
ScopedValue.runWhere(BATCH_ID, batchId, () -> {
ScopedValue.runWhere(CONFIG, config, () -> {
try (var scope = new StructuredTaskScope<Void>()) {
for (DataItem item : items) {
scope.fork(() -> {
String bid = BATCH_ID.get();
BatchConfig cfg = CONFIG.get();
processItem(bid, cfg, item); // 配置自动共享
});
}
scope.join(); // 等待所有项处理完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
});
}
}
配置在并发处理的数据项间自动共享,不需要显式传递。
最佳实践
1. 作用域值应该在结构化并发之前绑定
作用域值应该在创建结构化并发scope之前绑定:
// 好的做法:先绑定作用域值,再创建scope
ScopedValue.runWhere(USER_ID, userId, () -> {
try (var scope = new StructuredTaskScope<Void>()) {
// scope内的任务可以访问USER_ID
}
});
// 不好的做法:在scope内绑定作用域值
try (var scope = new StructuredTaskScope<Void>()) {
scope.fork(() -> {
ScopedValue.runWhere(USER_ID, userId, () -> {
// 这样也可以,但是不够清晰
});
});
}
先绑定作用域值,再创建scope,代码更清晰。
2. 使用有意义的变量名
作用域值应该使用有意义的变量名:
// 好的做法:使用有意义的变量名
private static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> REQUEST_CONTEXT = ScopedValue.newInstance();
// 不好的做法:使用无意义的变量名
private static final ScopedValue<String> SV1 = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> SV2 = ScopedValue.newInstance();
有意义的变量名让代码更易读。
3. 避免在作用域值中存储可变对象
作用域值应该是不可变的,避免存储可变对象:
// 好的做法:存储不可变对象
private static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
private static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();
// 不好的做法:存储可变对象
private static final ScopedValue<StringBuilder> BUFFER = ScopedValue.newInstance();
存储不可变对象确保数据安全。
4. 配合错误处理策略使用
配合结构化并发的错误处理策略使用作用域值:
// 好的做法:配合错误处理策略
ScopedValue.runWhere(USER_ID, userId, () -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 使用错误处理策略
}
});
配合错误处理策略使用,代码更健壮。
性能考虑
作用域值在虚拟线程场景下的性能
作用域值在虚拟线程场景下性能更好,配合结构化并发使用效果更好:
// 作用域值在虚拟线程场景下性能更好
public void processManyRequests(List<Request> requests) {
try (var scope = new StructuredTaskScope<Void>()) {
for (Request request : requests) {
scope.fork(() -> {
// 每个任务在虚拟线程上运行
ScopedValue.runWhere(USER_ID, request.getUserId(), () -> {
processRequest(request);
});
});
}
scope.join();
}
// 作用域值的存储开销小,性能好
}
作用域值在虚拟线程场景下性能更好,配合结构化并发使用效果更好。
总结
结构化并发和作用域值的协同使用,确实让现代并发编程变得更安全、更高效了。结构化并发负责管理任务的生命周期,作用域值负责在任务间共享数据,它们配合使用,可以同时解决任务管理和数据共享的问题。
鹏磊我觉得这个组合特别适合那些需要在并发任务间共享上下文数据的场景,比如Web服务器、微服务调用、批处理任务这些。用结构化并发和作用域值,代码简洁,数据安全,性能也好。
总的来说,结构化并发和作用域值是现代Java并发编程的重要工具,它们的协同使用让并发编程变得更优雅、更安全。在实际开发中,应该多使用这些特性,提高代码质量。