一、问题
在Flink1.12.0有个重要的功能,批流一体,就是可以用之前流处理的API处理批数据。
只需要调用setRuntimeMode(RuntimeExecutionMode executionMode)。
RuntimeExecutionMode 就是一个枚举,有流、批和自动。
@PublicEvolving
public enum RuntimeExecutionMode {
STREAMING,
BATCH,
AUTOMATIC;
private RuntimeExecutionMode() {
}
}
下面用最最简单的WordCount程序演示。
1. 待测试的文本txt
apple flink flink
hello es flink
study flink
2. WordCount,不设置setRuntimeMode
public class Test01_WordCount_Tuple {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.readTextFile(BaseConstant.WORD_COUNT_TEST);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = source
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(" ");
if (split != null && split.length > 0) {
Stream<String> stream = Arrays.stream(split);
stream.forEach(word -> out.collect(Tuple2.of(word, 1)));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne
.keyBy(f -> f.f0)
.sum(1);
sum.print();
env.execute();
}
}
运行结果:可见是按照流处理的方式处理了批数据,这不是我们想要的
5> (study,1)
7> (flink,1)
6> (es,1)
7> (flink,2)
7> (apple,1)
7> (flink,3)
7> (flink,4)
3> (hello,1)
3. WordCount,且setRuntimeMode(RuntimeExecutionMode.BATCH)
只需要设置env.setRuntimeMode(RuntimeExecutionMode.BATCH),依旧用流处理的keyBy方法。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
和之前唯一不同的是添加了这行,设置为批处理
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> source = env.readTextFile(BaseConstant.WORD_COUNT_TEST);
运行结果
7> (flink,4)
这个时候发现只有flink这个单词有输出,显然这是不对的。
4. 解决问题
通过百度发现该bug已经在Flink1.12.1修复了,bug是因为在批处理reduce里对单个值的累加器有问题。
升级到Flink1.12.1及以上(写博客时,最新是1.12.2)。发现可以解决问题。
升级Flink后运行结果如下:这个我们想要的批处理结果。
5> (study,1)
6> (es,1)
7> (apple,1)
3> (hello,1)
7> (flink,4)