09、Java 22 新特性:流收集器(JEP 461)增强数据聚合

兄弟们,鹏磊今天来聊聊 Java 22 里的流收集器(Stream Gatherers)这个新特性,这玩意儿是 JEP 461 引入的预览特性,专门用来增强流的数据聚合能力。说实话,用 Java 流(Stream)这么多年了,最头疼的就是有些复杂的数据转换需求,用现有的中间操作(比如 map、filter、flatMap)根本搞不定,要么得写一堆嵌套操作,要么就得用 collect 收集器,但收集器是终端操作,不能用在流中间。流收集器就是为了解决这个问题来的,它让咱们可以自定义中间操作,实现各种复杂的数据转换,而且还能保持状态,支持短路,甚至支持并行处理,这功能贼强大。

流收集器是 Java 22 的预览特性,它提供了一个 Gatherer 接口和 Gatherers 工具类,让咱们可以创建自定义的中间操作。核心思想是:把输入流中的元素通过一个收集器(Gatherer)转换成输出流中的元素,这个转换可以是一对一、一对多、多对一或者多对多的映射,而且可以是有状态的,可以缓冲输入,可以短路停止处理。这玩意儿特别适合需要复杂数据转换的场景,比如窗口化处理、增量聚合、并发映射、扫描累积啥的。

为什么需要流收集器

先说说传统流操作的限制。咱们平时用流,主要就是那些内置的中间操作:

// 传统流操作的限制
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// 问题1:简单的一对一转换可以用 map
List<Integer> doubled = numbers.stream()
    .map(n -> n * 2)  // 一对一映射,这个没问题
    .toList();

// 问题2:过滤可以用 filter
List<Integer> evens = numbers.stream()
    .filter(n -> n % 2 == 0)  // 过滤,这个也没问题
    .toList();

// 问题3:一对多可以用 flatMap
List<Integer> expanded = numbers.stream()
    .flatMap(n -> Stream.of(n, n * 2))  // 一对多,这个也能搞定
    .toList();

// 问题4:但是多对一就麻烦了,比如把流分成固定大小的窗口
// 用现有操作很难实现,得写一堆代码
List<List<Integer>> windows = new ArrayList<>();  // 得手动维护状态
List<Integer> current = new ArrayList<>();  // 当前窗口
for (Integer n : numbers) {
    current.add(n);  // 添加到当前窗口
    if (current.size() == 3) {  // 窗口满了
        windows.add(new ArrayList<>(current));  // 保存窗口
        current.clear();  // 清空
    }
}
if (!current.isEmpty()) {  // 处理剩余元素
    windows.add(current);
}

// 问题5:增量聚合也很麻烦,比如扫描累积
// 用现有操作得用 reduce,但 reduce 是终端操作,不能用在中间
// 或者得用 collect,但 collect 也是终端操作

// 问题6:并发处理每个元素也很难实现
// 现有的 parallel() 是并行整个流,不能控制每个元素的并发度

流收集器就是为了解决这些问题。它让咱们可以自定义中间操作,实现各种复杂的数据转换,而且还能保持状态,支持短路,支持并行。

核心概念

流收集器的核心概念包括:

  1. Gatherer(收集器):一个接口,定义了如何把输入流转换成输出流
  2. Integrator(积分器):处理每个输入元素,更新状态,可选地向下游发送元素
  3. Initializer(初始化器):创建初始状态
  4. Combiner(组合器):合并两个状态,用于并行处理
  5. Finisher(完成器):在流结束时执行最终操作
  6. Gatherers(工具类):提供了一些内置的收集器,比如 windowFixed、windowSliding、fold、scan、mapConcurrent 等

基本用法

使用内置收集器:窗口化处理

最常用的场景就是把流分成固定大小的窗口,这个用 windowFixed 就能搞定:

import java.util.stream.*;  // 导入流相关类
import java.util.stream.Gatherers;  // 导入收集器工具类

// 窗口化处理:把流分成固定大小的窗口
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// 使用 windowFixed 把流分成大小为 3 的窗口
List<List<Integer>> windows = numbers.stream()
    .gather(Gatherers.windowFixed(3))  // 收集成大小为 3 的窗口
    .toList();  // 转换成列表

// 结果:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
// 每个窗口都是一个新的列表,包含 3 个元素
System.out.println(windows);
// 输出:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

