1. Preface
An operator transforms one or more DataStreams into a new DataStream.
These DataStream transformation operators are the ones used most in day-to-day work, and mastering them is essential for getting started with Flink.
Conveniently, some Flink operators closely resemble Java 8 lambda/Stream functions, which makes them easier to understand. Below I first introduce the Java syntax, then the Flink syntax, moving from simple to advanced.
The diagram below shows how DataStream and the other stream types can be converted into one another through different operators.
2. Map
2.1 Map with Java Lambdas
Map applies a given transformation function to every element of a Stream; the resulting Stream contains only the transformed elements.
In short, map is a mapping: it takes an input of type T and produces an output of type R.
The example below extracts the word field from a collection of WordCount objects, turning a List<WordCount> into a List<String>:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MapJavaDemo {
    public static void main(String[] args) {
        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        // Convert each element from WordCount to String, producing a List<String>
        List<String> words = wordCountList.stream()
                .map(WordCount::getWord)
                .collect(Collectors.toList());
        System.out.println(words);
    }
}
```
The output is:
[Flink, Elasticsearch]
The .map(WordCount::getWord) above can also be written as .map(a -> a.getWord()); WordCount::getWord is just a method-reference shorthand, while a -> a.getWord() spells out the same thing more explicitly.
2.2 Map in Flink
- Transformation type: DataStream → DataStream
- Description: takes one element and produces one element
- Examples:

Input | map transformation | Output
---|---|---
1,2,3 | multiply by 2 | 2,4,6
a,b,b | attach the number 1, forming Tuple2<String, Integer> | (a,1),(b,1),(b,1)
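The second table row can be sketched with plain Java 8 streams. Since the JDK has no tuple type, Map.Entry stands in for Flink's Tuple2<String, Integer> here (an illustration only, not Flink API code):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapToPairDemo {
    // Map each input word to a (word, 1) pair, mirroring Tuple2<String, Integer>
    static List<Map.Entry<String, Integer>> toPairs(List<String> words) {
        return words.stream()
                .<Map.Entry<String, Integer>>map(w -> new AbstractMap.SimpleEntry<>(w, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [a=1, b=1, b=1]
        System.out.println(toPairs(Arrays.asList("a", "b", "b")));
    }
}
```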
The code below demonstrates two transformations:
- Step 1: extract the word field from each WordCount object
- Step 2: multiply each count by 2
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;

public class MapFlinkDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        // Set up the data source
        DataSource<WordCount> dataSource = env.fromCollection(wordCountList);
        // Transformation 1: extract the word field from each WordCount
        MapOperator<WordCount, String> words = dataSource.map(a -> a.getWord());
        // Print result 1
        words.print();
        // Transformation 2: multiply each count by 2
        MapOperator<WordCount, WordCount> doubleCount = dataSource.map(a -> {
            a.setCount(a.getCount() * 2);
            return a;
        });
        // Print result 2
        doubleCount.print();
    }
}
```
The output is:
Flink
Elasticsearch
WordCount{word='Flink', count=6}
WordCount{word='Elasticsearch', count=2}
3. FlatMap
FlatMap, the "flattening" map, merges multiple collections into one.
For example:
- The WordCount program splits each input line on spaces (" ").
- "i love you" and "you love me" -> ["i","love","you"] and ["you","love","me"]
- Arrays.stream() turns those two arrays into two Streams,
- and flatMap() merges the two Streams into one.
3.1 FlatMap with Java Lambdas
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapJavaDemo {
    public static void main(String[] args) {
        List<String> wordCountList = new ArrayList<>();
        wordCountList.add("i love you");
        wordCountList.add("you love me");
        Map<String, Long> collect = wordCountList.stream()
                // This produces two String[] arrays: ["i","love","you"] and ["you","love","me"]
                .map(a -> a.split(" "))
                // Arrays.stream() turns the two arrays into two Stream<String>s,
                // and flatMap() merges them into a single Stream<String>
                .flatMap(Arrays::stream)
                // On that single Stream<String>, group by word and count occurrences
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
        System.out.println(collect);
    }
}
```
The output is:
{love=2, me=1, i=1, you=2}
3.2 FlatMap in Flink
- Transformation type: DataStream → DataStream
- Description: multiple inputs -> multiple streams -> merged into one stream
- Example:

Input | flatMap transformation | Output
---|---|---
"I love coding", "I love flink" | split, then form Tuple2<String, Integer> | (I,1),(love,1),(coding,1),(I,1),(love,1),(flink,1)
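The table row above can be reproduced with plain Java 8 streams, again with Map.Entry standing in for Flink's Tuple2 (an illustration only, not Flink API code):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapToPairDemo {
    // Split each line on spaces, flatten all the words into one stream,
    // then pair every word with 1
    static List<Map.Entry<String, Integer>> wordAndOne(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))
                .flatMap(Arrays::stream)
                .<Map.Entry<String, Integer>>map(w -> new AbstractMap.SimpleEntry<>(w, 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [I=1, love=1, coding=1, I=1, love=1, flink=1]
        System.out.println(wordAndOne(Arrays.asList("I love coding", "I love flink")));
    }
}
```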
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;

public class FlatMapFlinkDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> wordCountList = new ArrayList<>();
        wordCountList.add("i love you");
        wordCountList.add("you love me");
        // Set up the data source
        DataSource<String> dataSource = env.fromCollection(wordCountList);
        // Transform: split each line and emit every word separately
        FlatMapOperator<String, String> wordStream = dataSource.flatMap(
                (FlatMapFunction<String, String>) (in, out) -> {
                    Arrays.stream(in.split(" ")).forEach(out::collect);
                }
        ).returns(Types.STRING);
        wordStream.print();
    }
}
```
4. Filter
4.1 Filter with Java Lambdas
The filter method takes a Predicate<? super T>, which returns a boolean.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class FilterJavaDemo {
    public static void main(String[] args) {
        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        List<WordCount> collect = wordCountList.stream()
                .filter(a -> a.getCount() > 1)
                .collect(Collectors.toList());
        System.out.println(collect);
    }
}
```
The output is:
[WordCount{word='Flink', count=3}]
4.2 Filter in Flink
- Transformation type: DataStream → DataStream
- Description: evaluates a predicate on each element of the input and emits only the elements that satisfy it
- Example:

Input | filter transformation | Output
---|---|---
1, 2, 3, 4, 5, 6 | keep odd numbers | 1,3,5
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;

public class FilterFlinkDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        DataSource<WordCount> source = env.fromCollection(wordCountList);
        FilterOperator<WordCount> filter = source.filter(a -> a.getCount() > 1);
        filter.print();
    }
}
```
The output is:
WordCount{word='Flink', count=3}
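For comparison with the odd-number row in the table of 4.2, the same predicate style works in plain Java 8 streams; in Flink it would simply be passed to filter():

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OddFilterDemo {
    // Keep only the odd numbers
    static List<Integer> odds(List<Integer> nums) {
        return nums.stream()
                .filter(n -> n % 2 == 1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [1, 3, 5]
        System.out.println(odds(Arrays.asList(1, 2, 3, 4, 5, 6)));
    }
}
```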
5. KeyBy
In Flink, keyBy performs grouping: use groupBy on a DataSet and keyBy on a DataStream.
5.1 groupingBy with Java Lambdas
Group by the first field, then sum the second field:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByJavaDemo {
    public static void main(String[] args) {
        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Flink", 2));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        wordCountList.add(new WordCount("Elasticsearch", 5));
        // Group by the first field, then sum the second field
        Map<String, Integer> groupByThenSum = wordCountList.stream()
                .collect(
                        Collectors.groupingBy(
                                WordCount::getWord,
                                Collectors.summingInt(WordCount::getCount)
                        )
                );
        System.out.println(groupByThenSum);
    }
}
```
{Elasticsearch=6, Flink=5}
5.2 KeyBy in Flink
- Transformation type: DataStream → KeyedStream
- Description: all records with the same key are assigned to the same partition, similar to SQL's GROUP BY; internally, keyBy() is implemented with hash partitioning
- For tuples, keyBy() takes field indices, e.g. keyBy(0) or keyBy(0, 1)
- For POJOs, keyBy() takes field names, e.g. keyBy("word"); to group by multiple fields, use keyBy("field1", "field2")
- Example: the WordCount program
```java
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCountLambdaBetter {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the data source
        DataStreamSource<String> lines = env.socketTextStream(BaseConstant.URL, BaseConstant.PORT);
        // Split each line on spaces and turn every word into Tuple2.of(word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
                (String line, Collector<Tuple2<String, Integer>> out) -> {
                    Arrays.stream(line.split(" ")).forEach(thisWord -> {
                        out.collect(Tuple2.of(thisWord, 1));
                    });
                }
        ).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Aggregate per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = wordAndOne
                .keyBy(value -> value.f0)
                .sum(1);
        // Sink
        sumResult.print();
        // Execute
        env.execute("StreamWordCountLambdaBetter");
    }
}
```
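The hash-partitioning note in 5.2 can be illustrated with a toy sketch in plain Java. This is not Flink's actual partitioner (Flink routes keys through key groups), just a minimal model of the "same key, same partition" guarantee:

```java
import java.util.Arrays;

public class KeyByPartitionSketch {
    // Toy illustration: derive the partition from the key's hash, so
    // equal keys always land in the same partition
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String word : Arrays.asList("Flink", "Flink", "Elasticsearch")) {
            // every "Flink" prints the same partition number
            System.out.println(word + " -> partition " + partitionFor(word, parallelism));
        }
    }
}
```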
6. Aggregations
6.1 Aggregation with Java Lambdas
Aggregation functions in Java 8 are very simple and easy to pick up.
```java
// Fragment: assumes a List<Pig> pigs where Pig has a getAge() getter,
// plus a static import of java.util.Comparator.comparing
long count = pigs.stream().filter(a -> a.getAge() > 5).count();
System.out.println("count of age > 5 = " + count);
// limit: take the first few elements
System.out.println("top2");
Stream<Pig> top2Pigs = pigs.stream().sorted(comparing(Pig::getAge)).limit(2);
top2Pigs.forEach(System.out::println);
int sumAge = pigs.stream().mapToInt(Pig::getAge).sum();
int maxAge = pigs.stream().mapToInt(Pig::getAge).max().getAsInt();
int minAge = pigs.stream().mapToInt(Pig::getAge).min().getAsInt();
double avgAge = pigs.stream().mapToInt(Pig::getAge).average().getAsDouble();
System.out.println("sumAge = " + sumAge);
System.out.println("maxAge = " + maxAge);
System.out.println("minAge = " + minAge);
System.out.println("avgAge = " + avgAge);
Optional<Pig> pigMaxAgeOptional = pigs.stream().collect(Collectors.maxBy(comparing(Pig::getAge)));
if (pigMaxAgeOptional.isPresent()) {
    System.out.println("maxAge = " + pigMaxAgeOptional.get().getAge());
}
```
6.2 Aggregations in Flink
Flink provides the following aggregations on a KeyedStream.
Note the difference between min and minBy:
- min takes the minimum of the specified field only; the other fields of the emitted record keep the values from the first record seen, so their values are not guaranteed to match the element that held the minimum.
- minBy takes the minimum of the specified field and returns the entire element.
```java
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
```
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamMinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Tuple3<String, String, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>("男", "老王", 80));
        data.add(new Tuple3<>("男", "小王", 25));
        data.add(new Tuple3<>("男", "老李", 85));
        data.add(new Tuple3<>("男", "小李", 20));
        DataStreamSource<Tuple3<String, String, Integer>> peoples = env.fromCollection(data);
        // Find the person with the smallest age
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> minResult = peoples.keyBy(0).min(2);
        minResult.print();
        env.execute("StreamMinTest");
    }
}
```
The output is:
1> (男,老王,80)
1> (男,老王,25)
1> (男,老王,25)  # the age is right, but the other fields still come from the first record, 老王
1> (男,老王,20)  # the smallest age, yet the name is still the first record's 老王?
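The surprising output above follows directly from the two update rules described in 6.2. Below is a plain-Java sketch (not Flink API code) of how a rolling min and a rolling minBy each fold over the records, with Map.Entry standing in for a (name, age) tuple:

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MinVsMinBySketch {
    // Rolling min(field): keep the first record's other fields,
    // update only the minimum of the compared field
    static Map.Entry<String, Integer> rollingMin(List<Map.Entry<String, Integer>> records) {
        String firstName = records.get(0).getKey();
        int min = records.get(0).getValue();
        for (Map.Entry<String, Integer> r : records) {
            if (r.getValue() < min) min = r.getValue();
        }
        return new AbstractMap.SimpleEntry<>(firstName, min);
    }

    // Rolling minBy(field): keep the entire record that holds the minimum field
    static Map.Entry<String, Integer> rollingMinBy(List<Map.Entry<String, Integer>> records) {
        Map.Entry<String, Integer> best = records.get(0);
        for (Map.Entry<String, Integer> r : records) {
            if (r.getValue() < best.getValue()) best = r;
        }
        return best;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> people = Arrays.asList(
                new AbstractMap.SimpleEntry<>("老王", 80),
                new AbstractMap.SimpleEntry<>("小王", 25),
                new AbstractMap.SimpleEntry<>("小李", 20));
        System.out.println("min   -> " + rollingMin(people));   // 老王=20: first name, smallest age
        System.out.println("minBy -> " + rollingMinBy(people)); // 小李=20: the whole winning record
    }
}
```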
7. Summary
This post summarized only some of the Java 8 and Flink operators. They behave similarly in Java and Flink, which is why I presented them side by side, an angle other Flink blogs online don't take.
Learning really is about revisiting the old to grasp the new: keep moving forward, but also look back at earlier material from time to time to gain fresh insights.
The next post, "Flink in Practice (5): Common DataStream Operators (Part 2)", will cover the remaining common transformations: Reduce, Fold, Union, Connect, and Split & Select.