07、JDK 25 新特性:结构化并发(JEP 505)简化多线程编程

鹏磊我在写多线程代码时,经常遇到这些问题:任务散落在各处,生命周期不明确,错误处理困难,取消机制复杂。这些问题让并发代码难以理解和维护。JEP 505 的结构化并发(Structured Concurrency)正是为了解决这些问题而设计的。

结构化并发将相关的并发任务组织成一个工作单元,这个单元有明确的开始和结束。所有子任务都在这个单元内执行,单元结束时所有任务都会完成或被取消,不会留下"孤儿任务"。作为第五次预览特性,结构化并发通过 StructuredTaskScope 类提供了清晰的任务管理机制。鹏磊我觉得这让并发代码的结构更加清晰,错误处理和取消操作也更加简单。

结构化并发是啥

先说说啥是结构化并发吧。简单点说,结构化并发就是把相关的并发任务当成一个工作单元来管理,这个工作单元有明确的开始和结束,所有子任务都在这个单元内执行,单元结束的时候所有任务都会完成或者被取消。

这跟传统的并发编程不一样,传统的并发编程里,任务可能散落在各个地方,生命周期不明确,错误处理也麻烦。结构化并发把任务组织成树状结构,父任务管理子任务,子任务的生命周期不能超过父任务,这样就能保证任务的结构清晰,不会出现孤儿任务。

结构化并发的核心原则有几个:第一是任务有明确的层次结构,子任务的生命周期不能超过父任务;第二是错误传播更清晰,子任务的错误会自动传播到父任务;第三是取消机制更简单,取消父任务会自动取消所有子任务;第四是可观测性更好,任务的结构清晰,调试和监控都更方便。

为啥需要结构化并发

传统的并发编程有几个问题,结构化并发就是为了解决这些问题:

第一个问题是任务管理混乱。传统的并发编程里,任务可能散落在各个地方,用 ExecutorService 提交任务,但任务的生命周期不明确,不知道任务啥时候结束,也不知道任务之间的关系。

第二个问题是错误处理复杂。如果多个并发任务里有一个失败了,其他任务可能还在运行,得手动处理错误,代码写起来麻烦,而且容易出错。

第三个问题是取消机制不好用。如果要取消一组相关的任务,得一个个取消,而且可能有些任务已经完成了,有些还在运行,取消逻辑写起来很复杂。

第四个问题是可观测性差。任务散落在各个地方,不知道任务之间的关系,调试和监控都麻烦,特别是出问题的时候,很难定位是哪个任务出了问题。

结构化并发就是为了解决这些问题而设计的,它提供了结构化的任务管理方式,让并发代码更容易理解、维护和调试。

基本用法

先看看结构化并发的基本用法。主要是用 StructuredTaskScope 这个类来管理并发任务。在 JDK 25 里,StructuredTaskScope 是通过静态工厂方法 open() 来创建的,不是用构造函数。

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;

public class StructuredConcurrencyExample {
    
    // 执行多个并发任务
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 用 try-with-resources 创建任务作用域,确保作用域结束时所有任务都完成或取消
        try (var scope = StructuredTaskScope.<String>open()) {
            // 提交第一个任务,fork 方法会立即返回,任务在后台执行
            var task1 = scope.fork(() -> {
                Thread.sleep(100);  // 模拟耗时操作
                return "任务1完成";
            });
            
            // 提交第二个任务
            var task2 = scope.fork(() -> {
                Thread.sleep(150);  // 模拟耗时操作
                return "任务2完成";
            });
            
            // 提交第三个任务
            var task3 = scope.fork(() -> {
                Thread.sleep(200);  // 模拟耗时操作
                return "任务3完成";
            });
            
            // 等待所有任务完成,join 方法会阻塞直到所有任务完成或作用域关闭
            scope.join();
            
            // 获取任务结果,get() 方法会返回任务的结果
            System.out.println(task1.get());  // 输出: 任务1完成
            System.out.println(task2.get());  // 输出: 任务2完成
            System.out.println(task3.get());  // 输出: 任务3完成
        }
        // 作用域结束的时候,所有未完成的任务会自动取消
    }
}

这个例子展示了结构化并发的基本用法。用 try-with-resources 创建 StructuredTaskScope,确保作用域结束时所有任务都完成或取消。用 fork() 方法提交任务,任务会立即开始执行。用 join() 方法等待所有任务完成。作用域结束的时候,所有未完成的任务会自动取消。

任务状态和结果获取

fork() 方法返回的是 Subtask 对象,可以用它来检查任务状态和获取结果。Subtask 有几个状态:UNAVAILABLE(任务还没完成)、SUCCESS(任务成功完成)、FAILED(任务失败)。

import java.util.concurrent.StructuredTaskScope;

public class TaskStateExample {
    
