兄弟们,鹏磊今天来聊聊 Java 22 里的流收集器(Stream Gatherers)这个新特性,这玩意儿是 JEP 461 引入的预览特性,专门用来增强流的数据聚合能力。说实话,用 Java 流(Stream)这么多年了,最头疼的就是有些复杂的数据转换需求,用现有的中间操作(比如 map、filter、flatMap)根本搞不定,要么得写一堆嵌套操作,要么就得用 collect 收集器,但收集器是终端操作,不能用在流中间。流收集器就是为了解决这个问题来的,它让咱们可以自定义中间操作,实现各种复杂的数据转换,而且还能保持状态,支持短路,甚至支持并行处理,这功能贼强大。
流收集器是 Java 22 的预览特性,它提供了一个 Gatherer 接口和 Gatherers 工具类,让咱们可以创建自定义的中间操作。核心思想是:把输入流中的元素通过一个收集器(Gatherer)转换成输出流中的元素,这个转换可以是一对一、一对多、多对一或者多对多的映射,而且可以是有状态的,可以缓冲输入,可以短路停止处理。这玩意儿特别适合需要复杂数据转换的场景,比如窗口化处理、增量聚合、并发映射、扫描累积啥的。
为什么需要流收集器
先说说传统流操作的限制。咱们平时用流,主要就是那些内置的中间操作:
// 传统流操作的限制
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// 问题1:简单的一对一转换可以用 map
List<Integer> doubled = numbers.stream()
.map(n -> n * 2) // 一对一映射,这个没问题
.toList();
// 问题2:过滤可以用 filter
List<Integer> evens = numbers.stream()
.filter(n -> n % 2 == 0) // 过滤,这个也没问题
.toList();
// 问题3:一对多可以用 flatMap
List<Integer> expanded = numbers.stream()
.flatMap(n -> Stream.of(n, n * 2)) // 一对多,这个也能搞定
.toList();
// 问题4:但是多对一就麻烦了,比如把流分成固定大小的窗口
// 用现有操作很难实现,得写一堆代码
List<List<Integer>> windows = new ArrayList<>(); // 得手动维护状态
List<Integer> current = new ArrayList<>(); // 当前窗口
for (Integer n : numbers) {
current.add(n); // 添加到当前窗口
if (current.size() == 3) { // 窗口满了
windows.add(new ArrayList<>(current)); // 保存窗口
current.clear(); // 清空
}
}
if (!current.isEmpty()) { // 处理剩余元素
windows.add(current);
}
// 问题5:增量聚合也很麻烦,比如扫描累积
// 用现有操作得用 reduce,但 reduce 是终端操作,不能用在中间
// 或者得用 collect,但 collect 也是终端操作
// 问题6:并发处理每个元素也很难实现
// 现有的 parallel() 是并行整个流,不能控制每个元素的并发度
流收集器就是为了解决这些问题。它让咱们可以自定义中间操作,实现各种复杂的数据转换,而且还能保持状态,支持短路,支持并行。
核心概念
流收集器的核心概念包括:
- Gatherer(收集器):一个接口,定义了如何把输入流转换成输出流
- Integrator(积分器):处理每个输入元素,更新状态,可选地向下游发送元素
- Initializer(初始化器):创建初始状态
- Combiner(组合器):合并两个状态,用于并行处理
- Finisher(完成器):在流结束时执行最终操作
- Gatherers(工具类):提供了一些内置的收集器,比如 windowFixed、windowSliding、fold、scan、mapConcurrent 等
基本用法
使用内置收集器:窗口化处理
最常用的场景就是把流分成固定大小的窗口,这个用 windowFixed 就能搞定:
import java.util.stream.*; // 导入流相关类
import java.util.stream.Gatherers; // 导入收集器工具类
// 窗口化处理:把流分成固定大小的窗口
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// 使用 windowFixed 把流分成大小为 3 的窗口
List<List<Integer>> windows = numbers.stream()
.gather(Gatherers.windowFixed(3)) // 收集成大小为 3 的窗口
.toList(); // 转换成列表
// 结果:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
// 每个窗口都是一个新的列表,包含 3 个元素
System.out.println(windows);
// 输出:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
// 如果元素数量不是窗口大小的倍数,最后一个窗口会包含剩余元素
List<Integer> numbers2 = List.of(1, 2, 3, 4, 5);
List<List<Integer>> windows2 = numbers2.stream()
.gather(Gatherers.windowFixed(3)) // 窗口大小为 3
.toList();
// 结果:[[1, 2, 3], [4, 5]]
// 最后一个窗口只有 2 个元素
这玩意儿的好处是,不用手动维护状态,不用写循环,一行代码就搞定窗口化处理。而且它是中间操作,可以继续链式调用其他操作。
滑动窗口处理
除了固定窗口,还可以用滑动窗口,这个用 windowSliding 就能搞定:
import java.util.stream.*;
import java.util.stream.Gatherers;
// 滑动窗口:每个窗口都包含前一个窗口的大部分元素
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);
// 使用 windowSliding 创建大小为 3 的滑动窗口
List<List<Integer>> slidingWindows = numbers.stream()
.gather(Gatherers.windowSliding(3)) // 滑动窗口大小为 3
.toList();
// 结果:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
// 每个窗口都包含 3 个元素,相邻窗口之间有重叠
System.out.println(slidingWindows);
// 输出:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
// 滑动窗口特别适合需要计算移动平均、移动最大值的场景
List<Double> prices = List.of(10.0, 12.0, 11.0, 13.0, 14.0, 15.0);
List<Double> movingAverages = prices.stream()
.gather(Gatherers.windowSliding(3)) // 3 日移动平均
.map(window -> window.stream() // 对每个窗口计算平均值
.mapToDouble(Double::doubleValue) // 转换成 double
.average() // 计算平均值
.orElse(0.0)) // 如果没有值就返回 0
.toList();
// 结果:[11.0, 12.0, 12.67, 13.33, 14.0]
// 每个值都是前 3 个价格的移动平均
滑动窗口的好处是,可以处理需要重叠窗口的场景,比如移动平均、移动最大值、时间序列分析啥的。
增量聚合:fold 收集器
如果需要增量聚合,可以用 fold 收集器,它会把所有元素聚合成一个结果:
import java.util.stream.*;
import java.util.stream.Gatherers;
// 增量聚合:把所有元素聚合成一个结果
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// 使用 fold 把所有数字用分号连接
String result = numbers.stream()
.gather(Gatherers.fold(
() -> "", // 初始值:空字符串
(acc, element) -> { // 累积函数:把新元素加到累积值后面
if (acc.isEmpty()) { // 如果是第一个元素
return element.toString(); // 直接返回元素
} else { // 否则
return acc + ";" + element; // 用分号连接
}
}
))
.findFirst() // fold 只产生一个结果,所以用 findFirst
.orElse(""); // 如果没有结果就返回空字符串
// 结果:"1;2;3;4;5;6;7;8;9"
System.out.println(result);
// fold 还可以用来计算总和、乘积、最大值等
Integer sum = numbers.stream()
.gather(Gatherers.fold(
() -> 0, // 初始值:0
(acc, element) -> acc + element // 累积:累加
))
.findFirst()
.orElse(0);
// 结果:45
// 计算乘积
Integer product = numbers.stream()
.gather(Gatherers.fold(
() -> 1, // 初始值:1
(acc, element) -> acc * element // 累积:相乘
))
.findFirst()
.orElse(1);
// 结果:362880
fold 的好处是,可以自定义聚合逻辑,而且它是中间操作,可以继续链式调用。不过要注意,fold 是终端操作,它只产生一个结果,所以通常用 findFirst() 来获取结果。
扫描累积:scan 收集器
如果需要扫描累积(每个元素都产生一个累积结果),可以用 scan 收集器:
import java.util.stream.*;
import java.util.stream.Gatherers;
// 扫描累积:每个元素都产生一个累积结果
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
// 使用 scan 计算累积和
List<Integer> cumulativeSums = numbers.stream()
.gather(Gatherers.scan(
0, // 初始值:0
(acc, element) -> acc + element // 累积函数:累加
))
.toList();
// 结果:[1, 3, 6, 10, 15]
// 每个元素都是到当前位置的累积和
System.out.println(cumulativeSums);
// 输出:[1, 3, 6, 10, 15]
// scan 和 fold 的区别:
// - fold 只产生一个最终结果
// - scan 每个元素都产生一个累积结果
// 所以 scan 的输出流长度和输入流长度一样
// 计算累积乘积
List<Integer> cumulativeProducts = numbers.stream()
.gather(Gatherers.scan(
1, // 初始值:1
(acc, element) -> acc * element // 累积:相乘
))
.toList();
// 结果:[1, 2, 6, 24, 120]
scan 的好处是,可以产生中间结果,适合需要看到累积过程的场景,比如计算移动平均、累积收益啥的。
并发映射:mapConcurrent 收集器
如果需要并发处理每个元素,可以用 mapConcurrent 收集器:
import java.util.stream.*;
import java.util.stream.Gatherers;
import java.util.concurrent.CompletableFuture;
// 并发映射:并发处理每个元素,但保持顺序
List<String> urls = List.of(
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
"https://example.com/4",
"https://example.com/5"
);
// 使用 mapConcurrent 并发获取 URL 内容
List<String> contents = urls.stream()
.gather(Gatherers.mapConcurrent(
3, // 并发度:最多 3 个并发请求
url -> { // 映射函数:获取 URL 内容
// 模拟网络请求
try {
Thread.sleep(100); // 模拟网络延迟
return "Content of " + url; // 返回内容
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
))
.toList();
// 结果:按输入顺序返回,但处理是并发的
// 最多同时处理 3 个 URL,提高了性能
System.out.println(contents);
// mapConcurrent 的好处:
// 1. 可以控制并发度,避免创建太多线程
// 2. 保持输出顺序和输入顺序一致
// 3. 适合 IO 密集型操作,比如网络请求、文件读取
mapConcurrent 的好处是,可以控制并发度,避免创建太多线程,而且保持输出顺序,适合 IO 密集型操作。
自定义收集器
除了内置收集器,咱们还可以自定义收集器,实现更复杂的数据转换。自定义收集器需要实现 Gatherer 接口,或者用 Gatherer.of() 静态方法。
简单的自定义收集器:去重相邻元素
先来个简单的例子,实现一个去重相邻元素的收集器:
import java.util.stream.*;
import java.util.stream.Gatherer;
// 自定义收集器:去重相邻的相同元素
// 比如 [1, 1, 2, 2, 3, 1, 1] -> [1, 2, 3, 1]
Gatherer<Integer, ?, Integer> distinctAdjacent = Gatherer.of(
() -> new Object() { Integer last = null; }, // 初始状态:保存上一个元素
(state, element, downstream) -> { // 积分器:处理每个元素
if (state.last == null || !state.last.equals(element)) { // 如果和上一个元素不同
state.last = element; // 更新状态
return downstream.push(element); // 发送到下游
} else { // 如果和上一个元素相同
return true; // 继续处理,但不发送
}
}
);
// 使用自定义收集器
List<Integer> numbers = List.of(1, 1, 2, 2, 3, 1, 1);
List<Integer> distinct = numbers.stream()
.gather(distinctAdjacent) // 使用自定义收集器
.toList();
// 结果:[1, 2, 3, 1]
System.out.println(distinct);
这个例子展示了如何用 Gatherer.of() 创建简单的收集器。状态是一个对象,保存上一个元素;积分器检查当前元素是否和上一个元素相同,如果不同就发送到下游。
复杂自定义收集器:查找最大值并短路
再来个复杂点的例子,实现一个查找最大值的收集器,如果遇到超过限制的值就短路停止:
import java.util.stream.*;
import java.util.stream.Gatherer;
// 自定义收集器:查找最大值,如果遇到超过限制的值就短路
record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {
// 实现 Gatherer 接口需要实现几个方法
@Override
public Integrator<List<Integer>, Integer, Integer> integrator() {
return (state, element, downstream) -> {
// 如果遇到超过限制的值,短路停止
if (element >= limit) {
return false; // 返回 false 表示停止处理
}
// 更新状态:保存当前最大值
if (state.isEmpty() || element > state.get(0)) {
state.clear(); // 清空状态
state.add(element); // 添加新最大值
}
return true; // 继续处理
};
}
@Override
public Supplier<List<Integer>> initializer() {
return ArrayList::new; // 初始状态:空列表
}
@Override
public BinaryOperator<List<Integer>> combiner() {
return (state1, state2) -> {
// 合并两个状态:取最大值
if (state1.isEmpty()) return state2;
if (state2.isEmpty()) return state1;
return state1.get(0) > state2.get(0) ? state1 : state2;
};
}
@Override
public Function<List<Integer>, Integer> finisher() {
return state -> state.isEmpty() ? null : state.get(0); // 返回最大值
}
}
// 使用自定义收集器
List<Integer> numbers = List.of(3, 1, 4, 1, 5, 9, 2, 6);
Integer max = numbers.stream()
.gather(new BiggestInt(8)) // 限制为 8,如果遇到 >= 8 的值就停止
.findFirst()
.orElse(null);
// 结果:5(因为遇到 9 时停止了,9 >= 8)
System.out.println(max);
这个例子展示了如何实现完整的 Gatherer 接口。需要实现四个方法:integrator()(积分器)、initializer()(初始化器)、combiner()(组合器)、finisher()(完成器)。积分器处理每个元素,可以返回 false 来短路停止;组合器用于并行处理时合并状态;完成器在流结束时执行最终操作。
实际应用场景
场景1:批量处理数据
批量处理是流收集器的典型应用场景,可以用 windowFixed 把数据分成批次:
import java.util.stream.*;
import java.util.stream.Gatherers;
// 批量处理:把大量数据分成小批次处理
List<String> allData = List.of(
"data1", "data2", "data3", "data4", "data5",
"data6", "data7", "data8", "data9", "data10"
);
// 分成每批 3 个元素
allData.stream()
.gather(Gatherers.windowFixed(3)) // 每批 3 个
.forEach(batch -> { // 处理每批数据
System.out.println("Processing batch: " + batch);
// 这里可以调用批量 API,比如批量插入数据库
// batchInsertToDatabase(batch);
});
// 输出:
// Processing batch: [data1, data2, data3]
// Processing batch: [data4, data5, data6]
// Processing batch: [data7, data8, data9]
// Processing batch: [data10]
批量处理的好处是,可以减少 API 调用次数,提高性能,特别适合数据库批量插入、批量网络请求啥的。
场景2:时间序列分析
时间序列分析可以用滑动窗口来计算移动平均、移动最大值等:
import java.util.stream.*;
import java.util.stream.Gatherers;
// 时间序列分析:计算移动平均
record PricePoint(double price, long timestamp) {}
List<PricePoint> prices = List.of(
new PricePoint(10.0, 1000),
new PricePoint(12.0, 2000),
new PricePoint(11.0, 3000),
new PricePoint(13.0, 4000),
new PricePoint(14.0, 5000),
new PricePoint(15.0, 6000)
);
// 计算 3 点移动平均
List<Double> movingAverages = prices.stream()
.gather(Gatherers.windowSliding(3)) // 3 点滑动窗口
.map(window -> window.stream() // 对每个窗口
.mapToDouble(PricePoint::price) // 提取价格
.average() // 计算平均
.orElse(0.0)) // 默认值
.toList();
// 结果:[11.0, 12.0, 12.67, 13.33, 14.0]
// 每个值都是前 3 个价格的移动平均
System.out.println(movingAverages);
// 计算移动最大值
List<Double> movingMax = prices.stream()
.gather(Gatherers.windowSliding(3)) // 滑动窗口
.map(window -> window.stream() // 对每个窗口
.mapToDouble(PricePoint::price) // 提取价格
.max() // 计算最大
.orElse(0.0)) // 默认值
.toList();
// 结果:[12.0, 12.0, 13.0, 14.0, 15.0]
System.out.println(movingMax);
时间序列分析在金融、监控、数据分析等领域很常用,滑动窗口可以方便地计算各种统计指标。
场景3:并发 IO 操作
并发 IO 操作可以用 mapConcurrent 来提高性能:
import java.util.stream.*;
import java.util.stream.Gatherers;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
// 并发 IO 操作:并发获取多个 URL 的内容
List<String> urls = List.of(
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
"https://api.example.com/data4",
"https://api.example.com/data5"
);
HttpClient client = HttpClient.newHttpClient(); // 创建 HTTP 客户端
// 并发获取 URL 内容,最多 3 个并发
List<String> responses = urls.stream()
.gather(Gatherers.mapConcurrent(
3, // 并发度:最多 3 个并发请求
url -> { // 映射函数:发送 HTTP 请求
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.build();
HttpResponse<String> response = client.send(
request,
HttpResponse.BodyHandlers.ofString()
);
return response.body(); // 返回响应体
} catch (Exception e) {
throw new RuntimeException(e);
}
}
))
.toList();
// 结果:按输入顺序返回响应,但请求是并发发送的
// 最多同时发送 3 个请求,提高了性能
System.out.println(responses);
并发 IO 操作特别适合网络请求、文件读取等 IO 密集型场景,可以显著提高性能。
注意事项和最佳实践
1. 预览特性需要启用
流收集器是预览特性,编译和运行都需要启用预览特性:
# 编译时启用预览特性
javac --enable-preview --release 22 Main.java
# 运行时启用预览特性
java --enable-preview Main
2. 状态管理要小心
自定义收集器时,状态管理要小心,特别是并行处理时:
// 错误示例:状态不是线程安全的
Gatherer<Integer, ?, Integer> badGatherer = Gatherer.of(
() -> new ArrayList<>(), // 状态:列表
(state, element, downstream) -> {
state.add(element); // 直接修改状态,并行时可能有问题
return true;
}
);
// 正确示例:使用线程安全的状态
Gatherer<Integer, ?, Integer> goodGatherer = Gatherer.of(
() -> new java.util.concurrent.ConcurrentLinkedQueue<>(), // 线程安全的队列
(state, element, downstream) -> {
state.offer(element); // 线程安全的操作
return true;
}
);
3. 短路要谨慎使用
短路可以提前停止处理,但要确保逻辑正确:
// 短路示例:遇到负数就停止
Gatherer<Integer, ?, Integer> shortCircuit = Gatherer.of(
() -> new Object() { boolean stopped = false; },
(state, element, downstream) -> {
if (state.stopped) { // 如果已经停止
return false; // 不再处理
}
if (element < 0) { // 如果遇到负数
state.stopped = true; // 标记停止
return false; // 停止处理
}
return downstream.push(element); // 发送元素
}
);
4. 性能考虑
流收集器的性能取决于实现,内置收集器通常已经优化过了,但自定义收集器要注意性能:
// 性能考虑:
// 1. 避免在积分器中做重计算
// 2. 状态要尽量小,避免内存占用过大
// 3. 并行处理时要确保组合器效率高
// 4. 短路可以提前停止,提高性能
总结
流收集器是 Java 22 引入的强大特性,它让咱们可以自定义中间操作,实现各种复杂的数据转换。内置的收集器(windowFixed、windowSliding、fold、scan、mapConcurrent)已经覆盖了很多常见场景,而且咱们还可以自定义收集器来实现更复杂的需求。
这玩意儿特别适合需要复杂数据转换的场景,比如窗口化处理、增量聚合、并发映射、扫描累积啥的。而且它是中间操作,可以继续链式调用其他操作,用起来贼方便。
不过要注意,流收集器是预览特性,需要启用预览特性才能用,而且可能会在未来的版本中变化。但不管怎么说,这功能确实很强大,值得试试。
好了,今天就聊到这里,兄弟们有啥问题可以在评论区留言,鹏磊看到会回复的。下次咱们聊聊 Java 22 的其他新特性,比如在 super() 调用之前的语句(JEP 447),这玩意儿也挺有意思的。