07、Flink 实战 - 时间特性、窗口、Watermark代码实践,支持Flink1.12

一、前言

时间、窗口和Watermark是Flink的很重要的概念,学习它们是掌握运用Flink的重中之重。

二、时间特性

在DataStream API中,你可以用时间特性告知Flink在创建窗口时如何定义时间。
 
时间特性是StreamExecutionEnvironment的一个属性,它可以接受如下值。

1 ProcessingTime 处理时间

简单来说,处理时间就是Flink算子处理该条数据时,当前的系统时间。

通常,在窗口算子中使用处理时间会导致不确定的结果,因为这取决于数据到达Flink窗口时速率。

但是用处理时间也是有优点的,因为它处理任务无需考虑迟到的数据,所以提供了极低的延迟,比用事件时间快。

当你对自己的网络和数据的顺序非常自信时,并且要求极低的延迟,可以考虑用ProcessingTime。

2 EventTime 事件时间

事件时间是指某个事件发生时,他天然自带的时间。

比如温度监控器,某一时刻,它把当前时间+温度发送远程服务器,不管这条消息在网络中传输了多久,当它到达服务器时,
服务器可以从中提取事件真正的发生的时间。

我们在工作中,一般用的是事件时间,它极大的保证了结果正确性。因为Flink可以根据事件时间,结合watermark和allowedLateness提供处理迟到数据的能力。

3 IngestionTime 摄入时间

数据源算子处理到该数据的时间,可以简单理解为该数据进入Flink处理引擎的时间,并自动生成watermark。

摄入时间是事件时间和处理时间的混合体。和事件时间相比,摄入时间的价值不大,摄入时间没有提供极低延迟,也无法保证结果正确性。

4 设置时间特性的代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//默认的时间特性是ProcessingTime,这里设置成EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

三、Window

Flink 实战(三) 大白话 时间 窗口 watermark,已经对窗口的分类有了概念上的介绍,这里不再赘述。Flink提供的窗口种类也很多,需要结合代码实践才能逐渐理解,下面先了解下代码结构。

1 代码结构

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

2 时间窗口

每个事件的产生,它就天然的带上了时间属性,不管是事件的产生时间还是事件进入Flink的时间,这些时间都可以作为业务维度来划分窗口。
如下图:滚动时间窗口中,每隔1分钟,划分一个窗口。
 

2.1 滚动时间窗口

滚动窗口将事件分配到长度固定且不重叠的桶中,如上图就是滚动时间窗口。
用滚动时间窗口,我们需要设置每个窗口的长度。

//这里不设置时间特性,默认用ProcessingTime
SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .timeWindow(Time.seconds(5))//设置每5s划分一个窗口
        .apply(new SensorRecordUtils.MyAvg());

timeWindow在Flink 1.12将标记过期,最好用下面的window()

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        //基于ProcessingTime的滚动时间窗口,每5s划分一个窗口
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .apply(new SensorRecordUtils.MyAvg());

举一反三,基于EventTime的滚动时间窗口如下,这里忽略设置EventTime时间戳的代码,后面会给。

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new SensorRecordUtils.MyAvg());

2.2 滑动时间窗口

滑动窗口将事件分配到大小固定且允许相互重叠的桶中,这意味着每个事件有可能同时属于多个桶。要指定窗口的长度和滑动间隔来定义滑动窗口。

对于滑动时间窗口,我们要指定窗口的时间长度滑动步长

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
		//设置每5s划分一个窗口,滑动步长为1s
        .timeWindow(Time.seconds(5), Time.seconds(1))
        .apply(new SensorRecordUtils.MyAvg());

timeWindow在Flink 1.12将标记过期,最好用下面的window()

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
        //timeWindow在Flink 1.12将标记过期,最好用上面的window()
        //.timeWindow(Time.seconds(5), Time.seconds(1))
        .apply(new SensorRecordUtils.MyAvg());

举一反三,基于EventTime的滑动时间窗口如下,这里忽略设置EventTime时间戳的代码,后面会给。

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
        .apply(new SensorRecordUtils.MyAvg());

2.3 会话窗口

会话窗口在一些常见的真实场景中非常有用,这些场景既不适合用滚动窗口,也不适用滑动窗口。

会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是 Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .apply(new SensorRecordUtils.MyAvg());

3 计数窗口

事件是依次进入Flink的,以固定个数的事件为单位,划分窗口,就是计数窗口。
如下图:每4个事件,划分一个窗口。
 

3.1 滚动计数窗口

SingleOutputStreamOperator<Double> result = dataStream
        .keyBy(a -> a.getId())
        .countWindow(5)
        .aggregate(new SensorRecordUtils.MyAvgTemp());

3.2 滑动计数窗口

