05、Java 22 新特性:结构化并发(JEP 462)简化多线程编程

兄弟们,鹏磊今天来聊聊 Java 22 里的结构化并发(Structured Concurrency)这个新特性,这玩意儿是 JEP 462 引入的,专门用来简化多线程编程的。说实话,写多线程代码这么多年了,最头疼的就是线程管理、异常处理、资源清理这些破事儿,一个不小心就内存泄漏、线程泄漏,调试起来贼费劲。结构化并发就是为了解决这些问题来的,它把一组相关的并发任务当成一个工作单元来管理,这样代码更清晰、更可靠,也更容易观察和调试。

结构化并发是 Java 22 的预览特性,它提供了一种结构化的方式来组织和管理并发任务。核心思想是:如果任务 A 创建了任务 B 和任务 C,那么任务 A 必须等待 B 和 C 都完成(或者失败)才能结束。这种结构化的方式让并发代码的依赖关系更清晰,错误处理更简单,而且能自动处理线程泄漏问题。

为什么需要结构化并发

先说说传统多线程编程的问题。咱们平时写并发代码,最常用的就是 ExecutorServiceFutureCompletableFuture 这些玩意儿,虽然功能强大,但确实有不少坑:

// 传统方式的并发任务执行
ExecutorService executor = Executors.newCachedThreadPool();  // 创建线程池

// 提交多个任务
Future<String> future1 = executor.submit(() -> fetchData("url1"));  // 提交第一个任务
Future<String> future2 = executor.submit(() -> fetchData("url2"));  // 提交第二个任务
Future<String> future3 = executor.submit(() -> fetchData("url3"));  // 提交第三个任务

try {
    // 获取结果
    String result1 = future1.get();  // 等待第一个任务完成
    String result2 = future2.get();  // 等待第二个任务完成
    String result3 = future3.get();  // 等待第三个任务完成
    
    // 处理结果
    processResults(result1, result2, result3);  // 处理所有结果
} catch (ExecutionException e) {
    // 异常处理很麻烦,得一个个检查
    System.err.println("任务执行失败: " + e.getCause());  // 输出异常信息
} finally {
    // 问题1:容易忘记关闭线程池,导致资源泄漏
    executor.shutdown();  // 必须手动关闭,否则线程会一直存在
}

// 问题2:如果某个任务失败,其他任务可能还在运行,浪费资源
// 问题3:异常处理复杂,得分别处理每个 Future 的异常
// 问题4:线程泄漏风险,如果忘记关闭 executor,线程会一直存在

结构化并发就是为了解决这些问题。它通过结构化的作用域来管理并发任务,确保所有任务都在同一个作用域内完成,作用域结束时自动清理资源,而且异常处理更简单。

核心概念:StructuredTaskScope

结构化并发的核心是 StructuredTaskScope 类,它位于 java.util.concurrent 包里。这个类允许你把一组并发子任务当成一个单元来协调管理。

基本用法

StructuredTaskScope 的基本用法很简单,在 try-with-resources 语句里创建作用域,然后 fork 子任务,最后 join 等待所有任务完成:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.Callable;  // 导入可调用接口
import java.util.concurrent.ExecutionException;  // 导入执行异常类

// 使用结构化并发执行多个任务
public String fetchUserData(String userId) throws InterruptedException {
    // 在 try-with-resources 中创建作用域,作用域结束时自动关闭
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域,任何一个任务失败就关闭
        // 定义子任务,每个任务返回不同的数据
        Callable<String> userInfoTask = () -> fetchUserInfo(userId);  // 获取用户基本信息
        Callable<String> userOrdersTask = () -> fetchUserOrders(userId);  // 获取用户订单
        Callable<String> userPreferencesTask = () -> fetchUserPreferences(userId);  // 获取用户偏好设置
        
        // Fork 子任务,每个任务在独立的线程中运行
        var userInfoFuture = scope.fork(userInfoTask);  // Fork 第一个任务,返回 Future
        var userOrdersFuture = scope.fork(userOrdersTask);  // Fork 第二个任务
        var userPreferencesFuture = scope.fork(userPreferencesTask);  // Fork 第三个任务
        
        // Join 等待所有任务完成(或者任何一个失败)
        scope.join();  // 等待所有任务完成,如果任何一个失败会抛出异常
        
        // 检查是否有任务失败
        scope.throwIfFailed();  // 如果有任务失败,抛出异常
        
        // 获取所有任务的结果
        String userInfo = userInfoFuture.get();  // 获取用户信息
        String userOrders = userOrdersFuture.get();  // 获取订单信息
        String userPreferences = userPreferencesFuture.get();  // 获取偏好设置
        
        // 组合结果
        return combineResults(userInfo, userOrders, userPreferences);  // 组合所有结果返回
    }
    // 作用域结束时,所有未完成的任务会自动取消,线程会自动清理,不用担心资源泄漏
}

