如果一个任务由不同的子任务组成,可以并行完成(例如,从数据库访问数据、调用远程 API 和加载文件),我们可以使用 Java 多线程的一些工具类来完成。
例如:
private final ExecutorService executor = Executors.newCachedThreadPool();
// jdk 19 之前
public Invoice createInvoice(int orderId, int customerId, String language) throws ExecutionException, InterruptedException {
Future<Order> orderFuture = executor.submit(() -> loadOrderFromOrderService(orderId));
Future<Customer> customerFuture = executor.submit(() -> loadCustomerFromDatabase(customerId));
Future<String> invoiceTemplateFuture = executor.submit(() -> loadInvoiceTemplateFromFile(language));
Order order = orderFuture.get();
Customer customer = customerFuture.get();
String invoiceTemplate = invoiceTemplateFuture.get();
return Invoice.generate(order, customer, invoiceTemplate);
}
但是:
如果一个子任务发生错误–我们如何取消其他子任务?
如果某个子任务不需要了,我们如何取消这个子任务呢?
这两种情况都可以,但需要相当复杂和难以维护的代码。
而如果我们想对这种类型的并发代码进行调试也非常麻烦。
[JDK Enhancement Proposal 428][]为所谓的 "结构化并发 "引入了一个 API,这个概念旨在改善这种类型需求的代码的实现、可读性和可维护性。
使用StructuredTaskScope,我们可以把这个例子改写成如下。
public Invoice createInvoiceSinceJava19(int orderId, int customerId, String language)
throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<Order> orderFuture =
scope.fork(() -> loadOrderFromOrderService(orderId));
Future<Customer> customerFuture =
scope.fork(() -> loadCustomerFromDatabase(customerId));
Future<String> invoiceTemplateFuture =
scope.fork(() -> loadInvoiceTemplateFromFile(language));
scope.join();
scope.throwIfFailed();
Order order = orderFuture.resultNow();
Customer customer = customerFuture.resultNow();
String invoiceTemplate = invoiceTemplateFuture.resultNow();
return new Invoice(order, customer, invoiceTemplate);
}
}
使用StructuredTaskScope,我们可以将 executor.submit()
替换为 scope.fork()
。
使用scope.join()
,我们等待所有任务完成–或者至少有一个任务失败或被取消。在后两种情况下,随后的 throwIfFailed()
会抛出一个 ExecutionException
或一个 CancellationException
。
与旧方法相比,新方法带来了以下改进。
1、 任务和子任务在代码中形成一个独立的单元,每个子任务都在一个新的虚拟线程中执行;
2、 一旦其中一个子任务发生错误,所有其他子任务都会被取消;
3、 当调用线程被取消时,子任务也会被取消;
完整代码如下:
package git.snippets.jdk19;
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 预览功能
* 控制台运行
* 1. 配置Java运行环境是JDK 19
* 2. 注释掉 package 路径
* 3. 在本代码的目录下执行
* 编译:javac --enable-preview -source 19 --add-modules jdk.incubator.concurrent StructuredConcurrencyTest.java
*运行:java --enable-preview --add-modules jdk.incubator.concurrent StructuredConcurrencyTest
*/
public class StructuredConcurrencyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
new StructuredConcurrencyTest().createInvoiceSinceJava19(1, 2, "ZH");
}
private final ExecutorService executor = Executors.newCachedThreadPool();
// jdk 19 之前
public Invoice createInvoice(int orderId, int customerId, String language) throws ExecutionException, InterruptedException {
Future<Order> orderFuture = executor.submit(() -> loadOrderFromOrderService(orderId));
Future<Customer> customerFuture = executor.submit(() -> loadCustomerFromDatabase(customerId));
Future<String> invoiceTemplateFuture = executor.submit(() -> loadInvoiceTemplateFromFile(language));
Order order = orderFuture.get();
Customer customer = customerFuture.get();
String invoiceTemplate = invoiceTemplateFuture.get();
return Invoice.generate(order, customer, invoiceTemplate);
}
// jdk 19 之后
public Invoice createInvoiceSinceJava19(int orderId, int customerId, String language)
throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<Order> orderFuture =
scope.fork(() -> loadOrderFromOrderService(orderId));
Future<Customer> customerFuture =
scope.fork(() -> loadCustomerFromDatabase(customerId));
Future<String> invoiceTemplateFuture =
scope.fork(() -> loadInvoiceTemplateFromFile(language));
scope.join();
scope.throwIfFailed();
Order order = orderFuture.resultNow();
Customer customer = customerFuture.resultNow();
String invoiceTemplate = invoiceTemplateFuture.resultNow();
return new Invoice(order, customer, invoiceTemplate);
}
}
private String loadInvoiceTemplateFromFile(String language) {
return language;
}
private Customer loadCustomerFromDatabase(int customerId) {
return new Customer(customerId);
}
private Order loadOrderFromOrderService(int orderId) {
return new Order(orderId);
}
}
class Invoice {
// TODO
public Invoice(Order order, Customer customer, String invoiceTemplate) {
}
public static Invoice generate(Order order, Customer customer, String invoiceTemplate) {
return null;
}
}
class Order {
private int id;
public Order(int orderId) {
this.id = orderId;
}
}
class Customer {
private int id;
public Customer(int customerId) {
this.id = customerId;
}
}