// 如果元素数量不是窗口大小的倍数,最后一个窗口会包含剩余元素
List<Integer> numbers2 = List.of(1, 2, 3, 4, 5);
List<List<Integer>> windows2 = numbers2.stream()
    .gather(Gatherers.windowFixed(3))  // 窗口大小为 3
    .toList();
// 结果:[[1, 2, 3], [4, 5]]
// 最后一个窗口只有 2 个元素

这玩意儿的好处是,不用手动维护状态,不用写循环,一行代码就搞定窗口化处理。而且它是中间操作,可以继续链式调用其他操作。

滑动窗口处理

除了固定窗口,还可以用滑动窗口,这个用 windowSliding 就能搞定:

import java.util.stream.*;
import java.util.stream.Gatherers;

// 滑动窗口:每个窗口都包含前一个窗口的大部分元素
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);

// 使用 windowSliding 创建大小为 3 的滑动窗口
List<List<Integer>> slidingWindows = numbers.stream()
    .gather(Gatherers.windowSliding(3))  // 滑动窗口大小为 3
    .toList();

// 结果:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
// 每个窗口都包含 3 个元素,相邻窗口之间有重叠
System.out.println(slidingWindows);
// 输出:[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]

// 滑动窗口特别适合需要计算移动平均、移动最大值的场景
List<Double> prices = List.of(10.0, 12.0, 11.0, 13.0, 14.0, 15.0);
List<Double> movingAverages = prices.stream()
    .gather(Gatherers.windowSliding(3))  // 3 日移动平均
    .map(window -> window.stream()  // 对每个窗口计算平均值
        .mapToDouble(Double::doubleValue)  // 转换成 double
        .average()  // 计算平均值
        .orElse(0.0))  // 如果没有值就返回 0
    .toList();
// 结果:[11.0, 12.0, 12.67, 13.33, 14.0]
// 每个值都是前 3 个价格的移动平均

滑动窗口的好处是,可以处理需要重叠窗口的场景,比如移动平均、移动最大值、时间序列分析啥的。

增量聚合:fold 收集器

如果需要增量聚合,可以用 fold 收集器,它会把所有元素聚合成一个结果:

import java.util.stream.*;
import java.util.stream.Gatherers;

// 增量聚合:把所有元素聚合成一个结果
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// 使用 fold 把所有数字用分号连接
String result = numbers.stream()
    .gather(Gatherers.fold(
        () -> "",  // 初始值:空字符串
        (acc, element) -> {  // 累积函数:把新元素加到累积值后面
            if (acc.isEmpty()) {  // 如果是第一个元素
                return element.toString();  // 直接返回元素
            } else {  // 否则
                return acc + ";" + element;  // 用分号连接
            }
        }
    ))
    .findFirst()  // fold 只产生一个结果,所以用 findFirst
    .orElse("");  // 如果没有结果就返回空字符串

// 结果:"1;2;3;4;5;6;7;8;9"
System.out.println(result);

// fold 还可以用来计算总和、乘积、最大值等
Integer sum = numbers.stream()
    .gather(Gatherers.fold(
        () -> 0,  // 初始值:0
        (acc, element) -> acc + element  // 累积:累加
    ))
    .findFirst()
    .orElse(0);
// 结果:45

// 计算乘积
Integer product = numbers.stream()
    .gather(Gatherers.fold(
        () -> 1,  // 初始值:1
        (acc, element) -> acc * element  // 累积:相乘
    ))
    .findFirst()
    .orElse(1);
// 结果:362880

fold 的好处是,可以自定义聚合逻辑,而且它是中间操作,可以继续链式调用。不过要注意,fold 是终端操作,它只产生一个结果,所以通常用 findFirst() 来获取结果。

扫描累积:scan 收集器

如果需要扫描累积(每个元素都产生一个累积结果),可以用 scan 收集器:

import java.util.stream.*;
import java.util.stream.Gatherers;

// 扫描累积:每个元素都产生一个累积结果
List<Integer> numbers = List.of(1, 2, 3, 4, 5);

// 使用 scan 计算累积和
List<Integer> cumulativeSums = numbers.stream()
    .gather(Gatherers.scan(
        0,  // 初始值:0
        (acc, element) -> acc + element  // 累积函数:累加
    ))
    .toList();