这玩意儿的好处是,所有任务都在同一个作用域内,作用域结束时自动清理,不用担心线程泄漏。而且如果任何一个任务失败,可以立即取消其他任务,避免浪费资源。

ShutdownOnFailure:任何一个失败就关闭

ShutdownOnFailureStructuredTaskScope 的一个子类,它的策略是:如果任何一个子任务失败,就立即关闭作用域,取消所有未完成的任务。这适合需要所有任务都成功的场景。

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException;  // 导入执行异常类
import java.time.Instant;  // 导入时间类

// 使用 ShutdownOnFailure 执行多个必须全部成功的任务
public String processOrder(String orderId) throws InterruptedException {
    Instant deadline = Instant.now().plusSeconds(30);  // 设置30秒超时时间
    
    // 创建 ShutdownOnFailure 作用域,任何一个任务失败就关闭
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
        // Fork 多个子任务
        var validateTask = scope.fork(() -> validateOrder(orderId));  // 验证订单
        var checkInventoryTask = scope.fork(() -> checkInventory(orderId));  // 检查库存
        var calculatePriceTask = scope.fork(() -> calculatePrice(orderId));  // 计算价格
        var checkPaymentTask = scope.fork(() -> checkPayment(orderId));  // 检查支付
        
        // 等待所有任务完成,或者超时
        scope.joinUntil(deadline);  // 等待所有任务完成,或者到达截止时间
        
        // 检查是否有任务失败,如果有就抛出异常
        scope.throwIfFailed(e -> new OrderProcessingException("订单处理失败", e));  // 如果有失败,抛出业务异常
        
        // 所有任务都成功了,获取结果
        String validation = validateTask.get();  // 获取验证结果
        String inventory = checkInventoryTask.get();  // 获取库存检查结果
        String price = calculatePriceTask.get();  // 获取价格计算结果
        String payment = checkPaymentTask.get();  // 获取支付检查结果
        
        // 处理所有结果
        return processOrderResults(validation, inventory, price, payment);  // 处理所有结果
    }
    // 如果任何一个任务失败,作用域会自动取消其他任务,然后关闭
}

这个场景下,如果验证订单失败,就不需要继续检查库存、计算价格了,直接取消所有任务,节省资源。

ShutdownOnSuccess:第一个成功就关闭

ShutdownOnSuccess 是另一个子类,它的策略是:只要有一个子任务成功,就立即关闭作用域,取消其他未完成的任务。这适合只需要一个结果就够的场景,比如从多个数据源获取数据,只要有一个成功就行。

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException;  // 导入执行异常类

// 使用 ShutdownOnSuccess 从多个数据源获取数据,只要一个成功就行
public String fetchDataFromMultipleSources(String query) throws InterruptedException {
    // 创建 ShutdownOnSuccess 作用域,第一个任务成功就关闭
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {  // 创建作用域,泛型是结果类型
        // Fork 多个子任务,从不同的数据源获取数据
        scope.fork(() -> fetchFromDatabase(query));  // 从数据库获取
        scope.fork(() -> fetchFromCache(query));  // 从缓存获取
        scope.fork(() -> fetchFromAPI(query));  // 从 API 获取
        scope.fork(() -> fetchFromFile(query));  // 从文件获取
        
        // 等待第一个任务成功
        scope.join();  // 等待第一个任务成功,然后自动取消其他任务
        
        // 获取第一个成功的结果
        String result = scope.result(e -> new DataFetchException("所有数据源都失败了", e));  // 获取结果,如果都失败就抛出异常
        
        return result;  // 返回第一个成功的结果
    }
    // 第一个任务成功后,其他任务会自动取消,作用域关闭
}