    public static void main(String[] args) throws InterruptedException {
        try (var scope = StructuredTaskScope.<String>open()) {
            // 提交一个成功的任务
            var successTask = scope.fork(() -> {
                Thread.sleep(100);
                return "成功";
            });
            
            // 提交一个失败的任务
            var failTask = scope.fork(() -> {
                Thread.sleep(100);
                throw new RuntimeException("任务失败");
            });
            
            // 等待所有任务完成
            scope.join();
            
            // 检查任务状态
            if (successTask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                // 任务成功,获取结果
                System.out.println("成功任务结果: " + successTask.get());  // 输出: 成功任务结果: 成功
            }
            
            if (failTask.state() == StructuredTaskScope.Subtask.State.FAILED) {
                // 任务失败,获取异常
                System.out.println("失败任务异常: " + failTask.exception().getMessage());  // 输出: 失败任务异常: 任务失败
            }
        }
    }
}

这个例子展示了如何检查任务状态和获取结果。state() 方法返回任务状态,get() 方法获取任务结果(如果成功),exception() 方法获取任务异常(如果失败)。

错误处理

结构化并发让错误处理更清晰。如果子任务失败了,错误会自动传播,父任务可以统一处理。可以用 ShutdownOnFailure 策略,让第一个任务失败的时候就关闭作用域,取消其他任务。

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;

public class ErrorHandlingExample {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 用 ShutdownOnFailure 策略,第一个任务失败的时候就关闭作用域
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 提交多个任务
            var task1 = scope.fork(() -> {
                Thread.sleep(100);
                return "任务1完成";
            });
            
            var task2 = scope.fork(() -> {
                Thread.sleep(50);
                throw new RuntimeException("任务2失败");  // 这个任务会失败
            });
            
            var task3 = scope.fork(() -> {
                Thread.sleep(200);
                return "任务3完成";
            });
            
            // 等待所有任务完成或第一个任务失败
            scope.join();
            
            // 检查是否有任务失败
            scope.throwIfFailed();  // 如果有任务失败,这里会抛异常
            
            // 如果没失败,获取结果
            System.out.println(task1.get());
            System.out.println(task3.get());
        } catch (ExecutionException e) {
            // 处理异常
            System.out.println("任务执行失败: " + e.getCause().getMessage());  // 输出: 任务执行失败: 任务2失败
        }
    }
}

这个例子展示了错误处理。用 ShutdownOnFailure 策略,第一个任务失败的时候就关闭作用域,取消其他任务。throwIfFailed() 方法会检查是否有任务失败,如果有就抛异常。这样就能统一处理错误了,不用一个个检查任务状态。

取消机制

结构化并发提供了简单的取消机制。取消作用域会自动取消所有未完成的任务。可以用 shutdown() 方法手动取消,或者等作用域结束自动取消。

import java.util.concurrent.StructuredTaskScope;

public class CancellationExample {
    
    public static void main(String[] args) throws InterruptedException {
        try (var scope = StructuredTaskScope.<String>open()) {
            // 提交一个长时间运行的任务
            var longTask = scope.fork(() -> {
                try {
                    Thread.sleep(5000);  // 模拟长时间运行
                    return "长时间任务完成";
                } catch (InterruptedException e) {
                    // 任务被取消时会收到中断信号
                    System.out.println("任务被取消");
                    throw e;
                }
            });
            
            // 提交一个快速任务
            var quickTask = scope.fork(() -> {
                Thread.sleep(100);
                return "快速任务完成";
            });
            
            // 等待快速任务完成
            scope.join();
            
            // 手动取消作用域,所有未完成的任务都会被取消
            scope.shutdown();
            
            // 检查长时间任务是否被取消
            if (longTask.state() == StructuredTaskScope.Subtask.State.UNAVAILABLE) {
                System.out.println("长时间任务被取消了");
            }
        }
    }
}

这个例子展示了取消机制。shutdown() 方法会取消所有未完成的任务,任务会收到中断信号,可以检查 InterruptedException 来处理取消。作用域结束的时候,所有未完成的任务也会自动取消。

ShutdownOnSuccess 策略

除了 ShutdownOnFailure,还有 ShutdownOnSuccess 策略,第一个任务成功的时候就关闭作用域,取消其他任务。这在"第一个成功就返回"的场景里很有用。

import java.util.concurrent.StructuredTaskScope;

