一、概述

以wordcount为例,为什么每次输入数据,flink都能统计每个单词的总数呢?我们都没有显示保存每个单词的状态值,但是每来一条数据,都能计算单词的总数。事实上,flink在底层维护了每个 key的状态,就是state。比较于Spark,Spark如果没有显示保存其中的状态,它会统计当前批次的单词次数,也就是没有了历史总数,这就相当于,来一条数据我就处理,不管之前的数据,这就是无状态。总之,状态在Flink编程中显得极其重要,也是新一代实时流式处理框架的核心。

二、state 概念

state:一般指一个具体的task/operator的状态。State可以被记录,在失败的情况下数据还可以恢复,Flink中有两种基本类型的State:Keyed State,Operator State,他们两种都可以以两种形式存在:原始状态(raw state)和托管状态(managed state)。
托管状态:由Flink框架管理的状态,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑它。
下图是保存状态流程:
 

三、state 类型

3.1、Operator State(算子状态)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有
数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
state是task级别的state,说白了就是每个task对应一个state。
 

Flink 为算子状态提供三种基本数据结构:

3.1.1、List State(列表状态)

简单来说,就是用一个列表集合保存当前task的状态。

3.1.2、Union List State(联合列表状态)

也是将当前task的状态保存到列表集合中,也普通的列表状态不同的是,当发生故障或者利用检查点(Checkpoint)启动应用程序的时候,就可以利用联合列表状态恢复。

3.1.3、Broadcast State(广播状态)

如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。相当于 Spark 的广播机制。

3.2、Keyed State(键控状态)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。
 
Flink 的 Keyed State 支持以下数据类型:

3.2.1、单值状态

ValueState[T] 保存单个值,数据类型是T
get操作: ValueState.value()
set操作: ValueState.update(T value)

3.2.1、列表状态

ListState[T] 保存列表,数据类型是T
添加一个元素: ListState.add(T value)
添加多个元素:ListState.addAll(List values)
获取全部数据:ListState.get()返回 Iterable
更新全部数据:ListState.update(List values)

3.2.3、key-value 状态

MapState<K, V>保存 Key-Value 对状态
获取一个 key 的value:MapState.get(UK key)
添加一个key-value:MapState.put(UK key, UV value)
判断是否包含一个key:MapState.contains(UK key)
移除一个key:MapState.remove(UK key)

3.2.4、聚合状态

第一个聚合状态:ReducingState[T] 之前有所代码实现过。
第二个聚合状态:AggregatingState<I, O>

3.2.5、清空状态

State.clear()是清空操作

3.3、案例

3.3.1、ValueState -案例

根据传感器id每接收到三条数据就计算平均温度并输出。

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class State_ValueState {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.keyBy(SensorReading::getId)
                .flatMap(new CustomFlatMapFunction())
                .print();
        env.execute();
    }

    /**
     * RichFlatMapFunction -> FlatMapFunction 的富函数 有生命周期的
     * SensorReading -> 输入类型
     * Tuple2<String,Double> -> 输出类型
     */
    public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
   
     

        private ValueState<Tuple2<Long, Double>> valueState;

        @Override
        public void open(Configuration conf) throws Exception {
   
     
            // 初始化
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple2<Long, Double>>("value-state", Types.TUPLE(Types.LONG, Types.DOUBLE)));
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
   
     
            Tuple2<Long, Double> lastState = valueState.value();
            // 没有初始化
            if (lastState == null) {
   
     
                lastState = Tuple2.of(0L, 0.0d);
            }

            lastState.f0 += 1;
            lastState.f1 += sensorReading.getTemperature();

            valueState.update(lastState);

            if (lastState.f0 >= 3) {
   
     
                double avg = lastState.f1 / lastState.f0;
                collector.collect(new Tuple2<>(sensorReading.getId(), avg));
                valueState.clear();
            }
        }
    }
}

3.3.2、ListState - 案例

根据传感器id每接收到三条数据就计算平均温度并输出

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Collections;