这个场景下,如果从缓存获取数据成功了,就不需要继续从数据库、API、文件获取了,直接取消其他任务,提高响应速度。

实际应用场景

场景1:并行查询多个数据源

最常见的场景就是并行查询多个数据源,然后组合结果。比如查询用户信息,需要同时从数据库、缓存、外部 API 获取数据:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.List;  // 导入列表类
import java.util.ArrayList;  // 导入数组列表类

// 用户信息记录
record UserData(String basicInfo, String orders, String preferences) {}  // 用户数据记录,包含基本信息、订单、偏好

// 并行查询用户数据
public UserData fetchUserDataParallel(String userId) throws InterruptedException {
    // 创建 ShutdownOnFailure 作用域,需要所有数据都成功
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
        // Fork 三个子任务,并行查询
        var basicInfoTask = scope.fork(() -> {  // Fork 第一个任务
            // 模拟从数据库查询用户基本信息
            Thread.sleep(100);  // 模拟网络延迟
            return "用户ID: " + userId + ", 姓名: 张三";  // 返回基本信息
        });
        
        var ordersTask = scope.fork(() -> {  // Fork 第二个任务
            // 模拟从订单服务查询订单
            Thread.sleep(150);  // 模拟网络延迟
            return "订单1, 订单2, 订单3";  // 返回订单信息
        });
        
        var preferencesTask = scope.fork(() -> {  // Fork 第三个任务
            // 模拟从偏好服务查询偏好设置
            Thread.sleep(120);  // 模拟网络延迟
            return "主题: 暗色, 语言: 中文";  // 返回偏好设置
        });
        
        // 等待所有任务完成
        scope.join();  // 等待所有任务完成
        
        // 检查是否有任务失败
        scope.throwIfFailed(e -> new RuntimeException("查询用户数据失败", e));  // 如果有失败,抛出异常
        
        // 获取所有结果
        String basicInfo = basicInfoTask.get();  // 获取基本信息
        String orders = ordersTask.get();  // 获取订单信息
        String preferences = preferencesTask.get();  // 获取偏好设置
        
        // 组合结果
        return new UserData(basicInfo, orders, preferences);  // 返回组合后的用户数据
    }
    // 作用域结束时,所有任务都已完成或已取消,资源自动清理
}

这样写的好处是,三个查询任务并行执行,总耗时是三个任务中最慢的那个,而不是三个任务的时间相加。而且如果任何一个任务失败,其他任务会自动取消,不会浪费资源。

场景2:快速失败:第一个成功就返回

有时候我们只需要一个结果就够了,比如从多个镜像服务器下载文件,只要一个成功就行:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.io.IOException;  // 导入 IO 异常类

// 从多个镜像服务器下载文件,只要一个成功就行
public byte[] downloadFileFast(String fileName) throws InterruptedException {
    // 创建 ShutdownOnSuccess 作用域,第一个成功就关闭
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<byte[]>()) {  // 创建作用域,结果类型是字节数组
        // Fork 多个子任务,从不同的镜像服务器下载
        scope.fork(() -> downloadFromMirror1(fileName));  // 从镜像1下载
        scope.fork(() -> downloadFromMirror2(fileName));  // 从镜像2下载
        scope.fork(() -> downloadFromMirror3(fileName));  // 从镜像3下载
        scope.fork(() -> downloadFromMirror4(fileName));  // 从镜像4下载
        
        // 等待第一个任务成功
        scope.join();  // 等待第一个任务成功
        
        // 获取第一个成功的结果
        byte[] fileData = scope.result(e -> {  // 获取结果
            // 如果所有任务都失败,抛出异常
            throw new IOException("所有镜像服务器都下载失败", e);  // 抛出 IO 异常
        });
        
        return fileData;  // 返回第一个成功下载的文件数据
    }
    // 第一个任务成功后,其他任务会自动取消,节省带宽和资源
}

