06、JDK 23 新特性:流收集器增强(JEP 473):自定义中间操作与流管道高级转换技巧

Java的Stream API用起来确实方便,但是有时候内置的中间操作不够用,想自定义个中间操作挺麻烦的。现在JDK 23的JEP 473提供了自定义中间操作的能力,让流管道可以更灵活地转换数据,这下就方便多了。

鹏磊我之前做流处理的时候,遇到复杂的转换逻辑,经常得把流打断,用传统方式处理一部分,再用流处理一部分,搞得代码很别扭。现在有了自定义中间操作,可以直接在流管道里搞定,代码更简洁了。今天咱就聊聊这个JEP 473,看看怎么自定义中间操作。

JEP 473 的核心改进

JEP 473增强了Stream API,支持自定义中间操作。以前流的中间操作都是固定的,像mapfilterflatMap这些,现在可以自定义了。这让流API的扩展性更强,能处理更复杂的场景。

自定义中间操作的关键是Stream.Gatherer接口,它定义了一个收集器(gatherer),可以自定义流的转换逻辑。这个收集器可以在流管道中作为中间操作使用,对流的元素进行转换、过滤、分组等各种操作。

基础概念

Gatherer接口

Gatherer是自定义中间操作的核心接口:

interface Gatherer<T, A, R> {
    Supplier<A> initializer();      // 初始化累加器
    Integrator<A, T, R> integrator();  // 集成器,处理每个元素
    BiConsumer<A, Downstream<? super R>> finisher();  // 完成器,处理最后的结果
}

这个接口定义了三个方法:

  • initializer: 初始化一个累加器,用来保存中间状态
  • integrator: 处理每个元素,可以决定是否继续处理、跳过元素或者提前结束
  • finisher: 处理完所有元素后的收尾工作

简单示例

自定义过滤操作

先看个简单的例子,自定义一个过滤操作:

import java.util.stream.*;

// 自定义一个只保留偶数的收集器
Gatherer<Integer, ?, Integer> evensOnly = Gatherer.of(
    () -> new Object[0],  // 不需要累加器,用空数组占位
    (state, element, downstream) -> {
        if (element % 2 == 0) {  // 如果是偶数
            downstream.push(element);  // 传递到下游
        }
        return true;  // 继续处理下一个元素
    },
    (state, downstream) -> {}  // 没有收尾工作
);

// 使用自定义收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> evens = numbers.stream()
    .gather(evensOnly)  // 使用自定义收集器
    .toList();
// 结果:[2, 4, 6, 8, 10]

自定义窗口操作

窗口操作是流处理里的常见需求,可以用自定义收集器实现:

// 自定义一个滑动窗口收集器
static <T> Gatherer<T, ?, List<T>> windowed(int size) {
    return Gatherer.of(
        () -> new ArrayList<T>(),  // 用列表保存窗口元素
        (window, element, downstream) -> {
            window.add(element);  // 添加元素到窗口
            
            if (window.size() == size) {  // 窗口满了
                downstream.push(new ArrayList<>(window));  // 传递窗口
                window.remove(0);  // 移除最旧的元素,保持窗口大小
            }
            
            return true;  // 继续处理
        },
        (window, downstream) -> {
            // 处理剩余的元素(如果有)
            if (!window.isEmpty()) {
                downstream.push(new ArrayList<>(window));
            }
        }
    );
}

// 使用窗口收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);
List<List<Integer>> windows = numbers.stream()
    .gather(windowed(3))  // 3个元素的滑动窗口
    .toList();
// 结果:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]]

实际应用场景

分组收集

可以自定义分组逻辑,比内置的groupingBy更灵活:

// 自定义按条件分组的收集器
static <T> Gatherer<T, ?, Map<Boolean, List<T>>> partitionBy(
    Predicate<T> predicate) {
    return Gatherer.of(
        () -> new HashMap<Boolean, List<T>>() {{
            put(true, new ArrayList<>());
            put(false, new ArrayList<>());
        }},
        (partition, element, downstream) -> {
            boolean key = predicate.test(element);
            partition.get(key).add(element);
            return true;
        },
        (partition, downstream) -> {
            downstream.push(partition);
        }
    );
}

// 使用分组收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Map<Boolean, List<Integer>> partition = numbers.stream()
    .gather(partitionBy(n -> n % 2 == 0))
    .findFirst()
    .orElse(Map.of());
// 结果:{false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8, 10]}

去重操作

可以实现更复杂的去重逻辑:

// 自定义去重收集器,保留第一个出现的元素
static <T> Gatherer<T, ?, T> distinctBy(Function<T, ?> keyExtractor) {
    return Gatherer.of(
        () -> new HashSet<>(),  // 用Set保存已见过的key
        (seen, element, downstream) -> {
            Object key = keyExtractor.apply(element);
            if (seen.add(key)) {  // 如果是新的key
                downstream.push(element);  // 传递元素
            }
            return true;  // 继续处理
        },
        (seen, downstream) -> {}  // 没有收尾工作
    );
}

// 使用去重收集器
List<Person> people = List.of(
    new Person("Alice", 25),
    new Person("Bob", 30),
    new Person("Alice", 28)  // 同名但不同年龄
);
List<Person> uniqueByName = people.stream()
    .gather(distinctBy(Person::getName))
    .toList();
// 结果:只保留第一个Alice

批处理

