一、窗口函数的分类
大多数Flink应用都是要划分窗口的,如果不划分窗口,那就得计算流中所有的数据的结果(很少有这样的需求)。
Flink如何划分窗口在之前的博客Flink 实战(7)时间特性、窗口、Watermark代码实践已经讲过了。
本篇的重点是讲窗口函数,即数据划分窗口后可以调用的处理函数。
1. 全量和增量的区别
从上图中看出,窗口函数主要分全量函数和增量函数这2大类。
- 全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。
- 增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。
2. apply和process的区别
apply和process的区别
- apply和process都是处理全量计算,但工作中正常用process。
- process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。
3. reduce和aggregate的区别
reduce和aggregate的区别
- reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个
<T>
- maxBy、minBy、sum这3个底层都是由reduce实现的
- aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有
<T, ACC, R>
二、AggregateFunction和ProcessWindowFunction结合使用
在reduce和aggregate中,都有一个可以把增量函数和全量函数结合使用的方法,就是上面图中标红色五角星的。
对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。
下面通过案例把增量计算和全量计算讲一下。
1. 需求背景
- 有一些城市的气温数据,如下面所示
- 想每隔30秒,获得该城市的这30秒内的温度最大值、最小值、平均值
江苏,苏州,1,20.5,2021-01-29 16:00:00
江苏,盐城,1,24.5,2021-01-29 16:00:20
江苏,盐城,1,28.5,2021-01-29 16:00:30
江苏,盐城,1,23.5,2021-01-29 16:00:40
江苏,苏州,1,21.5,2021-01-29 16:00:10
江苏,苏州,1,22.5,2021-01-29 16:00:20
江苏,苏州,1,24.5,2021-01-29 16:00:30
江苏,苏州,1,23.5,2021-01-29 16:00:40
江苏,苏州,1,22.5,2021-01-29 16:00:50
//太多了,省略
2. 分析
- 求温度的最大值、最小值、平均值,这样明显是聚合计算,适合用AggregateFunction
- 每隔30秒获取数据,就是30秒后窗口关闭时,获取窗口的信息(开始结束时间),并加上AggregateFunction的结果,这个适合用ProcessWindowFunction
- 所以得用下面AggregateFunction和ProcessWindowFunction结合的aggregate函数
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction) {
}
3. 程序主体
/**
* 该程序测试window的reduce功能
* reduce是每输入一个数据,触发一次计算
* 具体实现求得一个window数据中的max、min、sum、count、avg
*/
public class Test04_AggregateAndProcessFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.readTextFile(BaseConstant.TEMP_RECORD);
SingleOutputStreamOperator<TempRecord> dataStream = source
.flatMap(new TempRecordUtils.BeanFlatMap())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<TempRecord>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<TempRecord>() {
@Override
public long extractTimestamp(TempRecord element, long recordTimestamp) {
return element.getTimeEpochMilli();
}
})
);
SingleOutputStreamOperator<TempRecordAggsResult> result = dataStream
.keyBy(TempRecord::getCity)
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.aggregate(new TempRecordUtils.MyAggregateFunction(),//增量计算
new TempRecordUtils.MyProcessWindow());//全量计算
result.print();
env.execute();
}
}
4. AggregateFunction
/**
* 聚合函数,每来一个数据,就会执行聚合操作
*/
public static class MyAggregateFunction implements AggregateFunction<
TempRecord,
TempRecordAggsResult,
TempRecordAggsResult> {
@Override
public TempRecordAggsResult createAccumulator() {
return TempRecordAggsResult.getInitResult();
}
/**
* 每进入一个数据就会执行一次
* @param value 当前进入的数据
* @param accumulator 之前计算好的中间结果
* @return
*/
@Override
public TempRecordAggsResult add(TempRecord value, TempRecordAggsResult accumulator) {
accumulator.setKey(value.getProvince() + "," + value.getCity());
accumulator.setMax(value.getTemp() > accumulator.getMax() ? value.getTemp() : accumulator.getMax());
accumulator.setMin(value.getTemp() < accumulator.getMin() ? value.getTemp() : accumulator.getMin());
accumulator.setSum(value.getTemp() + accumulator.getSum());
accumulator.setCounts(accumulator.getCounts() + 1);
accumulator.setAvg(accumulator.getSum() / accumulator.getCounts());
return accumulator;
}
/*
当window的结束时间到达时,触发这个方法,返回结果
*/
@Override
public TempRecordAggsResult getResult(TempRecordAggsResult accumulator) {
//System.out.println("getResult :" + accumulator.toString());
return accumulator;
}
/**
* 在session窗口才会用到merge,时间窗口其实用不得
* @param a
* @param b
* @return
*/
@Override
public TempRecordAggsResult merge(TempRecordAggsResult a, TempRecordAggsResult b) {
a.setMax(a.getMax() > b.getMax() ? a.getMax() : b.getMax());
a.setMin(a.getMin() < b.getMin() ? a.getMin() : b.getMin());
a.setSum(a.getSum() + b.getSum());
a.setCounts(a.getCounts() + b.getCounts());
a.setAvg(a.getSum() / a.getCounts());
return a;
}
}
5. ProcessWindowFunction
上面有个问题,其实平均值可以在聚合函数不计算,最后在ProcessWindowFunction里用总和除以个数来求
public static class MyProcessWindow extends ProcessWindowFunction<
TempRecordAggsResult,
TempRecordAggsResult,
String,
TimeWindow> {
@Override
public void process(String s, Context context, Iterable<TempRecordAggsResult> elements, Collector<TempRecordAggsResult> out) throws Exception {
long windowStartTs = context.window().getStart();
long windowEndTs = context.window().getEnd();
if (elements.iterator().hasNext()) {
TempRecordAggsResult result = elements.iterator().next();
System.out.println("result:" + result.toString());
result.setBeginTime(
LocalDateTime.ofInstant(
Instant.ofEpochMilli(windowStartTs), ZoneId.systemDefault()
)
);
result.setEndTime(
LocalDateTime.ofInstant(
Instant.ofEpochMilli(windowEndTs), ZoneId.systemDefault()
)
);
out.collect(result);
}
}
}
6 TempRecordAggsResult
package study.common.entity;
import study.common.constant.FormatterConstant;
import java.time.LocalDateTime;
public class TempRecordAggsResult {
private String key;
private LocalDateTime beginTime;
private LocalDateTime endTime;
private Double max;
private Double min;
private Double sum;
private Double avg;
private Integer counts;
public static TempRecordAggsResult getInitResult() {
TempRecordAggsResult result = new TempRecordAggsResult();
result.setBeginTime(LocalDateTime.now());
result.setEndTime(LocalDateTime.now());
result.setMax(Double.MIN_VALUE);
result.setMin(Double.MAX_VALUE);
result.setSum(0.0);
result.setAvg(0.0);
result.setCounts(0);
return result;
}
public TempRecordAggsResult() {
}
public TempRecordAggsResult(String key, LocalDateTime beginTime, LocalDateTime endTime, Double max, Double min, Double sum, Double avg, Integer counts) {
this.key = key;
this.beginTime = beginTime;
this.endTime = endTime;
this.max = max;
this.min = min;
this.sum = sum;
this.avg = avg;
this.counts = counts;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public LocalDateTime getBeginTime() {
return beginTime;
}
public void setBeginTime(LocalDateTime beginTime) {
this.beginTime = beginTime;
}
public LocalDateTime getEndTime() {
return endTime;
}
public void setEndTime(LocalDateTime endTime) {
this.endTime = endTime;
}
public Double getMax() {
return max;
}
public void setMax(Double max) {
this.max = max;
}
public Double getMin() {
return min;
}
public void setMin(Double min) {
this.min = min;
}
public Double getSum() {
return sum;
}
public void setSum(Double sum) {
this.sum = sum;
}
public Double getAvg() {
return avg;
}
public void setAvg(Double avg) {
this.avg = avg;
}
public Integer getCounts() {
return counts;
}
public void setCounts(Integer counts) {
this.counts = counts;
}
@Override
public String toString() {
return "TempRecordAggsResult{" +
"key='" + key + '\'' +
", windowTime=[" + beginTime.format(FormatterConstant.commonDtf) +
", " + endTime.format(FormatterConstant.commonDtf) + ")" +
", max=" + max +
", min=" + min +
", sum=" + sum +
", avg=" + avg +
", counts=" + counts +
'}';
}
}