// 模拟从镜像服务器下载文件
private byte[] downloadFromMirror1(String fileName) throws IOException {
    // 模拟下载逻辑
    Thread.sleep(200);  // 模拟网络延迟
    return ("文件内容 from mirror1: " + fileName).getBytes();  // 返回文件数据
}

private byte[] downloadFromMirror2(String fileName) throws IOException {
    Thread.sleep(150);  // 模拟网络延迟,这个更快
    return ("文件内容 from mirror2: " + fileName).getBytes();  // 返回文件数据
}

private byte[] downloadFromMirror3(String fileName) throws IOException {
    Thread.sleep(300);  // 模拟网络延迟,这个最慢
    return ("文件内容 from mirror3: " + fileName).getBytes();  // 返回文件数据
}

private byte[] downloadFromMirror4(String fileName) throws IOException {
    Thread.sleep(250);  // 模拟网络延迟
    return ("文件内容 from mirror4: " + fileName).getBytes();  // 返回文件数据
}

这个场景下,如果镜像2最快下载成功,其他三个镜像的下载任务会自动取消,节省带宽和服务器资源。

场景3:超时控制

结构化并发还支持超时控制,可以设置一个截止时间,如果在这个时间之前任务还没完成,就取消所有任务:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.time.Instant;  // 导入时间类
import java.time.Duration;  // 导入时长类

// 带超时的并行任务执行
public String fetchDataWithTimeout(String query) throws InterruptedException {
    // 设置5秒超时
    Instant deadline = Instant.now().plus(Duration.ofSeconds(5));  // 计算截止时间
    
    // 创建 ShutdownOnFailure 作用域
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
        // Fork 多个子任务
        var task1 = scope.fork(() -> slowQuery(query));  // Fork 第一个任务,可能很慢
        var task2 = scope.fork(() -> fastQuery(query));  // Fork 第二个任务,比较快
        var task3 = scope.fork(() -> mediumQuery(query));  // Fork 第三个任务,中等速度
        
        // 等待所有任务完成,或者超时
        scope.joinUntil(deadline);  // 等待所有任务完成,或者到达截止时间
        
        // 检查是否超时
        if (scope.isShutdown()) {  // 如果作用域已关闭(可能是超时或失败)
            throw new TimeoutException("任务执行超时");  // 抛出超时异常
        }
        
        // 检查是否有任务失败
        scope.throwIfFailed(e -> new RuntimeException("任务执行失败", e));  // 如果有失败,抛出异常
        
        // 获取所有结果
        String result1 = task1.get();  // 获取第一个任务的结果
        String result2 = task2.get();  // 获取第二个任务的结果
        String result3 = task3.get();  // 获取第三个任务的结果
        
        // 组合结果
        return result1 + " | " + result2 + " | " + result3;  // 组合所有结果
    }
    // 如果超时,所有未完成的任务会自动取消
}

// 模拟慢查询
private String slowQuery(String query) throws InterruptedException {
    Thread.sleep(6000);  // 模拟6秒延迟,会超时
    return "慢查询结果";  // 返回结果
}

// 模拟快查询
private String fastQuery(String query) throws InterruptedException {
    Thread.sleep(1000);  // 模拟1秒延迟
    return "快查询结果";  // 返回结果
}

// 模拟中等速度查询
private String mediumQuery(String query) throws InterruptedException {
    Thread.sleep(3000);  // 模拟3秒延迟
    return "中等查询结果";  // 返回结果
}

这个场景下,如果5秒内所有任务都完成了,就正常返回结果;如果5秒到了还有任务没完成,就取消所有任务,抛出超时异常。

场景4:异常处理和错误传播

结构化并发让异常处理变得更简单,所有子任务的异常都会自动传播到主任务:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.ExecutionException;  // 导入执行异常类