批处理是常见需求,可以用自定义收集器实现:

// 自定义批处理收集器
static <T> Gatherer<T, ?, List<T>> batched(int batchSize) {
    return Gatherer.of(
        () -> new ArrayList<T>(batchSize),  // 用列表保存当前批次
        (batch, element, downstream) -> {
            batch.add(element);  // 添加到批次
            
            if (batch.size() == batchSize) {  // 批次满了
                downstream.push(new ArrayList<>(batch));  // 传递批次
                batch.clear();  // 清空批次
            }
            
            return true;  // 继续处理
        },
        (batch, downstream) -> {
            // 处理最后的不完整批次
            if (!batch.isEmpty()) {
                downstream.push(new ArrayList<>(batch));
            }
        }
    );
}

// 使用批处理收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<List<Integer>> batches = numbers.stream()
    .gather(batched(3))  // 每3个一批
    .toList();
// 结果:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

状态累积

可以实现需要累积状态的操作:

// 自定义累积收集器,计算累积和
static Gatherer<Integer, ?, Integer> cumulativeSum() {
    return Gatherer.of(
        () -> new int[]{0},  // 用数组保存累积和
        (sum, element, downstream) -> {
            sum[0] += element;  // 累加
            downstream.push(sum[0]);  // 传递累积和
            return true;  // 继续处理
        },
        (sum, downstream) -> {}  // 没有收尾工作
    );
}

// 使用累积收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
List<Integer> cumulative = numbers.stream()
    .gather(cumulativeSum())
    .toList();
// 结果:[1, 3, 6, 10, 15]

高级用法

组合收集器

多个收集器可以组合使用:

// 先窗口化,再过滤
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);
List<List<Integer>> filteredWindows = numbers.stream()
    .gather(windowed(3))  // 窗口化
    .filter(window -> window.stream().mapToInt(Integer::intValue).sum() > 10)  // 过滤
    .toList();
// 结果:只保留和大于10的窗口

提前终止

可以在处理过程中提前终止:

// 自定义查找前N个元素的收集器
static <T> Gatherer<T, ?, T> take(int n) {
    return Gatherer.of(
        () -> new int[]{0},  // 用数组保存计数
        (count, element, downstream) -> {
            if (count[0] < n) {  // 还没取够
                downstream.push(element);
                count[0]++;
                return true;  // 继续处理
            }
            return false;  // 提前终止
        },
        (count, downstream) -> {}  // 没有收尾工作
    );
}

// 使用take收集器
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> first3 = numbers.stream()
    .gather(take(3))
    .toList();
// 结果:[1, 2, 3]

条件处理

可以根据条件决定是否处理元素:

// 自定义条件收集器,只在满足条件时处理
static <T> Gatherer<T, ?, T> when(Predicate<T> condition) {
    return Gatherer.of(
        () -> new Object[0],  // 不需要状态
        (state, element, downstream) -> {
            if (condition.test(element)) {  // 满足条件
                downstream.push(element);
            }
            return true;  // 继续处理(即使不满足条件也继续)
        },
        (state, downstream) -> {}  // 没有收尾工作
    );
}

最佳实践

保持收集器无状态

尽可能让收集器无状态,这样更容易理解和测试:

// 好的做法:无状态收集器
static <T> Gatherer<T, ?, T> filterEven() {
    return Gatherer.of(
        () -> null,  // 不需要状态
        (state, element, downstream) -> {
            if (element % 2 == 0) {
                downstream.push(element);
            }
            return true;
        },
        (state, downstream) -> {}
    );
}

合理使用状态

需要状态的时候,用合适的类型保存:

// 使用专门的状态类,而不是数组
static class WindowState<T> {
    private final List<T> window = new ArrayList<>();
    private final int size;
    
    WindowState(int size) {
        this.size = size;
    }
    
    void add(T element) {
        window.add(element);
    }
    
    boolean isFull() {
        return window.size() == size;
    }
    
    List<T> getWindowAndShift() {
        List<T> result = new ArrayList<>(window);
        if (window.size() == size) {
            window.remove(0);
        }
        return result;
    }
}

处理边界情况

要考虑空流、单元素流等边界情况:

static <T> Gatherer<T, ?, List<T>> windowedSafe(int size) {
    if (size <= 0) {
        throw new IllegalArgumentException("窗口大小必须大于0");
    }
    
    return Gatherer.of(
        () -> new WindowState<T>(size),
        (state, element, downstream) -> {
            state.add(element);
            if (state.isFull()) {
                downstream.push(state.getWindowAndShift());
            }
            return true;
        },
        (state, downstream) -> {
            // 处理最后的不完整窗口(如果需要)
        }
    );
}

总结

JEP 473的流收集器增强让Stream API更灵活了,可以自定义中间操作来处理各种复杂场景。虽然API稍微复杂点,但是比之前打断流管道的方式好多了。

对于需要复杂流处理的场景,自定义收集器很有用。特别是窗口操作、批处理、状态累积这些需求,用自定义收集器实现起来很方便。虽然还是预览特性,但是值得尝试。

总的来说,流收集器增强让Java的函数式编程能力更强了。兄弟们如果遇到流处理的需求,可以试试自定义收集器,看看能不能简化代码。不过要注意,自定义收集器需要理解流的工作原理,刚开始可能有点难,多练练就好了。

本文章最后更新于 2025-11-28