SingleOutputStreamOperator<Double> result = dataStream
        .keyBy(a -> a.getId())
        .countWindow(5, 2)
        .aggregate(new SensorRecordUtils.MyAvgTemp());

四、处理迟到数据

要处理迟到数据,得先理解下在没有迟到数据的情况下,窗口是如何被触发计算并关闭的。

1 理想情况

下面我用ppt画了一幅图,事件按照事件时间的正确顺序进入,并设置5秒划分一个窗口。
窗口一般是左闭右开的,当事件时间=5s的数据到来时,它会放到[5, 10)这个桶中,并表示5s之前的事件数据都已经到达,触发桶1进行计算结果并关闭窗口。
ppt画图实属不易,如果我画的不对,欢迎指正。
 

2 watermark

在用EventTime时,处理乱序数据,得用到Watermark,通常结合Window+Watermark使用

  • 数据流中的Watermark用于表示timestamp小于Watermark的数据都已经到达,因此Window的执行是由Watermark触发的
  • Watermark是一条特殊的数据记录
  • Watermark必须单调递增,以确保任务的事件时间时钟向前推进
    下面我用ppt画一幅图,简单讲下watermark的工作流程。

 

2.1 设置事件时间和watermark

BoundedOutOfOrdernessTimestampExtractor默认实现周期性生成watermark,这样也比较符合大多数生成环境,这个周期可以在环境变量中设置。

//自动生成水印的时间间隔,默认值是200毫秒
env.getConfig().setAutoWatermarkInterval(300);

SingleOutputStreamOperator<SensorRecord> dataStream = source
        .filter(new SensorRecordUtils.MyFilter())
        .map(new SensorRecordUtils.MyMap())
        .filter(a -> a != null)
        //认为到达的数据可能乱序,设置延迟3s
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorRecord>(Time.seconds(3)) {
   
     

            //提取SensorRecord中时间戳
            @Override
            public long extractTimestamp(SensorRecord element) {
   
     
                return element.getTime();
            }
        });
SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        //timeWindow在Flink 1.12将标记过期,最好用上面的window()
        //.timeWindow(Time.seconds(5))
        .apply(new SensorRecordUtils.MyAvg());

2.2 Flink1.12中设置

在Flink1.11中就已经发现assignTimestampsAndWatermarks的有些实现过期了,现在Flink1.12了,官网推荐用WatermarkStrategy。

SingleOutputStreamOperator<SensorRecord> dataStream = source
        .filter(new SensorRecordUtils.MyFilter())
        .map(new SensorRecordUtils.MyMap())
        .filter(a -> a != null)
        //认为到达的数据可能乱序,设置延迟1s,用Flink 1.11的WatermarkStrategy
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<SensorRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(new SerializableTimestampAssigner<SensorRecord>() {
   
     

                            //提取SensorRecord中时间戳
                            @Override
                            public long extractTimestamp(SensorRecord element, long recordTimestamp) {
   
     
                                return element.getTime();
                            }
                        })
        );

SingleOutputStreamOperator<Object> result = dataStream
        .keyBy(a -> a.getId())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        //.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
        //timeWindow在Flink 1.12将标记过期,最好用上面的window()
        //.timeWindow(Time.seconds(5))
                .apply(new SensorRecordUtils.MyAvg());

3 用事件时间,但不用watermark

SingleOutputStreamOperator<SensorRecord> dataStream = source
        .filter(new SensorRecordUtils.MyFilter())
        .map(new SensorRecordUtils.MyMap())
        .filter(a -> a != null)
        //认为到来的数据的事件时间就是有序的,没有乱序,所以不用设置Watermark
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorRecord>() {
   
     

            //提取SensorRecord中时间戳
            @Override
            public long extractAscendingTimestamp(SensorRecord element) {
   
     
                return element.getTime();
            }
        });

在Flink1.11中,AscendingTimestampExtractor过时了,后期会删除,我们可按照下面这么写

.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());

4 allowedLateness和侧输出流

watermark触发了窗口关闭时间时,先输出一个计算结果,但是还有迟到的数据怎么办呢?
可以使用allowedLateness和侧输出流。

allowedLateness设置一个事件长度,在窗口关闭后的这个时间长度内,对于属于该窗口的迟到数据来一次,计算一次。

如果任然有数据在allowedLateness到期后才到,得用侧输出流,可以将这些数据数据到某个Kafka,后期再合并计算。当然进入侧输出流的也是那种延迟太过分的数据,也是极少的。

OutputTag<SensorRecord> lateTag = new OutputTag<>("late");

SingleOutputStreamOperator<Object> result = mapDataStream
         .keyBy(a -> a.getId())
         .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
         .allowedLateness(Time.minutes(1))
         .sideOutputLateData(lateTag)
         .apply(new MyAvg())

五、结语

学无止境,后面好好学Flink的状态方面的内容,并复习前面所学,温故而知新。