// 结果:[1, 3, 6, 10, 15]
// 每个元素都是到当前位置的累积和
System.out.println(cumulativeSums);
// 输出:[1, 3, 6, 10, 15]

// scan 和 fold 的区别:
// - fold 只产生一个最终结果
// - scan 每个元素都产生一个累积结果
// 所以 scan 的输出流长度和输入流长度一样

// 计算累积乘积
List<Integer> cumulativeProducts = numbers.stream()
    .gather(Gatherers.scan(
        1,  // 初始值:1
        (acc, element) -> acc * element  // 累积:相乘
    ))
    .toList();
// 结果:[1, 2, 6, 24, 120]

scan 的好处是,可以产生中间结果,适合需要看到累积过程的场景,比如计算移动平均、累积收益啥的。

并发映射:mapConcurrent 收集器

如果需要并发处理每个元素,可以用 mapConcurrent 收集器:

import java.util.stream.*;
import java.util.stream.Gatherers;
import java.util.concurrent.CompletableFuture;

// 并发映射:并发处理每个元素,但保持顺序
List<String> urls = List.of(
    "https://example.com/1",
    "https://example.com/2",
    "https://example.com/3",
    "https://example.com/4",
    "https://example.com/5"
);

// 使用 mapConcurrent 并发获取 URL 内容
List<String> contents = urls.stream()
    .gather(Gatherers.mapConcurrent(
        3,  // 并发度:最多 3 个并发请求
        url -> {  // 映射函数:获取 URL 内容
            // 模拟网络请求
            try {
                Thread.sleep(100);  // 模拟网络延迟
                return "Content of " + url;  // 返回内容
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    ))
    .toList();

// 结果:按输入顺序返回,但处理是并发的
// 最多同时处理 3 个 URL,提高了性能
System.out.println(contents);

// mapConcurrent 的好处:
// 1. 可以控制并发度,避免创建太多线程
// 2. 保持输出顺序和输入顺序一致
// 3. 适合 IO 密集型操作,比如网络请求、文件读取

mapConcurrent 的好处是,可以控制并发度,避免创建太多线程,而且保持输出顺序,适合 IO 密集型操作。

自定义收集器

除了内置收集器,咱们还可以自定义收集器,实现更复杂的数据转换。自定义收集器需要实现 Gatherer 接口,或者用 Gatherer.of() 静态方法。

简单的自定义收集器:去重相邻元素

先来个简单的例子,实现一个去重相邻元素的收集器:

import java.util.stream.*;
import java.util.stream.Gatherer;

// 自定义收集器:去重相邻的相同元素
// 比如 [1, 1, 2, 2, 3, 1, 1] -> [1, 2, 3, 1]
Gatherer<Integer, ?, Integer> distinctAdjacent = Gatherer.of(
    () -> new Object() { Integer last = null; },  // 初始状态:保存上一个元素
    (state, element, downstream) -> {  // 积分器:处理每个元素
        if (state.last == null || !state.last.equals(element)) {  // 如果和上一个元素不同
            state.last = element;  // 更新状态
            return downstream.push(element);  // 发送到下游
        } else {  // 如果和上一个元素相同
            return true;  // 继续处理,但不发送
        }
    }
);

// 使用自定义收集器
List<Integer> numbers = List.of(1, 1, 2, 2, 3, 1, 1);
List<Integer> distinct = numbers.stream()
    .gather(distinctAdjacent)  // 使用自定义收集器
    .toList();

// 结果:[1, 2, 3, 1]
System.out.println(distinct);

这个例子展示了如何用 Gatherer.of() 创建简单的收集器。状态是一个对象,保存上一个元素;积分器检查当前元素是否和上一个元素相同,如果不同就发送到下游。

复杂自定义收集器:查找最大值并短路

再来个复杂点的例子,实现一个查找最大值的收集器,如果遇到超过限制的值就短路停止:

import java.util.stream.*;
import java.util.stream.Gatherer;

// 自定义收集器:查找最大值,如果遇到超过限制的值就短路
record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {
    // 实现 Gatherer 接口需要实现几个方法
    
    @Override
    public Integrator<List<Integer>, Integer, Integer> integrator() {
        return (state, element, downstream) -> {
            // 如果遇到超过限制的值,短路停止
            if (element >= limit) {
                return false;  // 返回 false 表示停止处理
            }
            
            // 更新状态:保存当前最大值
            if (state.isEmpty() || element > state.get(0)) {
                state.clear();  // 清空状态
                state.add(element);  // 添加新最大值
            }
            
            return true;  // 继续处理
        };
    }
    
    @Override
    public Supplier<List<Integer>> initializer() {
        return ArrayList::new;  // 初始状态:空列表
    }
    
    @Override
    public BinaryOperator<List<Integer>> combiner() {
        return (state1, state2) -> {
            // 合并两个状态:取最大值
            if (state1.isEmpty()) return state2;
            if (state2.isEmpty()) return state1;
            return state1.get(0) > state2.get(0) ? state1 : state2;
        };
    }
    
    @Override
    public Function<List<Integer>, Integer> finisher() {
        return state -> state.isEmpty() ? null : state.get(0);  // 返回最大值
    }
}

// 使用自定义收集器
List<Integer> numbers = List.of(3, 1, 4, 1, 5, 9, 2, 6);
Integer max = numbers.stream()
    .gather(new BiggestInt(8))  // 限制为 8,如果遇到 >= 8 的值就停止
    .findFirst()
    .orElse(null);

// 结果:5(因为遇到 9 时停止了,9 >= 8)
System.out.println(max);

这个例子展示了如何实现完整的 Gatherer 接口。需要实现四个方法:integrator()(积分器)、initializer()(初始化器)、combiner()(组合器)、finisher()(完成器)。积分器处理每个元素,可以返回 false 来短路停止;组合器用于并行处理时合并状态;完成器在流结束时执行最终操作。

实际应用场景

场景1:批量处理数据

批量处理是流收集器的典型应用场景,可以用 windowFixed 把数据分成批次:

import java.util.stream.*;
import java.util.stream.Gatherers;

// 批量处理:把大量数据分成小批次处理
List<String> allData = List.of(
    "data1", "data2", "data3", "data4", "data5",
    "data6", "data7", "data8", "data9", "data10"
);

// 分成每批 3 个元素
allData.stream()
    .gather(Gatherers.windowFixed(3))  // 每批 3 个
    .forEach(batch -> {  // 处理每批数据
        System.out.println("Processing batch: " + batch);
        // 这里可以调用批量 API,比如批量插入数据库
        // batchInsertToDatabase(batch);
    });

// 输出:
// Processing batch: [data1, data2, data3]
// Processing batch: [data4, data5, data6]
// Processing batch: [data7, data8, data9]
// Processing batch: [data10]

批量处理的好处是,可以减少 API 调用次数,提高性能,特别适合数据库批量插入、批量网络请求啥的。

场景2:时间序列分析

时间序列分析可以用滑动窗口来计算移动平均、移动最大值等:

import java.util.stream.*;
import java.util.stream.Gatherers;

// 时间序列分析:计算移动平均
record PricePoint(double price, long timestamp) {}

List<PricePoint> prices = List.of(
    new PricePoint(10.0, 1000),
    new PricePoint(12.0, 2000),
    new PricePoint(11.0, 3000),
    new PricePoint(13.0, 4000),
    new PricePoint(14.0, 5000),
    new PricePoint(15.0, 6000)
);

// 计算 3 点移动平均
List<Double> movingAverages = prices.stream()
    .gather(Gatherers.windowSliding(3))  // 3 点滑动窗口
    .map(window -> window.stream()  // 对每个窗口
        .mapToDouble(PricePoint::price)  // 提取价格
        .average()  // 计算平均
        .orElse(0.0))  // 默认值
    .toList();

// 结果:[11.0, 12.0, 12.67, 13.33, 14.0]
// 每个值都是前 3 个价格的移动平均
System.out.println(movingAverages);

// 计算移动最大值
List<Double> movingMax = prices.stream()
    .gather(Gatherers.windowSliding(3))  // 滑动窗口
    .map(window -> window.stream()  // 对每个窗口
        .mapToDouble(PricePoint::price)  // 提取价格
        .max()  // 计算最大
        .orElse(0.0))  // 默认值
    .toList();

// 结果:[12.0, 12.0, 13.0, 14.0, 15.0]
System.out.println(movingMax);

时间序列分析在金融、监控、数据分析等领域很常用,滑动窗口可以方便地计算各种统计指标。

场景3:并发 IO 操作

并发 IO 操作可以用 mapConcurrent 来提高性能:

import java.util.stream.*;
import java.util.stream.Gatherers;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;

// 并发 IO 操作:并发获取多个 URL 的内容
List<String> urls = List.of(
    "https://api.example.com/data1",
    "https://api.example.com/data2",
    "https://api.example.com/data3",
    "https://api.example.com/data4",
    "https://api.example.com/data5"
);

HttpClient client = HttpClient.newHttpClient();  // 创建 HTTP 客户端

// 并发获取 URL 内容,最多 3 个并发
List<String> responses = urls.stream()
    .gather(Gatherers.mapConcurrent(
        3,  // 并发度:最多 3 个并发请求
        url -> {  // 映射函数:发送 HTTP 请求
            try {
                HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .build();
                HttpResponse<String> response = client.send(
                    request,
                    HttpResponse.BodyHandlers.ofString()
                );
                return response.body();  // 返回响应体
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    ))
    .toList();

// 结果:按输入顺序返回响应,但请求是并发发送的
// 最多同时发送 3 个请求,提高了性能
System.out.println(responses);

并发 IO 操作特别适合网络请求、文件读取等 IO 密集型场景,可以显著提高性能。

注意事项和最佳实践

1. 预览特性需要启用

流收集器是预览特性,编译和运行都需要启用预览特性:

# 编译时启用预览特性
javac --enable-preview --release 22 Main.java

# 运行时启用预览特性
java --enable-preview Main

2. 状态管理要小心

自定义收集器时,状态管理要小心,特别是并行处理时:

// 错误示例:状态不是线程安全的
Gatherer<Integer, ?, Integer> badGatherer = Gatherer.of(
    () -> new ArrayList<>(),  // 状态:列表
    (state, element, downstream) -> {
        state.add(element);  // 直接修改状态,并行时可能有问题
        return true;
    }
);

// 正确示例:使用线程安全的状态
Gatherer<Integer, ?, Integer> goodGatherer = Gatherer.of(
    () -> new java.util.concurrent.ConcurrentLinkedQueue<>(),  // 线程安全的队列
    (state, element, downstream) -> {
        state.offer(element);  // 线程安全的操作
        return true;
    }
);

3. 短路要谨慎使用

短路可以提前停止处理,但要确保逻辑正确:

// 短路示例:遇到负数就停止
Gatherer<Integer, ?, Integer> shortCircuit = Gatherer.of(
    () -> new Object() { boolean stopped = false; },
    (state, element, downstream) -> {
        if (state.stopped) {  // 如果已经停止
            return false;  // 不再处理
        }
        if (element < 0) {  // 如果遇到负数
            state.stopped = true;  // 标记停止
            return false;  // 停止处理
        }
        return downstream.push(element);  // 发送元素
    }
);

4. 性能考虑

流收集器的性能取决于实现,内置收集器通常已经优化过了,但自定义收集器要注意性能:

// 性能考虑:
// 1. 避免在积分器中做重计算
// 2. 状态要尽量小,避免内存占用过大
// 3. 并行处理时要确保组合器效率高
// 4. 短路可以提前停止,提高性能

总结

流收集器是 Java 22 引入的强大特性,它让咱们可以自定义中间操作,实现各种复杂的数据转换。内置的收集器(windowFixed、windowSliding、fold、scan、mapConcurrent)已经覆盖了很多常见场景,而且咱们还可以自定义收集器来实现更复杂的需求。

这玩意儿特别适合需要复杂数据转换的场景,比如窗口化处理、增量聚合、并发映射、扫描累积啥的。而且它是中间操作,可以继续链式调用其他操作,用起来贼方便。

不过要注意,流收集器是预览特性,需要启用预览特性才能用,而且可能会在未来的版本中变化。但不管怎么说,这功能确实很强大,值得试试。

好了,今天就聊到这里,兄弟们有啥问题可以在评论区留言,鹏磊看到会回复的。下次咱们聊聊 Java 22 的其他新特性,比如在 super() 调用之前的语句(JEP 447),这玩意儿也挺有意思的。

本文章最后更新于 2025-11-27