1. 使用Java统计日活（UV）：AggregateFunction 与 ProcessWindowFunction 的组合使用
// Home-page exposure events with event-time timestamps/watermarks attached.
// The out-of-orderness bound is 0 s, so any record arriving behind the current watermark is
// late; the daily window below sets no allowed lateness, so such records are not counted.
// NOTE(review): BoundedOutOfOrdernessTimestampExtractor is deprecated since Flink 1.11 in favour
// of WatermarkStrategy.forBoundedOutOfOrderness(...) — consider migrating.
DataStream<AppLogBean> homeExposureStream = appExposureStream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AppLogBean>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(AppLogBean element) {
                // getTime() is presumably epoch seconds (hence the scaling to ms) — confirm.
                // The long literal forces 64-bit multiplication: with an int getTime(),
                // `* 1000` would overflow int for any current epoch-second value.
                return element.getTime() * 1000L;
            }
        })
        // Keep only home-page exposure events that actually carry an scdata payload.
        .filter(new FilterFunction<AppLogBean>() {
            @Override
            public boolean filter(AppLogBean value) throws Exception {
                return "home_exposure".equals(value.getTopic()) && StringUtils.isNotBlank(value.getScdata());
            }
        });
// Project every exposure onto ("dummy", user_id). The constant key means every record lands on
// the same key, so the keyed daily window downstream sees all users in a single place.
SingleOutputStreamOperator<Tuple2<String, String>> userIdStream = homeExposureStream
        .map(new MapFunction<AppLogBean, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(AppLogBean appLogBean) throws Exception {
                // NOTE(review): getString presumably yields null when "user_id" is absent; such
                // records are then emitted (and counted) with a null id — confirm intent.
                String userId = JSONObject.parseObject(appLogBean.getScdata()).getString("user_id");
                return Tuple2.of("dummy", userId);
            }
        });
// Daily UV pipeline:
//  - one-day tumbling event-time windows; the -8h offset moves each window boundary from
//    00:00 UTC to 00:00 UTC+8 (i.e. 16:00 UTC of the previous day);
//  - the continuous trigger fires the window at every 1 s event-time interval as the
//    watermark advances, so an updated running UV is emitted throughout the day;
//  - the aggregate function maintains the distinct-id state incrementally and the process
//    window function formats each firing.
TumblingEventTimeWindows dailyWindows = TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8));
ContinuousEventTimeTrigger<TimeWindow> everySecond = ContinuousEventTimeTrigger.of(Time.seconds(1));
SingleOutputStreamOperator<String> result = userIdStream
        .keyBy(record -> record.f0)
        .window(dailyWindows)
        .trigger(everySecond)
        .aggregate(new UniqueVisitorAggregateFunction(), new UniqueVisitorProcessWindowFunction());
result.print("result>>>>>>>>>");
}
/**
 * Formats the unique-visitor count of a window each time the window fires.
 *
 * <p>Before emitting {@code "UV <count>"}, prints the current watermark, the window bounds and
 * the count to stdout. When paired with an AggregateFunction via {@code aggregate(...)}, the
 * iterable holds the single pre-aggregated count for the window.
 */