public class ShutdownOnSuccessExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 用 ShutdownOnSuccess 策略,第一个任务成功的时候就关闭作用域
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            // 提交多个任务,模拟从多个数据源获取数据
            var task1 = scope.fork(() -> {
                Thread.sleep(200);  // 模拟慢速数据源
                return "数据源1的数据";
            });
            
            var task2 = scope.fork(() -> {
                Thread.sleep(100);  // 模拟快速数据源
                return "数据源2的数据";  // 这个会先完成
            });
            
            var task3 = scope.fork(() -> {
                Thread.sleep(300);  // 模拟慢速数据源
                return "数据源3的数据";
            });
            
            // 等待第一个任务成功
            scope.join();
            
            // 获取第一个成功的任务结果
            String result = scope.result();  // 获取第一个成功的任务结果
            System.out.println("获取到数据: " + result);  // 输出: 获取到数据: 数据源2的数据
            
            // 其他任务会被自动取消
        }
    }
}

这个例子展示了 ShutdownOnSuccess 策略。第一个任务成功的时候就关闭作用域,取消其他任务。result() 方法返回第一个成功的任务结果。这在"第一个成功就返回"的场景里很有用,比如从多个数据源获取数据,只要有一个成功就行。

超时控制

可以用 joinUntil() 方法设置超时,如果超时了还没完成,可以取消任务。

import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.StructuredTaskScope;

public class TimeoutExample {
    
    public static void main(String[] args) throws InterruptedException {
        try (var scope = StructuredTaskScope.<String>open()) {
            // 提交一个任务
            var task = scope.fork(() -> {
                Thread.sleep(5000);  // 模拟长时间运行
                return "任务完成";
            });
            
            try {
                // 设置超时时间为 1 秒
                Instant deadline = Instant.now().plusSeconds(1);
                scope.joinUntil(deadline);  // 等待任务完成,最多等 1 秒
                
                // 如果超时了,这里不会执行
                System.out.println("任务结果: " + task.get());
            } catch (TimeoutException e) {
                // 超时了,取消任务
                System.out.println("任务超时,取消任务");
                scope.shutdown();  // 取消所有任务
            }
        }
    }
}

这个例子展示了超时控制。joinUntil() 方法设置超时时间,如果超时了还没完成,会抛 TimeoutException,可以手动取消任务。这在需要控制任务执行时间的场景里很有用。

与虚拟线程配合使用

结构化并发跟虚拟线程配合使用效果特别好。虚拟线程是轻量级的,可以创建很多个,结构化并发可以很好地管理这些虚拟线程。

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class VirtualThreadExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 创建虚拟线程执行器
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // 在虚拟线程里使用结构化并发
            executor.submit(() -> {
                try (var scope = StructuredTaskScope.<String>open()) {
                    // 提交多个任务,这些任务会在虚拟线程里执行
                    var task1 = scope.fork(() -> {
                        Thread.sleep(100);
                        return "虚拟线程任务1完成";
                    });
                    
                    var task2 = scope.fork(() -> {
                        Thread.sleep(150);
                        return "虚拟线程任务2完成";
                    });
                    
                    // 等待所有任务完成
                    scope.join();
                    
                    // 获取结果
                    System.out.println(task1.get());
                    System.out.println(task2.get());
                }
            });
            
            // 等待任务完成
            Thread.sleep(200);
        }
    }
}

这个例子展示了结构化并发跟虚拟线程的配合使用。虚拟线程是轻量级的,可以创建很多个,结构化并发可以很好地管理这些虚拟线程,确保任务的结构清晰,不会出现孤儿任务。

实际应用场景

结构化并发在实际开发中还是挺有用的,下面举几个常见的应用场景。

场景一:并行调用多个服务

在微服务架构里,经常需要并行调用多个服务,然后合并结果。结构化并发很适合这种场景。

import java.util.concurrent.StructuredTaskScope;
import java.util.List;
import java.util.ArrayList;

public class MicroserviceExample {
    
    // 并行调用多个服务
    public List<String> fetchDataFromMultipleServices() throws InterruptedException {
        try (var scope = StructuredTaskScope.<String>open()) {
            // 并行调用多个服务
            var service1Task = scope.fork(() -> callService1());  // 调用服务1
            var service2Task = scope.fork(() -> callService2());  // 调用服务2
            var service3Task = scope.fork(() -> callService3());  // 调用服务3
            
            // 等待所有服务响应
            scope.join();
            
            // 收集结果
            List<String> results = new ArrayList<>();
            if (service1Task.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                results.add(service1Task.get());
            }
            if (service2Task.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                results.add(service2Task.get());
            }
            if (service3Task.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                results.add(service3Task.get());
            }
            
            return results;
        }
    }
    
    // 模拟调用服务
    private String callService1() throws InterruptedException {
        Thread.sleep(100);
        return "服务1的数据";
    }
    
    private String callService2() throws InterruptedException {
        Thread.sleep(150);
        return "服务2的数据";
    }
    
    private String callService3() throws InterruptedException {
        Thread.sleep(200);
        return "服务3的数据";
    }
}

场景二:并行处理数据

在处理大量数据的时候,可以并行处理多个数据块,提高处理速度。

