Java的Stream API用起来确实方便,但是有时候内置的中间操作不够用,想自定义个中间操作挺麻烦的。现在JDK 23的JEP 473提供了自定义中间操作的能力,让流管道可以更灵活地转换数据,这下就方便多了。
鹏磊我之前做流处理的时候,遇到复杂的转换逻辑,经常得把流打断,用传统方式处理一部分,再用流处理一部分,搞得代码很别扭。现在有了自定义中间操作,可以直接在流管道里搞定,代码更简洁了。今天咱就聊聊这个JEP 473,看看怎么自定义中间操作。
JEP 473 的核心改进
JEP 473增强了Stream API,支持自定义中间操作。以前流的中间操作都是固定的,像map、filter、flatMap这些,现在可以自定义了。这让流API的扩展性更强,能处理更复杂的场景。
自定义中间操作的关键是Stream.Gatherer接口,它定义了一个收集器(gatherer),可以自定义流的转换逻辑。这个收集器可以在流管道中作为中间操作使用,对流的元素进行转换、过滤、分组等各种操作。
基础概念
Gatherer接口
Gatherer是自定义中间操作的核心接口:
interface Gatherer<T, A, R> {
Supplier<A> initializer(); // 初始化累加器
Integrator<A, T, R> integrator(); // 集成器,处理每个元素
BiConsumer<A, Downstream<? super R>> finisher(); // 完成器,处理最后的结果
}
这个接口定义了三个方法:
initializer: 初始化一个累加器,用来保存中间状态integrator: 处理每个元素,可以决定是否继续处理、跳过元素或者提前结束finisher: 处理完所有元素后的收尾工作
简单示例
自定义过滤操作
先看个简单的例子,自定义一个过滤操作:
import java.util.stream.*;
// 自定义一个只保留偶数的收集器
Gatherer<Integer, ?, Integer> evensOnly = Gatherer.of(
() -> new Object[0], // 不需要累加器,用空数组占位
(state, element, downstream) -> {
if (element % 2 == 0) { // 如果是偶数
downstream.push(element); // 传递到下游
}
return true; // 继续处理下一个元素
},
(state, downstream) -> {} // 没有收尾工作
);
// 使用自定义收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> evens = numbers.stream()
.gather(evensOnly) // 使用自定义收集器
.toList();
// 结果:[2, 4, 6, 8, 10]
自定义窗口操作
窗口操作是流处理里的常见需求,可以用自定义收集器实现:
// 自定义一个滑动窗口收集器
static <T> Gatherer<T, ?, List<T>> windowed(int size) {
return Gatherer.of(
() -> new ArrayList<T>(), // 用列表保存窗口元素
(window, element, downstream) -> {
window.add(element); // 添加元素到窗口
if (window.size() == size) { // 窗口满了
downstream.push(new ArrayList<>(window)); // 传递窗口
window.remove(0); // 移除最旧的元素,保持窗口大小
}
return true; // 继续处理
},
(window, downstream) -> {
// 处理剩余的元素(如果有)
if (!window.isEmpty()) {
downstream.push(new ArrayList<>(window));
}
}
);
}
// 使用窗口收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);
List<List<Integer>> windows = numbers.stream()
.gather(windowed(3)) // 3个元素的滑动窗口
.toList();
// 结果:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]]
实际应用场景
分组收集
可以自定义分组逻辑,比内置的groupingBy更灵活:
// 自定义按条件分组的收集器
static <T> Gatherer<T, ?, Map<Boolean, List<T>>> partitionBy(
Predicate<T> predicate) {
return Gatherer.of(
() -> new HashMap<Boolean, List<T>>() {{
put(true, new ArrayList<>());
put(false, new ArrayList<>());
}},
(partition, element, downstream) -> {
boolean key = predicate.test(element);
partition.get(key).add(element);
return true;
},
(partition, downstream) -> {
downstream.push(partition);
}
);
}
// 使用分组收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Map<Boolean, List<Integer>> partition = numbers.stream()
.gather(partitionBy(n -> n % 2 == 0))
.findFirst()
.orElse(Map.of());
// 结果:{false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8, 10]}
去重操作
可以实现更复杂的去重逻辑:
// 自定义去重收集器,保留第一个出现的元素
static <T> Gatherer<T, ?, T> distinctBy(Function<T, ?> keyExtractor) {
return Gatherer.of(
() -> new HashSet<>(), // 用Set保存已见过的key
(seen, element, downstream) -> {
Object key = keyExtractor.apply(element);
if (seen.add(key)) { // 如果是新的key
downstream.push(element); // 传递元素
}
return true; // 继续处理
},
(seen, downstream) -> {} // 没有收尾工作
);
}
// 使用去重收集器
List<Person> people = List.of(
new Person("Alice", 25),
new Person("Bob", 30),
new Person("Alice", 28) // 同名但不同年龄
);
List<Person> uniqueByName = people.stream()
.gather(distinctBy(Person::getName))
.toList();
// 结果:只保留第一个Alice
批处理
批处理是常见需求,可以用自定义收集器实现:
// 自定义批处理收集器
static <T> Gatherer<T, ?, List<T>> batched(int batchSize) {
return Gatherer.of(
() -> new ArrayList<T>(batchSize), // 用列表保存当前批次
(batch, element, downstream) -> {
batch.add(element); // 添加到批次
if (batch.size() == batchSize) { // 批次满了
downstream.push(new ArrayList<>(batch)); // 传递批次
batch.clear(); // 清空批次
}
return true; // 继续处理
},
(batch, downstream) -> {
// 处理最后的不完整批次
if (!batch.isEmpty()) {
downstream.push(new ArrayList<>(batch));
}
}
);
}
// 使用批处理收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<List<Integer>> batches = numbers.stream()
.gather(batched(3)) // 每3个一批
.toList();
// 结果:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
状态累积
可以实现需要累积状态的操作:
// 自定义累积收集器,计算累积和
static Gatherer<Integer, ?, Integer> cumulativeSum() {
return Gatherer.of(
() -> new int[]{0}, // 用数组保存累积和
(sum, element, downstream) -> {
sum[0] += element; // 累加
downstream.push(sum[0]); // 传递累积和
return true; // 继续处理
},
(sum, downstream) -> {} // 没有收尾工作
);
}
// 使用累积收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
List<Integer> cumulative = numbers.stream()
.gather(cumulativeSum())
.toList();
// 结果:[1, 3, 6, 10, 15]
高级用法
组合收集器
多个收集器可以组合使用:
// 先窗口化,再过滤
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);
List<List<Integer>> filteredWindows = numbers.stream()
.gather(windowed(3)) // 窗口化
.filter(window -> window.stream().mapToInt(Integer::intValue).sum() > 10) // 过滤
.toList();
// 结果:只保留和大于10的窗口
提前终止
可以在处理过程中提前终止:
// 自定义查找前N个元素的收集器
static <T> Gatherer<T, ?, T> take(int n) {
return Gatherer.of(
() -> new int[]{0}, // 用数组保存计数
(count, element, downstream) -> {
if (count[0] < n) { // 还没取够
downstream.push(element);
count[0]++;
return true; // 继续处理
}
return false; // 提前终止
},
(count, downstream) -> {} // 没有收尾工作
);
}
// 使用take收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> first3 = numbers.stream()
.gather(take(3))
.toList();
// 结果:[1, 2, 3]
条件处理
可以根据条件决定是否处理元素:
// 自定义条件收集器,只在满足条件时处理
static <T> Gatherer<T, ?, T> when(Predicate<T> condition) {
return Gatherer.of(
() -> new Object[0], // 不需要状态
(state, element, downstream) -> {
if (condition.test(element)) { // 满足条件
downstream.push(element);
}
return true; // 继续处理(即使不满足条件也继续)
},
(state, downstream) -> {} // 没有收尾工作
);
}
最佳实践
保持收集器无状态
尽可能让收集器无状态,这样更容易理解和测试:
// 好的做法:无状态收集器
static <T> Gatherer<T, ?, T> filterEven() {
return Gatherer.of(
() -> null, // 不需要状态
(state, element, downstream) -> {
if (element % 2 == 0) {
downstream.push(element);
}
return true;
},
(state, downstream) -> {}
);
}
合理使用状态
需要状态的时候,用合适的类型保存:
// 使用专门的状态类,而不是数组
static class WindowState<T> {
private final List<T> window = new ArrayList<>();
private final int size;
WindowState(int size) {
this.size = size;
}
void add(T element) {
window.add(element);
}
boolean isFull() {
return window.size() == size;
}
List<T> getWindowAndShift() {
List<T> result = new ArrayList<>(window);
if (window.size() == size) {
window.remove(0);
}
return result;
}
}
处理边界情况
要考虑空流、单元素流等边界情况:
static <T> Gatherer<T, ?, List<T>> windowedSafe(int size) {
if (size <= 0) {
throw new IllegalArgumentException("窗口大小必须大于0");
}
return Gatherer.of(
() -> new WindowState<T>(size),
(state, element, downstream) -> {
state.add(element);
if (state.isFull()) {
downstream.push(state.getWindowAndShift());
}
return true;
},
(state, downstream) -> {
// 处理最后的不完整窗口(如果需要)
}
);
}
总结
JEP 473的流收集器增强让Stream API更灵活了,可以自定义中间操作来处理各种复杂场景。虽然API稍微复杂点,但是比之前打断流管道的方式好多了。
对于需要复杂流处理的场景,自定义收集器很有用。特别是窗口操作、批处理、状态累积这些需求,用自定义收集器实现起来很方便。虽然还是预览特性,但是值得尝试。
总的来说,流收集器增强让Java的函数式编程能力更强了。兄弟们如果遇到流处理的需求,可以试试自定义收集器,看看能不能简化代码。不过要注意,自定义收集器需要理解流的工作原理,刚开始可能有点难,多练练就好了。