public static class UniqueVisitorProcessWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {
    // Renders epoch-millisecond timestamps for the debug output.
    private final FastDateFormat timeFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    @Override
    public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        System.out.println("##### 当前的watermark为" + timeFormat.format(context.currentWatermark()));
        TimeWindow window = context.window();
        System.out.println("##### 窗口开始时间######" + timeFormat.format(window.getStart()));
        System.out.println("##### 窗口结束时间######" + timeFormat.format(window.getEnd()));
        // The single pre-aggregated UV value for this window.
        Long uniqueVisitors = elements.iterator().next();
        System.out.println("##### 该窗口当前统计的UV" + uniqueVisitors);
        out.collect("UV " + uniqueVisitors);
    }
}
public static class UniqueVisitorAggregateFunction implements AggregateFunction<Tuple2<String, String>, Tuple2<Set<String>, Long>, Long> {
/** Starts a window with no visitors: an empty user-id set and a zero count. */
@Override
public Tuple2<Set<String>, Long> createAccumulator() {
    Set<String> seenUserIds = new HashSet<>();
    return new Tuple2<>(seenUserIds, 0L);
}
/**
 * Records one user id in the accumulator and bumps the count only for an id not seen before.
 *
 * @param value       ("key", userId) pair; only the user id (f1) is used
 * @param accumulator (seen user ids, distinct count), mutated in place
 * @return the same accumulator instance
 */
@Override
public Tuple2<Set<String>, Long> add(Tuple2<String, String> value, Tuple2<Set<String>, Long> accumulator) {
    // Set.add reports whether the id was newly inserted, so the counter only moves on first sight.
    boolean firstVisit = accumulator.f0.add(value.f1);
    if (firstVisit) {
        accumulator.f1 = accumulator.f1 + 1;
    }
    return accumulator;
}
/** Returns the number of distinct user ids counted so far in this window. */
@Override
public Long getResult(Tuple2<Set<String>, Long> accumulator) {
    Long distinctUsers = accumulator.f1;
    return distinctUsers;
}
/**
 * Merges two partial accumulators into one.
 *
 * <p>Invoked when windows are merged (e.g. session windows). The former implementation
 * returned {@code null}, which would fail as soon as a merge happened.
 *
 * @param a first partial accumulator (seen user ids, count); not modified
 * @param b second partial accumulator (seen user ids, count); not modified
 * @return a new accumulator holding the union of both id sets and its size
 */
@Override
public Tuple2<Set<String>, Long> merge(Tuple2<Set<String>, Long> a, Tuple2<Set<String>, Long> b) {
    Set<String> union = new HashSet<>(a.f0);
    union.addAll(b.f0);
    // Re-derive the count from the union: a.f1 + b.f1 would double-count users present in both.
    return Tuple2.of(union, (long) union.size());
}
2. 使用Scala演示窗口函数（ReduceFunction、AggregateFunction、WindowFunction）的用法
// Demo setup: a single parallel instance, sensor readings parsed from a local socket text
// source, keyed by sensor id and grouped into 5-second time windows.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = {
  val rawLines = env.socketTextStream("localhost", 9999)
  val readings = rawLines.map(new MyMapToSensorReading)
  readings
    .keyBy(_.id)
    .timeWindow(Time.seconds(5))
}
// Incremental reduce per key and window: combines two readings into one carrying the second
// operand's id/timestamp and the sum of both temperatures (same rule as the lambda reduce below).
val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {
  override def reduce(value1: SensorReading, value2: SensorReading): SensorReading =
    // Previously value2.temperature was added to itself, silently discarding value1.
    SensorReading(value2.id, value2.timestamp, value1.temperature + value2.temperature)
})
// Counts the readings that fall into each key/window; merge sums partial counts.
val aggregateResult: DataStream[Long] = {
  val countReadings = new AggregateFunction[SensorReading, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(reading: SensorReading, count: Long): Long = count + 1
    override def getResult(count: Long): Long = count
    override def merge(left: Long, right: Long): Long = left + right
  }
  sensorStream.aggregate(countReadings)
}
// Full-window function: emits (window start timestamp, number of readings in the window).
val applyResult: DataStream[(Long, Int)] = {
  val countPerWindow = new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
      val readingCount = input.size
      out.collect(window.getStart -> readingCount)
    }
  }
  sensorStream.apply(countPerWindow)
}
// Late-data handling: windows accept records up to 1 s past their end; anything later is routed
// to the "late" side output instead of being dropped.
// One OutputTag instance is shared by producer and consumer so the tag id cannot drift apart.
val lateTag: OutputTag[SensorReading] = new OutputTag[SensorReading]("late")
val otherResult: DataStream[SensorReading] = sensorStream
  .allowedLateness(Time.seconds(1))
  .sideOutputLateData(lateTag)
  .reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))
// NOTE(review): no timestamp/watermark assignment is visible in this snippet. Allowed lateness
// only applies to event-time windows, so under processing time this side output stays empty —
// confirm the time characteristic.
val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(lateTag)

// Only applyResult is printed; the other result streams are built for illustration.
applyResult.print()
env.execute("WindowFunctionDemo")