public class State_ListState {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.keyBy(SensorReading::getId)
                .flatMap(new CustomFlatMapFunction())
                .print();
        env.execute();
    }

    public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
   
     

        private ListState<Tuple2<String, Double>> listState;

        @Override
        public void open(Configuration parameters) throws Exception {
   
     
            listState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Double>>("list-state", Types.TUPLE(Types.STRING, Types.DOUBLE)));
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
   
     
            Iterable<Tuple2<String, Double>> lastListState = listState.get();
            // 还没有初始化
            if (lastListState == null) {
   
     
                listState.addAll(Collections.emptyList());
            }
            // 添加元素
            listState.add(new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature()));

            // 判断
            ArrayList<Tuple2<String, Double>> listTuples = Lists.newArrayList(listState.get());
            if (listTuples.size() >= 3) {
   
     
                long count = listTuples.size();
                double tempTotal = 0.0;
                for (Tuple2<String, Double> tuple2 : listTuples) {
   
     
                    tempTotal += tuple2.f1;
                }
                double avg = tempTotal / count;
                collector.collect(new Tuple2<>(sensorReading.getId(), avg));
                listState.clear();
            }
        }
    }
}

3.3.3、MapState - 案例

根据传感器id每接收到三条数据就计算平均温度并输出

public class State_MapState {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.keyBy(SensorReading::getId)
                .flatMap(new CustomFlatMapFunction())
                .print();
        env.execute();
    }

    public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
   
     

        private MapState<String, Double> mapState;

        @Override
        public void open(Configuration parameters) throws Exception {
   
     
            mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("map-state", String.class, Double.class));
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
   
     
            mapState.put(UUID.randomUUID().toString().substring(0, 8), sensorReading.getTemperature());
            if (Lists.newArrayList(mapState.keys()).size() >= 3) {
   
     
                int count = 0;
                double tempTotal = 0.0;
                for (Double temp : Lists.newArrayList(mapState.values())) {
   
     
                    count++;
                    tempTotal += temp;
                }

                double avg = tempTotal / count;
                collector.collect(new Tuple2<String, Double>(sensorReading.getId(), avg));
                mapState.clear();
            }
        }
    }
}

3.3.4、ReducingState - 案例

根据传感器id统计所有温度的总和

public class State_ReducingState {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.keyBy(SensorReading::getId)
                .flatMap(new CustomFlatMapFunction())
                .print();
        env.execute();
    }

    public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, Double>> {
   
     

        private ReducingState<Double> reducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
   
     
            reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Double>("reducing-state", new ReduceFunction<Double>() {
   
     
                @Override
                public Double reduce(Double input1, Double input2) throws Exception {
   
     
                    return input1 + input2;
                }
            }, Double.class));
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, Double>> collector) throws Exception {
   
     
            reducingState.add(sensorReading.getTemperature());
            collector.collect(new Tuple2<>(sensorReading.getId(), reducingState.get()));
        }
    }
}

3.3.5、AggregateState - 案例

根据传感器id聚合性输出所有温度数据(持续性)

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class State_AggregatingState {
   
     
    public static void main(String[] args) throws Exception {
   
     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.keyBy(SensorReading::getId)
                .flatMap(new CustomFlatMapFunction())
                .print();
        env.execute();
    }

    public static class CustomFlatMapFunction extends RichFlatMapFunction<SensorReading, Tuple2<String, String>> {
   
     

        private AggregatingState<SensorReading, String> aggregatingState;

        @Override
        public void open(Configuration parameters) throws Exception {
   
     
            /**
             * SensorReading -> 输入类型
             * String  -> 累加器
             * String -> 输出类型
             */
            AggregatingStateDescriptor<SensorReading, String, String> descriptor = new AggregatingStateDescriptor<>("aggregating-state", new AggregateFunction<SensorReading, String, String>() {
   
     
                // 初始化输出类型前缀
                @Override
                public String createAccumulator() {
   
     
                    return "温度列表:";
                }

                // 每来一条数据就拼接返回值
                @Override
                public String add(SensorReading sensorReading, String acc) {
   
     
                    return acc + "," + sensorReading.getTemperature();
                }

                // 输出返回值
                @Override
                public String getResult(String acc) {
   
     
                    return acc;
                }

                // 不同分区的结果进行拼接
                @Override
                public String merge(String acc1, String acc2) {
   
     
                    return acc1 + "," + acc2;
                }
            }, String.class);

            aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
        }

        @Override
        public void flatMap(SensorReading sensorReading, Collector<Tuple2<String, String>> collector) throws Exception {
   
     
            aggregatingState.add(sensorReading);
            collector.collect(new Tuple2<>(sensorReading.getId(), aggregatingState.get()));
        }
    }
}