import java.util.concurrent.StructuredTaskScope;
import java.util.List;
import java.util.ArrayList;

public class DataProcessingExample {
    
    // 并行处理数据块
    public List<String> processDataBlocks(List<List<Integer>> dataBlocks) throws InterruptedException {
        try (var scope = StructuredTaskScope.<String>open()) {
            // 为每个数据块提交处理任务
            List<StructuredTaskScope.Subtask<String>> tasks = new ArrayList<>();
            for (List<Integer> block : dataBlocks) {
                tasks.add(scope.fork(() -> processBlock(block)));  // 提交处理任务
            }
            
            // 等待所有任务完成
            scope.join();
            
            // 收集处理结果
            List<String> results = new ArrayList<>();
            for (var task : tasks) {
                if (task.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
                    results.add(task.get());
                }
            }
            
            return results;
        }
    }
    
    // 处理单个数据块
    private String processBlock(List<Integer> block) {
        // 模拟数据处理
        int sum = block.stream().mapToInt(Integer::intValue).sum();
        return "处理结果: " + sum;
    }
}

场景三:第一个成功就返回

有时候需要从多个数据源获取数据,只要有一个成功就行,可以用 ShutdownOnSuccess 策略。

import java.util.concurrent.StructuredTaskScope;

public class FirstSuccessExample {
    
    // 从多个数据源获取数据,第一个成功就返回
    public String fetchDataFromMultipleSources() throws InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            // 从多个数据源获取数据
            scope.fork(() -> fetchFromSource1());  // 从数据源1获取
            scope.fork(() -> fetchFromSource2());  // 从数据源2获取
            scope.fork(() -> fetchFromSource3());  // 从数据源3获取
            
            // 等待第一个成功
            scope.join();
            
            // 返回第一个成功的结果
            return scope.result();
        }
    }
    
    // 模拟从数据源获取数据
    private String fetchFromSource1() throws InterruptedException {
        Thread.sleep(200);
        return "数据源1的数据";
    }
    
    private String fetchFromSource2() throws InterruptedException {
        Thread.sleep(100);  // 这个最快
        return "数据源2的数据";
    }
    
    private String fetchFromSource3() throws InterruptedException {
        Thread.sleep(300);
        return "数据源3的数据";
    }
}

与传统并发编程的对比

结构化并发跟传统的并发编程有几个主要区别:

第一个区别是任务管理方式。传统并发编程用 ExecutorService 提交任务,任务散落在各个地方,生命周期不明确。结构化并发用 StructuredTaskScope 管理任务,任务有明确的层次结构,生命周期清晰。

第二个区别是错误处理。传统并发编程得手动处理错误,代码写起来麻烦。结构化并发提供了统一的错误处理机制,错误会自动传播,可以用策略来控制错误处理行为。

第三个区别是取消机制。传统并发编程取消任务得一个个取消,代码写起来复杂。结构化并发取消作用域会自动取消所有任务,简单多了。

第四个区别是可观测性。传统并发编程任务散落在各个地方,调试和监控都麻烦。结构化并发任务结构清晰,调试和监控都方便。

注意事项

用结构化并发的时候有几个注意事项:

第一个是作用域的生命周期。作用域结束的时候,所有未完成的任务会自动取消,所以要确保作用域的生命周期覆盖所有需要执行的任务。

第二个是错误处理。要根据实际需求选择合适的策略,ShutdownOnFailure 适合需要所有任务都成功的场景,ShutdownOnSuccess 适合第一个成功就返回的场景。

第三个是性能考虑。结构化并发虽然让代码更清晰,但如果任务数量很多,还是要注意性能,特别是跟虚拟线程配合使用的时候。

第四个是兼容性。结构化并发是预览特性,API 可能会变化,生产环境用的时候要注意版本兼容性。

总结

结构化并发(JEP 505)是 JDK 25 引入的一个很重要的特性,它提供了结构化的任务管理方式,让并发代码更容易理解、维护和调试。主要优势包括:任务有明确的层次结构,错误处理更清晰,取消机制更简单,可观测性更好。

在实际开发中,结构化并发特别适合用在并行调用多个服务、并行处理数据、第一个成功就返回等场景。配合虚拟线程使用,效果更好。

虽然结构化并发有一些限制,比如作用域的生命周期管理、错误处理策略选择等,但这些限制也带来了代码清晰度和可维护性的提升。总的来说,结构化并发是一个很好的并发编程改进,值得在实际项目中尝试使用。

兄弟们,结构化并发这特性还是挺实用的,特别是如果你在处理复杂的并发场景,结构化并发能让代码写起来更清晰,错误处理也更简单。建议大家在合适的场景下试试,应该会有不错的体验。

本文章最后更新于 2025-11-27