一、前言
在上一篇博客Flink 实战(二)DataStream Transformations 常用算子(上),我总结了一些基础的算子,这些算子从功能上看属于映射、过滤和聚合的类型。本篇想介绍些归约功能的算子。
还有学习一门新的技术,的确需要坚持和多实践,不要急于求成。反正时间一大把,我们可以专心的学一两门技术,做到熟练至精通。不要想着学很多十八般武艺,成为所谓的全才。
例如一个人比较精通Elasticsearch和Flink,那么同事遇到相关问题时也会想到来请教他。在开发会议上给出的建议也有一定的分量。
二、Reduce
Reduce相当归并操作,比如输入的值有很多,但是我们只想返回一个值,例如求和,最大值,最小值,平均值等。
2.1 Java Lambda中的Reduce
//求和
int sum = Arrays.stream(nums).reduce(0, (a, b) -> a + b);
- reduced第一个参数0,是一个初始值
- (a, b) -> a + b是IntBinaryOperator类型,将2个元素结合生成一个新的元素
下图详细描述了reduce求和的过程
- reduce还可以用来求最大值、最小值
//求最大值
OptionalInt max= Arrays.stream(nums).reduce(Integer::max);
reduce求最大值图如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DIJ3hN2W-1606810078967)(https://statics.sdk.cn/articles/img/202011/58399208bba3dbee9655484365_1001173.png?x-oss-process=style/thumb)]
完整的代码示例如下:
public class ReduceJavaDemo {
public static void main(String[] args) {
int[] nums = {
4, 5, 3, 9};
//带初始值的求和
int sum = Arrays.stream(nums).reduce(0, (a, b) -> a + b);
System.out.println(sum);
//不带初始值的求和
//reduce还有一个重载的变体,它不接受初始值,但是会返回一个Optional对象
OptionalInt reduceSum = Arrays.stream(nums).reduce(Integer::sum);
if (reduceSum.isPresent()) {
System.out.println(reduceSum.getAsInt());
}
//求最大值
OptionalInt max= Arrays.stream(nums).reduce(Integer::max);
if (max.isPresent()){
System.out.println(max.getAsInt());
}
}
}
2.2 Flink中的Reduce
- 转换类型:KeyedStream→DataStream
- 说明:在分区的数据流上调用reduce函数:将当前元素与最后一个reduce的值合并生成新值。
reduce函数是将KeyedStream转换为DataStream,也就是reduce调用前必须进行分区,即得先调用keyBy()函数 - 举例(我这边为了测试,不是流处理):
public class ReduceFlinkDemo {
public static void main(String[] args) throws Exception {
//初始环境变量
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//设置数据源
Integer[] nums = {
1, 2, 3, 4, 5};
List<Integer> integers = Arrays.asList(nums);
DataSource<Integer> dataSource = env.fromCollection(integers);
//利用reduce求和
ReduceOperator<Integer> sum = dataSource.reduce(
(ReduceFunction<Integer>) (value1, value2) -> value1 + value2);
sum.print();
//利用reduce求最大值
ReduceOperator<Integer> max = dataSource.reduce(Integer::max);
max.print();
}
}
2.3 Flink的数据源是对象集合
这个时候groupBy不能再用元组的下标了,得用字段名称。
public class ReduceFlinkPojoDemo {
public static void main(String[] args) throws Exception {
//初始环境变量
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<WordCount> wordCountList = new ArrayList<>();
wordCountList.add(new WordCount("Flink", 3));
wordCountList.add(new WordCount("Flink", 2));
wordCountList.add(new WordCount("Elasticsearch", 1));
wordCountList.add(new WordCount("Elasticsearch", 5));
DataSource<WordCount> dataSource = env.fromCollection(wordCountList);
ReduceOperator<WordCount> sum = dataSource
.groupBy(WordCount::getWord)
.reduce((ReduceFunction<WordCount>) (value1, value2) ->
new WordCount(value1.getWord(), value1.getCount() + value2.getCount()));
sum.print();
}
}
输出结果如下:
WordCount{word='Elasticsearch', count=6}
WordCount{word='Flink', count=5}
三、Union
union简介:
DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。
下图union对白色和深色两个数据流进行合并,生成一个数据流。
- 在 DataStream 上使用 union 算子可以合并多个同类型的数据流,
- 并生成同类型的新数据流,即可以将多个 DataStream 合并为一个新的 DataStream。
- 数据将按照先进先出(First In First Out) 的模式合并,且不去重。
- 但是Union有一个限制,就是所有合并的流类型必须是一致的。
public class UnionFlinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream1 = env.socketTextStream(BaseConstant.URL, 9999);
DataStreamSource<String> stream2 = env.socketTextStream(BaseConstant.URL, 8888);
DataStreamSource<String> stream3 = env.socketTextStream(BaseConstant.URL, 6666);
DataStream<String> unionStream = stream1.union(stream2).union(stream3);
unionStream.print();
env.execute("UnionDemo");
}
}
四、Connect
connect的作用和union相似,也是连接不同数据流的:
- 两个DataStream 经过 connect 之后被转化为 ConnectedStreams,
- ConnectedStreams 会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
connect和union的区别:
- connect 只能连接两个数据流,union 可以连接多个数据流。
- connect 所连接的两个数据流的数据类型可以不一致。
- union所连接的两个数据流的数据类型必须一致。
使用connect有如下注意点:
1、 对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型;
2、 在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据;
3、 Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用;
/**
* connect 只能连接两个数据流,union 可以连接多个数据流;
*
* connect 所连接的两个数据流的数据类型可以不一致,
* union所连接的两个数据流的数据类型必须一致。
*
* 两个DataStream 经过 connect 之后被转化为 ConnectedStreams,
* ConnectedStreams 会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
*/
public class ConnectFlinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source1 = env.fromElements("a", "b", "c", "a");
DataStreamSource<Integer> source2 = env.fromElements(1, 2, 3, 2);
//将2个不同类型的数据源连接起来
ConnectedStreams<String, Integer> connectedStreams = source1.connect(source2);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = connectedStreams.map(new CoMapFunction<String, Integer, Tuple2<String, Integer>>() {
//对数据源1处理
@Override
public Tuple2<String, Integer> map1(String value) throws Exception {
return Tuple2.of(value, 1);
}
//对数据源2处理
@Override
public Tuple2<String, Integer> map2(Integer value) throws Exception {
return Tuple2.of(String.valueOf(value), 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map
.keyBy(0)
.sum(1);
sum.print();
env.execute();
}
}
五、Split & Select
Split和Select一般是一起使用的。
- Split根据某种规则从原始数据流中获取符合规则的数据,例如将数字按照奇数、偶数分2个流。
- Select就是从Split后的结果里,选择某个流。
下图举例把图像,按照(白色)和(非白色)Split分成2个不同的流。
下面代码举例把一串数字按照奇数、偶数分组。
public class SplitDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even");
} else {
outPut.add("odd");
}
return outPut;
}
});
DataStream<Integer> odd = split.select("odd");
odd.print();
env.execute();
}
}
六、结语
本篇介绍了DataStream的常用算子,主要是流计算使用的,对于批处理DataSet,它也有很多算子,但是功能上差不多,而且批处理一般用的不太多。如果掌握好DataStream的算子,那么再看DataSet的也很容易理解。