10. Flink in Practice: keyBy on Multiple Fields of Tuples and POJOs with KeySelector

I. Tuples

Suppose we have a stream:

DataStream<Tuple2<String, Integer>> wordAndOne = ....

1. keyBy on a single field

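By field position (this int-based overload is deprecated in recent Flink versions in favor of KeySelector):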
wordAndOne.keyBy(0)

By a KeySelector lambda:
wordAndOne.keyBy(v -> v.f0)
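To make the semantics of keyBy(v -> v.f0) concrete, here is a minimal plain-Java simulation that does not use Flink at all. Assumptions: Map.Entry stands in for Tuple2 (getKey() plays the role of f0, getValue() of f1), and the hypothetical helper sumByKey returns only the final per-key total that a running sum over f1 would reach on a bounded input; real Flink emits an updated result for every incoming record instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java simulation, no Flink: Map.Entry stands in for Tuple2 (getKey ~ f0, getValue ~ f1).
public class KeyByFieldDemo {

    // Hypothetical helper: groups records by the extracted key and sums an Integer field per key.
    // It returns only the final totals; a Flink keyed sum would emit an update per record.
    static <T, K> Map<K, Integer> sumByKey(List<T> records,
                                           Function<T, K> keySelector,
                                           Function<T, Integer> valueOf) {
        Map<K, Integer> sums = new LinkedHashMap<>();
        for (T record : records) {
            sums.merge(keySelector.apply(record), valueOf.apply(record), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> wordAndOne = List.of(
                Map.entry("flink", 1),
                Map.entry("java", 1),
                Map.entry("flink", 1));

        // Counterpart of wordAndOne.keyBy(v -> v.f0): the word itself is the key.
        Map<String, Integer> counts =
                sumByKey(wordAndOne, Map.Entry::getKey, Map.Entry::getValue);
        System.out.println(counts); // {flink=2, java=1}
    }
}
```

Records with the same f0 end up in the same group no matter how many other words arrive in between, which is exactly the guarantee keyBy gives downstream operators.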

2. keyBy on multiple fields

By field positions (deprecated in recent Flink versions; prefer a KeySelector):
wordAndOne.keyBy(0, 1)

By a KeySelector that returns both fields as a Tuple2 key:
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});
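The KeySelector above builds a fresh Tuple2 for every record, so records can only be grouped together because Tuple2 compares and hashes by field values. The plain-Java sketch below (no Flink) illustrates this with a hypothetical Pair class standing in for Tuple2 and contrasts it with an Object[] key, whose equals/hashCode are identity-based. Consistent with this, Flink's documentation rules out arrays, and POJOs that do not override hashCode(), as key types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Plain-Java sketch, no Flink. Pair is a hypothetical stand-in for Flink's Tuple2.
public class CompositeKeyDemo {

    // Like Tuple2, equality and hash code are derived from the field values.
    static final class Pair<A, B> {
        final A f0;
        final B f1;

        Pair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Pair)) return false;
            Pair<?, ?> other = (Pair<?, ?>) o;
            return Objects.equals(f0, other.f0) && Objects.equals(f1, other.f1);
        }

        @Override
        public int hashCode() {
            return Objects.hash(f0, f1);
        }
    }

    // Feeds `records` elements with identical field values through a hash-based grouping,
    // building a fresh key object per element (as getKey() does), and counts the groups.
    static int distinctGroups(Supplier<Object> keyForRecord, int records) {
        Map<Object, Integer> groups = new HashMap<>();
        for (int i = 0; i < records; i++) {
            groups.merge(keyForRecord.get(), 1, Integer::sum);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        // Value-based key: all three records land in one group.
        System.out.println(distinctGroups(() -> new Pair<>("flink", 1), 3)); // 1
        // Identity-based key: every record gets its own group.
        System.out.println(distinctGroups(() -> new Object[] {"flink", 1}, 3)); // 3
    }
}
```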

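The above can be simplified with a lambda. Because of Java type erasure, Flink may be unable to infer the Tuple2 return type of such a lambda and fail with a type-extraction error; in that case, pass the key type explicitly through the keyBy(KeySelector, TypeInformation) overload, e.g. with Types.TUPLE(Types.STRING, Types.INT).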

wordAndOne.keyBy(
        (KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> 
                Tuple2.of(value.f0, value.f1)
);

II. POJOs

Suppose we have a stream:

DataStream<PeopleCount> source = ...

where PeopleCount is defined as:

public class PeopleCount {

    private String province;

    private String city;

    private Integer counts;

    public PeopleCount() {
    }

    // other code omitted...
}

1. keyBy on a single field

source.keyBy(a -> a.getProvince())

or, equivalently, with a method reference:

source.keyBy(PeopleCount::getProvince)

2. keyBy on multiple fields

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});
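The difference between keying by province alone and by (province, city) can be sketched in plain Java without Flink. Assumptions: PeopleCount here is a simplified stand-in (final fields and an all-args constructor, since the original's other code is omitted), List.of(province, city) plays the role of the Tuple2 key, the sample data is made up for illustration, and the hypothetical helper sumCounts shows the per-key total a running sum over counts would end at on a bounded input.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch, no Flink. PeopleCount is a simplified, hypothetical stand-in.
public class PojoKeyDemo {

    static final class PeopleCount {
        final String province;
        final String city;
        final Integer counts;

        PeopleCount(String province, String city, Integer counts) {
            this.province = province;
            this.city = city;
            this.counts = counts;
        }

        String getProvince() { return province; }
        String getCity() { return city; }
        Integer getCounts() { return counts; }
    }

    // Hypothetical helper: sums `counts` per key extracted by the selector (final totals only).
    static <K> Map<K, Integer> sumCounts(List<PeopleCount> source,
                                         Function<PeopleCount, K> keySelector) {
        Map<K, Integer> sums = new LinkedHashMap<>();
        for (PeopleCount p : source) {
            sums.merge(keySelector.apply(p), p.getCounts(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<PeopleCount> source = List.of(
                new PeopleCount("Guangdong", "Shenzhen", 3),
                new PeopleCount("Guangdong", "Guangzhou", 2),
                new PeopleCount("Guangdong", "Shenzhen", 1));

        // Single field, like source.keyBy(PeopleCount::getProvince)
        System.out.println(sumCounts(source, PeopleCount::getProvince));
        // {Guangdong=6}

        // Two fields; List.of(...) stands in for Tuple2.of(province, city) as a value-based key
        System.out.println(sumCounts(source, p -> List.of(p.getProvince(), p.getCity())));
        // {[Guangdong, Shenzhen]=4, [Guangdong, Guangzhou]=2}
    }
}
```

Keying by province merges all cities of a province into one group, while the composite key keeps each (province, city) combination separate.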

The above can be simplified with a lambda:

source.keyBy(
        (KeySelector<PeopleCount, Tuple2<String, String>>) value ->
                Tuple2.of(value.getProvince(), value.getCity())
);