简介
算子状态中有一类很特殊,就是广播状态(BroadcastState)。从概念和原理上讲,广播状态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接复制就可以了。然而在应用上,广播状态却与其他算子状态大不相同。本节就专门来讨论一下广播状态的使用。
一、基本用法
让所有并行子任务都持有同一份状态,也就意味着一旦状态有变化,所以子任务上的实例都要更新。什么时候会用到这样的广播状态呢?
一个最为普遍的应用,就是“动态配置”或者“动态规则”。我们在处理流数据时,有时会基于一些配置(configuration)或者规则(rule)。简单的配置当然可以直接读取配置文件,一次加载,永久有效;但数据流是连续不断的,如果这配置随着时间推移还会动态变化,那又该怎么办呢?
一个简单的想法是,定期扫描配置文件,发现改变就立即更新。但这样就需要另外启动一个扫描进程,如果扫描周期太长,配置更新不及时就会导致结果错误;如果扫描周期太短,又会耗费大量资源做无用功。解决的办法,还是流处理的“事件驱动”思路——可以将这动态的配置数据看作一条流,将这条流和本身要处理的数据流进行连接(connect),就可以实时地更新配置进行计算了。
由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就是一个典型的广播状态。我们知道**,广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。**
在代码上,可以直接调用DataStream的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到一个“广播流”(BroadcastStream);进而将要处理的数据流与这条广播流进行连接(connect),就会得到“广播连接流”(BroadcastConnectedStream)。注意广播状态只能用在广播连接流中。
关于广播连接流,已经在之前做过介绍,这里可以复习一下:
MapStateDescriptor<String, Rule> ruleStateDescriptor = new
MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
DataStream<String> output = stream
.connect(ruleBroadcastStream)
.process( new BroadcastProcessFunction<>() {...} );
这里定义了一个“规则流”ruleStream,里面的数据表示了数据流stream处理的规则,规则的数据类型定义为Rule。于是需要先定义一个MapStateDescriptor来描述广播状态,然后传入ruleStream.broadcast()得到广播流,接着用stream和广播流进行连接。这里状态描述器中的key类型为String,就是为了区分不同的状态值而给定的key的名称。
对于广播连接流调用.process()方法,可以传入“广播处理函数”KeyedBroadcastProcessFunction或者BroadcastProcessFunction来进行处理计算。广播处理函数里面有两个方法.processElement()和.processBroadcastElement(),源码中定义如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends
BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx,
Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx,
Collector<OUT> out) throws Exception;
...
}
这里的.processElement()方法,处理的是正常数据流,第一个参数value就是当前到来的流数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数value就是广播流中的规则或者配置数据。两个方法第二个参数都是一个上下文ctx,都可以通过调用.getBroadcastState()方法获取到当前的广播状态;**区别在于,.processElement()方法里的上下文是“只读”的(ReadOnly),因此获取到的广播状态也只能读取不能更改;**而.processBroadcastElement()方法里的Context则没有限制,可以根据当前广播流中的数据更新状态。
Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String,
Types.POJO(Rule.class))).get("my rule");
通过调用ctx.getBroadcastState()方法,传入一个MapStateDescriptor,就可以得到当前的叫作“rules”的广播状态;调用它的.get()方法,就可以取出其中“myrule”对应的值进行计算处理。
二、代码实例
接下来我们举一个广播状态的应用案例。考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。
package com.kunan.StreamAPI.FlinkStat;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Tuple;
public class BehaviorPatternDetect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//用户的行为数据流
DataStreamSource<Action> actionStream = env.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "order")
);
//行为模式流,基于它构建广播流
DataStreamSource<Pattern> patternStream = env.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "order")
);
//定义广播变量描述器
MapStateDescriptor<Void, Pattern> descriptor = new MapStateDescriptor<Void,Pattern>("pattern", Types.VOID, Types.POJO(Pattern.class));
BroadcastStream<Pattern> broadcastStream = patternStream.broadcast(descriptor);
//连接两条流进行处理
SingleOutputStreamOperator<Tuple2<String, Pattern>> matches = actionStream.keyBy(data -> data.userID)
.connect(broadcastStream)
.process(new PatternDetector());
matches.print();
env.execute();
}
//实现自定义的KeyedBroadcastProcessFunction
public static class PatternDetector extends KeyedBroadcastProcessFunction<String,Action,Pattern,Tuple2<String,Pattern>>{
//定义一个keyedState保存上一次用户行为
ValueState<String> preActionState;
@Override
public void open(Configuration parameters) throws Exception {
preActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action", String.class));
}
@Override
public void processElement(Action value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
//从广播状态中获取匹配模式
ReadOnlyBroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
Pattern pattern = patternState.get(null);
//获取用户上一次的行为
String prevAction = preActionState.value();
//判断是否匹配
if(pattern != null && prevAction != null){
if(pattern.action1.equals(prevAction) && pattern.action2.equals(value.action)){
out.collect(new Tuple2<>(ctx.getCurrentKey(),pattern));
}
}
//更新状态
preActionState.update(value.action);
}
@Override
public void processBroadcastElement(Pattern value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
//从上下文中获取广播状态,并用当前数据更新状态
//获取广播状态
BroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
patternState.put(null,value);
}
}
//定义用户行为事件和模式的POJO类
public static class Action{
public String userID;
public String action;
public Action() {
}
public Action(String userID, String action) {
this.userID = userID;
this.action = action;
}
@Override
public String toString() {
return "Action{" +
"userID='" + userID + '\'' +
", action='" + action + '\'' +
'}';
}
}
public static class Pattern{
public String action1;
public String action2;
public Pattern() {
}
public Pattern(String action1, String action2) {
this.action1 = action1;
this.action2 = action2;
}
@Override
public String toString() {
return "Pattern{" +
"action1='" + action1 + '\'' +
", action2='" + action2 + '\'' +
'}';
}
}
}
这里我们将检测的行为模式定义为POJO类Pattern,里面包含了连续的两个行为。由于广播状态中只保存了一个Pattern,并不关心MapState中的key,所以也可以直接将key的类型指定为Void,具体值就是null。在具体的操作过程中,我们将广播流中的Pattern数据保存为广播变量;在行为数据Action到来之后读取当前广播变量,确定行为模式,并将之前的一次行为保存为一个ValueState——这是针对当前用户的状态保存,所以用到了KeyedState。检测到如果前一次行为与Pattern中的action1相同,而当前行为与action2相同,则发现了匹配模式的一组行为,输出检测结果。