// 处理异常和错误传播
public void processWithErrorHandling(String data) throws InterruptedException {
    // 创建 ShutdownOnFailure 作用域
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
        // Fork 多个子任务
        var validateTask = scope.fork(() -> validateData(data));  // 验证数据
        var processTask = scope.fork(() -> processData(data));  // 处理数据
        var saveTask = scope.fork(() -> saveData(data));  // 保存数据
        
        // 等待所有任务完成
        scope.join();  // 等待所有任务完成
        
        // 检查是否有任务失败,如果有就抛出异常
        scope.throwIfFailed(e -> {  // 如果有失败,执行这个函数
            // 可以根据异常类型做不同的处理
            if (e instanceof ValidationException) {  // 如果是验证异常
                return new BusinessException("数据验证失败", e);  // 转换为业务异常
            } else if (e instanceof ProcessingException) {  // 如果是处理异常
                return new BusinessException("数据处理失败", e);  // 转换为业务异常
            } else {
                return new RuntimeException("未知错误", e);  // 转换为运行时异常
            }
        });
        
        // 所有任务都成功了,继续处理
        String validated = validateTask.get();  // 获取验证结果
        String processed = processTask.get();  // 获取处理结果
        String saved = saveTask.get();  // 获取保存结果
        
        // 继续后续处理
        System.out.println("所有任务都成功了");  // 输出成功信息
    } catch (BusinessException e) {  // 捕获业务异常
        // 处理业务异常
        System.err.println("业务处理失败: " + e.getMessage());  // 输出错误信息
        throw e;  // 重新抛出异常
    }
    // 如果任何一个任务失败,其他任务会自动取消,作用域关闭
}

// 模拟验证数据
private String validateData(String data) {
    if (data == null || data.isEmpty()) {  // 如果数据为空
        throw new ValidationException("数据不能为空");  // 抛出验证异常
    }
    return "验证通过: " + data;  // 返回验证结果
}

// 模拟处理数据
private String processData(String data) {
    // 模拟处理逻辑
    return "处理后的数据: " + data.toUpperCase();  // 返回处理结果
}

// 模拟保存数据
private String saveData(String data) {
    // 模拟保存逻辑
    return "保存成功: " + data;  // 返回保存结果
}

// 自定义异常类
class ValidationException extends RuntimeException {
    public ValidationException(String message) {
        super(message);  // 调用父类构造函数
    }
}

class ProcessingException extends RuntimeException {
    public ProcessingException(String message) {
        super(message);  // 调用父类构造函数
    }
}

class BusinessException extends RuntimeException {
    public BusinessException(String message, Throwable cause) {
        super(message, cause);  // 调用父类构造函数
    }
}

这个场景下,如果验证数据失败,其他任务(处理数据、保存数据)会自动取消,然后抛出业务异常,异常处理逻辑很清晰。

与虚拟线程的配合

结构化并发和虚拟线程(Virtual Threads)是绝配。虚拟线程是 Java 21 引入的,它让创建大量线程变得非常轻量级。结构化并发可以很好地管理虚拟线程,确保它们不会泄漏:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.Executor;  // 导入执行器接口
import java.util.concurrent.Executors;  // 导入执行器工具类

// 使用虚拟线程执行结构化并发任务
public void processWithVirtualThreads(List<String> items) throws InterruptedException {
    // 创建虚拟线程执行器
    try (Executor executor = Executors.newVirtualThreadPerTaskExecutor()) {  // 创建虚拟线程执行器
        // 创建 ShutdownOnFailure 作用域,使用虚拟线程执行器
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
            // Fork 大量子任务,每个任务在虚拟线程中运行
            var futures = items.stream()  // 将列表转为流
                    .map(item -> scope.fork(() -> processItem(item)))  // 为每个项目创建任务
                    .toList();  // 转为列表
            
            // 等待所有任务完成
            scope.join();  // 等待所有任务完成
            
            // 检查是否有任务失败
            scope.throwIfFailed(e -> new RuntimeException("处理失败", e));  // 如果有失败,抛出异常
            
            // 获取所有结果
            var results = futures.stream()  // 将 Future 列表转为流
                    .map(future -> {
                        try {
                            return future.get();  // 获取每个任务的结果
                        } catch (ExecutionException e) {
                            throw new RuntimeException(e);  // 转换为运行时异常
                        }
                    })
                    .toList();  // 转为列表
            
            // 处理所有结果
            System.out.println("处理了 " + results.size() + " 个项目");  // 输出处理数量
        }
        // 作用域结束时,所有虚拟线程会自动清理,不用担心线程泄漏
    }
    // 执行器关闭时,所有虚拟线程都会自动清理
}

