1. 需求描述
nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
2. 代码演示
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowDemo01_TimeWindow {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
//3.Transformation
//将9,3转为CartInfo(9,3)
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
//分组
//KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
// * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
//timeWindow(Time size窗口大小, Time slide滑动间隔)
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
.keyBy(CartInfo::getSensorId)
//.timeWindow(Time.seconds(5))//当size==slide,可以只写一个
//.timeWindow(Time.seconds(5), Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("count");
// * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
.keyBy(CartInfo::getSensorId)
//.timeWindow(Time.seconds(10), Time.seconds(5))
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum("count");
//4.Sink
/*
1,5
2,5
3,5
4,5
*/
//result1.print();
result2.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
3. Scala代码演示时间窗口的滚动和滑动
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val sensorStream: DataStream[SensorReading] = env
.socketTextStream("localhost", 9999)
.map(new MyMapToSensorReading)
// 1、使用window方法进行开窗设置
// 1.1、滚动窗口
/**
* 知识点:
* 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类,分别是创建处理时间窗口 和 事件时间窗口(事件时间窗口需要设置时间特性)
* 2、滚动窗口中,of方法可以设置2个参数,第一个是窗口的大小,第二个是时间偏移量(不设置时默认使用伦敦时间,当设置为-8时,为使用北京时间),偏移量设置时需要小于窗口大小
*/
val windowStream_1: DataStream[SensorReading] = sensorStream
.keyBy(_.id)
// .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8))) // 偏移量设置时需要小于窗口大小
// .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 事件时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 1.2、滑动窗口
/**
* 知识点:
* 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类
* 2、滑动窗口中,of方法可以设置3个参数,第一个是窗口大小,第二个是滑动步长,第三个是偏移量
*/
val windowStream_2: DataStream[SensorReading] = sensorStream
.keyBy(_.id)
// .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8))) // 偏移量设置时需要小于窗口大小
// .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5))) // 事件时间窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 1.3、会话窗口
val windowStream_3: DataStream[SensorReading] = sensorStream
.keyBy(_.id)
// .window(EventTimeSessionWindows.withGap(Time.minutes(10))) // 事件时间会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 2、使用timeWindow方法进行开窗
// 2.1、滚动窗口
val timeWindowStream_1: DataStream[SensorReading] = sensorStream
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
// 2.2、滑动窗口
val timeWindowStream_2: DataStream[SensorReading] = sensorStream
.keyBy(_.id)
.timeWindow(Time.seconds(15), Time.seconds(5))
.reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
windowStream_1.print()
env.execute("TimeWindowDemo")