14、Flink 实战 - Keyed State状态管理之MapState使用案例

一、MapState的方法

MapState的方法和Java的Map的方法极为相似,所以上手相对容易。
常用的有如下:

  • get()方法获取值
  • put(),putAll()方法更新值
  • remove()删除某个key
  • contains()判断是否存在某个key
  • isEmpty() 判断是否为空
     

二、定义MapStateDescriptor和获取MapState

MapStateDescriptor<String, Integer> diffCountDescriptor = new MapStateDescriptor<String, Integer>(
        "diff-count-map",//state的id
        String.class,//key的类型
        Integer.class);//value的类型

diffCountMap = getRuntimeContext().getMapState(diffCountDescriptor);

三、统计温度升高/降低超过阈值的次数

程序主题和上一篇博客Keyed State状态之ValueState一样

public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, SensorRecord, Tuple3<String, Integer, Integer>> {
   
     

    private int tempDiff;

    public MyKeyedProcessFunction(int tempDiff) {
   
     
        this.tempDiff = tempDiff;
    }

    //上次温度
    private transient ValueState<Double> lastTemp;

    //温度升高/降低超过预警值的次数
    private transient MapState<String, Integer> diffCountMap;

    private String upKey = "upKey";

    private String downKey = "downKey";

    @Override
    public void open(Configuration parameters) throws Exception {
   
     
        super.open(parameters);

        ValueStateDescriptor<Double> lastTempDescriptor = new ValueStateDescriptor<Double>(
                "last-temp",
                Double.class);

        lastTemp = getRuntimeContext().getState(lastTempDescriptor);

        MapStateDescriptor<String, Integer> diffCountDescriptor = new MapStateDescriptor<String, Integer>(
                "diff-count-map",
                String.class,
                Integer.class);

        diffCountMap = getRuntimeContext().getMapState(diffCountDescriptor);
    }

    @Override
    public void processElement(SensorRecord value, Context ctx, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
   
     

        //第一条数据,需要处理
        if (lastTemp.value() == null) {
   
     
            lastTemp.update(Double.MIN_VALUE);

            if (!diffCountMap.contains(upKey)){
   
     
                diffCountMap.put(upKey, 0);
            }

            if (!diffCountMap.contains(downKey)){
   
     
                diffCountMap.put(downKey, 0);
            }
        }
        else {
   
     
            boolean needOut = false;

            //温度升高超过阈值
            if (value.getRecord() - lastTemp.value() > tempDiff){
   
     
                diffCountMap.put(upKey, diffCountMap.get(upKey) + 1);
                needOut = true;
            }
            //温度降低超过阈值
            else if (lastTemp.value() - value.getRecord() > tempDiff) {
   
     
                diffCountMap.put(downKey, diffCountMap.get(downKey) + 1);
                needOut = true;
            }

            if (needOut && !diffCountMap.isEmpty()){
   
     
                out.collect(Tuple3.of(value.getId(), diffCountMap.get(upKey), diffCountMap.get(downKey)));
            }
        }

        if (value.getRecord() != null) {
   
     
            lastTemp.update(value.getRecord());
        }
    }
}