public class SensorRecord {
//设备ID
private String id;
//当前温度
private Double record;
//当前时间戳
private Long time;
//无参构造器必要
public SensorRecord() {
}
//省略别的代码
}
在Flink1.12中,定义状态描述器时,不建议加默认值了,官方推荐在使用前,判断state是否为null,如果为null给默认值。
给默认值很重要,如果lastTempState.value() == null,而且你任然使用了state里的值,那肯定会报异常的。
//Flink1.12版本不推荐在定义描述器时给默认值,所以先判断为null时,给个默认值
if (lastTempState.value() == null) {
lastTempState.update(new SensorRecord("", 0.0, 0L));
}
if (timerTsState.value() == null){
timerTsState.update(0L);
}
定时器的使用,可以用处理时间,也可以用数据的事件事件
//处理时间定时器
ctx.timerService().registerProcessingTimeTimer(ts);
//事件时间定时器
ctx.timerService().registerEventTimeTimer(ts);
当前定时器触发时,会掉用onTimer方法,再这里可以做些数据处理
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//定时器触发,输出报警信息
out.collect("传感器" + ctx.getCurrentKey().toString() + "温度值连续上升");
timerTsState.clear();
}
完整的代码如下:
package learn.test05_state;
import learn.common.constant.BaseConstant;
import learn.common.entity.SensorRecord;
import learn.common.util.SensorRecordUtils;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class Demo12_ValueState {
public static void main(String[] args) throws Exception {
//执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//便于测试,设置1
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream(BaseConstant.URL, BaseConstant.PORT);
DataStream<SensorRecord> dataStream = source
.map(new SensorRecordUtils.BeanMap())
.filter(a -> a != null);
SingleOutputStreamOperator<String> process = dataStream.keyBy(a -> a.getId())
.process(new MyProcessFunction(10));
process.print();
env.execute("Demo12_ValueState");
}
/**
* 实现自定义处理函数,监测一段时间内温度连续上升
*/
public static class MyProcessFunction extends KeyedProcessFunction<String, SensorRecord, String> {
//时间间隔
private Integer interval;
public MyProcessFunction(Integer interval) {
this.interval = interval;
}
private transient ValueState<SensorRecord> lastTempState;
private transient ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
//上一次的温度的state描述器
ValueStateDescriptor<SensorRecord> lastTempStateDescriptor = new ValueStateDescriptor<>("last-temp", SensorRecord.class);
//获取state
lastTempState = getRuntimeContext().getState(lastTempStateDescriptor);
//定时器结束时间的state描述器
ValueStateDescriptor<Long> timerTsStateDescriptor = new ValueStateDescriptor<Long>("timer-ts", Long.class);
timerTsState = getRuntimeContext().getState(timerTsStateDescriptor);
}
@Override
public void processElement(SensorRecord value, Context ctx, Collector<String> out) throws Exception {
//Flink1.12版本不推荐在定义描述器时给默认值,所以先判断为null时,给个默认值
if (lastTempState.value() == null) {
lastTempState.update(new SensorRecord("", 0.0, 0L));
}
if (timerTsState.value() == null){
timerTsState.update(0L);
}
//取出状态-上次的温度
SensorRecord lastTemp = lastTempState.value();
//取出定时器的结束时间
Long timerTs = timerTsState.value();
//如果温度上升,注册10秒定时器,开始等待
if (value.getRecord() > lastTemp.getRecord() && timerTs == 0) {
//计算定时器时间戳
//为了方便测试,用了当前系统的处理时间,而不是数据中的事件时间
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
System.out.println("ts = " + ts);
//在处理时间ProcessingTime上,设置了定时器
ctx.timerService().registerProcessingTimeTimer(ts);
timerTsState.update(ts);
} else if (value.getRecord() < lastTemp.getRecord()) {
//温度减低了,删掉定时器
if (timerTs != null) {
ctx.timerService().deleteProcessingTimeTimer(timerTs);
}
//温度减低了,state定时器结束时间清空
timerTsState.clear();
}
//更新最近一次温度的state
lastTempState.update(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//定时器触发,输出报警信息
out.collect("传感器" + ctx.getCurrentKey().toString() + "温度值连续上升");
timerTsState.clear();
System.out.println("timerTsState.clear()");
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}