// 模拟处理单个项目
private String processItem(String item) throws InterruptedException {
    Thread.sleep(100);  // 模拟处理时间
    return "处理完成: " + item;  // 返回处理结果
}

这个场景下,即使创建了成千上万个虚拟线程,结构化并发也能确保它们都在同一个作用域内,作用域结束时自动清理,不会泄漏。

自定义 StructuredTaskScope

除了使用 ShutdownOnFailureShutdownOnSuccess,我们还可以自定义 StructuredTaskScope,实现自己的关闭策略:

import java.util.concurrent.StructuredTaskScope;  // 导入结构化任务作用域类
import java.util.concurrent.Future;  // 导入 Future 接口

// 自定义 StructuredTaskScope,实现自定义关闭策略
class CustomTaskScope<T> extends StructuredTaskScope<T> {  // 继承 StructuredTaskScope
    private int successCount = 0;  // 成功任务计数
    private int failureCount = 0;  // 失败任务计数
    private final int requiredSuccesses;  // 需要的成功数量
    
    // 构造函数
    public CustomTaskScope(int requiredSuccesses) {  // 构造函数,传入需要的成功数量
        this.requiredSuccesses = requiredSuccesses;  // 保存需要的成功数量
    }
    
    // 重写 handleComplete 方法,实现自定义关闭策略
    @Override
    protected void handleComplete(Future<T> future) {  // 处理任务完成
        try {
            future.get();  // 获取任务结果
            successCount++;  // 成功计数加1
            
            // 如果达到需要的成功数量,关闭作用域
            if (successCount >= requiredSuccesses) {  // 如果成功数量达到要求
                shutdown();  // 关闭作用域,取消其他任务
            }
        } catch (Exception e) {  // 如果任务失败
            failureCount++;  // 失败计数加1
            
            // 如果失败太多,关闭作用域
            if (failureCount > 3) {  // 如果失败超过3次
                shutdown();  // 关闭作用域
            }
        }
    }
    
    // 获取成功数量
    public int getSuccessCount() {
        return successCount;  // 返回成功数量
    }
    
    // 获取失败数量
    public int getFailureCount() {
        return failureCount;  // 返回失败数量
    }
}

// 使用自定义的 StructuredTaskScope
public void useCustomTaskScope() throws InterruptedException {
    // 创建自定义作用域,需要至少2个任务成功
    try (var scope = new CustomTaskScope<String>(2)) {  // 创建自定义作用域
        // Fork 多个子任务
        scope.fork(() -> task1());  // Fork 第一个任务
        scope.fork(() -> task2());  // Fork 第二个任务
        scope.fork(() -> task3());  // Fork 第三个任务
        scope.fork(() -> task4());  // Fork 第四个任务
        
        // 等待作用域关闭(达到成功数量或失败太多)
        scope.join();  // 等待作用域关闭
        
        // 检查结果
        System.out.println("成功: " + scope.getSuccessCount());  // 输出成功数量
        System.out.println("失败: " + scope.getFailureCount());  // 输出失败数量
    }
    // 作用域结束时,所有任务都已完成或已取消
}

// 模拟任务
private String task1() throws InterruptedException {
    Thread.sleep(100);  // 模拟处理时间
    return "任务1完成";  // 返回结果
}

private String task2() throws InterruptedException {
    Thread.sleep(150);  // 模拟处理时间
    return "任务2完成";  // 返回结果
}

private String task3() throws InterruptedException {
    Thread.sleep(200);  // 模拟处理时间
    throw new RuntimeException("任务3失败");  // 抛出异常,模拟失败
}

private String task4() throws InterruptedException {
    Thread.sleep(250);  // 模拟处理时间
    return "任务4完成";  // 返回结果
}

这个自定义作用域的策略是:如果至少有2个任务成功,就关闭作用域;如果失败超过3次,也关闭作用域。这样可以根据业务需求灵活控制任务的执行。

最佳实践

使用结构化并发时,有几个最佳实践需要注意:

1. 总是使用 try-with-resources

StructuredTaskScope 实现了 AutoCloseable 接口,必须使用 try-with-resources 来确保资源正确清理:

// ✅ 正确:使用 try-with-resources
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 使用 try-with-resources
    // 使用作用域
}
// 作用域自动关闭,资源自动清理

// ❌ 错误:手动管理资源
var scope = new StructuredTaskScope.ShutdownOnFailure();  // 手动创建
// 使用作用域
// 容易忘记关闭,导致资源泄漏

2. 合理选择关闭策略

根据业务需求选择合适的关闭策略:

// 需要所有任务都成功:使用 ShutdownOnFailure
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 所有任务都必须成功
    // Fork 任务
    scope.join();  // 等待所有任务完成
    scope.throwIfFailed();  // 检查是否有失败
}

// 只需要一个任务成功:使用 ShutdownOnSuccess
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {  // 只需要一个成功
    // Fork 任务
    scope.join();  // 等待第一个任务成功
    String result = scope.result();  // 获取结果
}

3. 正确处理异常

使用 throwIfFailedresult 方法来处理异常,不要直接调用 Future.get()

// ✅ 正确:使用 throwIfFailed 处理异常
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
    var future = scope.fork(() -> task());  // Fork 任务
    scope.join();  // 等待任务完成
    scope.throwIfFailed(e -> new BusinessException("任务失败", e));  // 统一处理异常
    String result = future.get();  // 获取结果
}

// ❌ 错误:直接调用 Future.get(),异常处理复杂
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
    var future = scope.fork(() -> task());  // Fork 任务
    scope.join();  // 等待任务完成
    try {
        String result = future.get();  // 直接获取结果,异常处理复杂
    } catch (ExecutionException e) {  // 需要单独处理异常
        // 异常处理
    }
}

4. 与虚拟线程配合使用

结构化并发和虚拟线程是绝配,可以创建大量轻量级任务:

// 使用虚拟线程执行器
try (Executor executor = Executors.newVirtualThreadPerTaskExecutor()) {  // 创建虚拟线程执行器
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {  // 创建作用域
        // Fork 大量任务,每个任务在虚拟线程中运行
        for (int i = 0; i < 10000; i++) {  // 创建10000个任务
            scope.fork(() -> processItem(i));  // Fork 任务
        }
        scope.join();  // 等待所有任务完成
    }
}

总结

结构化并发(JEP 462)是 Java 22 引入的一个重要特性,它通过结构化的方式管理并发任务,让多线程编程变得更简单、更可靠。核心优势包括:

  1. 结构化管理:把一组相关的并发任务当成一个工作单元来管理,依赖关系更清晰
  2. 自动资源清理:作用域结束时自动清理所有资源,不用担心线程泄漏
  3. 简化异常处理:统一的异常处理机制,让错误处理更简单
  4. 灵活的关闭策略ShutdownOnFailureShutdownOnSuccess 满足不同场景需求
  5. 与虚拟线程配合:可以创建大量轻量级任务,不用担心资源问题

虽然结构化并发还在预览阶段,但它的设计理念和 API 已经相当成熟了。如果你正在写多线程代码,特别是需要管理多个并发任务的场景,强烈建议试试结构化并发,它能让你的代码更清晰、更可靠。

兄弟们,今天就聊到这里。结构化并发这玩意儿确实能简化多线程编程,但也要注意合理使用,根据业务需求选择合适的关闭策略。有啥问题欢迎留言讨论,鹏磊会继续分享 Java 22 的其他新特性。

本文章最后更新于 2025-11-27