Flink自定义WindowAssigner
【摘要】 通过自定义 WindowAssigner(并配套自定义 Trigger)实现灵活开窗:在指定的每日时间段内合并为一个大窗口,其余时间按固定大小滚动开窗。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
 * Example job that runs {@link CustomEventTimeWindowAssigner} over a small, in-memory stream of
 * (label, event time) records and prints the content of every window that fires.
 *
 * <p>All records are routed to the same key. The assigner is configured with a window size of
 * three minutes and a static period from 11:29 to 13:02 (local time of day).
 */
public class EventTimeCustomWindowExample {

    /**
     * Builds and executes the example pipeline.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Resolve "today" once so every record is guaranteed to share the same date, even if the
        // program happens to start right before midnight.
        final LocalDate today = LocalDate.now();

        List<Tuple2<String, LocalDateTime>> source =
                Arrays.asList(
                        event("a", today, 11, 22),
                        event("b", today, 11, 24),
                        event("c", today, 11, 25),
                        event("d", today, 11, 29),
                        event("e", today, 11, 30),
                        event("f", today, 11, 31),
                        event("g", today, 11, 33),
                        event("h", today, 11, 50),
                        event("i", today, 13, 1),
                        event("j", today, 13, 3),
                        event("k", today, 13, 4));

        // The event timestamp is the record's LocalDateTime interpreted in the system default zone.
        // NOTE(review): no watermarks are generated here, so the windows presumably fire only once
        // the bounded source finishes - confirm against the Flink version in use.
        SingleOutputStreamOperator<Tuple2<String, LocalDateTime>> stream =
                env.fromCollection(source)
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, LocalDateTime>>noWatermarks()
                                        .withTimestampAssigner(
                                                (record, ts) ->
                                                        record.f1
                                                                .atZone(ZoneId.systemDefault())
                                                                .toInstant()
                                                                .toEpochMilli()));

        // Alternative: replace apply(...) below with ".reduce(new JoiningReducer()).print()" to
        // emit one joined record per window instead of printing from inside the window function.
        stream.keyBy(value -> 1)
                .window(
                        CustomEventTimeWindowAssigner.of(
                                Time.minutes(3), LocalTime.of(11, 29), LocalTime.of(13, 2)))
                .apply(
                        new WindowFunction<Tuple2<String, LocalDateTime>, Object, Integer, TimeWindow>() {
                            /**
                             * Prints the window start, every element of the window, and the
                             * window end to standard out. Nothing is emitted downstream.
                             */
                            @Override
                            public void apply(
                                    Integer integer,
                                    TimeWindow timeWindow,
                                    Iterable<Tuple2<String, LocalDateTime>> iterable,
                                    Collector<Object> collector) {
                                System.out.println("");
                                // Date#toString renders the instant with second precision in the
                                // default time zone, so no formatter round trip is needed.
                                System.out.println(
                                        "timeWindow.getStart() " + new Date(timeWindow.getStart()));
                                for (Tuple2<String, LocalDateTime> element : iterable) {
                                    System.out.println(element);
                                }
                                System.out.println(
                                        "timeWindow.getEnd() " + new Date(timeWindow.getEnd()));
                            }
                        });

        env.execute();
    }

    /** Creates one sample record labelled {@code label} at {@code hour:minute} on {@code day}. */
    private static Tuple2<String, LocalDateTime> event(
            String label, LocalDate day, int hour, int minute) {
        return Tuple2.of(label, LocalDateTime.of(day, LocalTime.of(hour, minute)));
    }

    /**
     * Reducer that joins the labels of two records with {@code ", "} and keeps the earlier of the
     * two event times. Not wired into the pipeline by default.
     */
    private static class JoiningReducer implements ReduceFunction<Tuple2<String, LocalDateTime>> {
        @Override
        public Tuple2<String, LocalDateTime> reduce(
                Tuple2<String, LocalDateTime> value1, Tuple2<String, LocalDateTime> value2) {
            return new Tuple2<>(
                    value1.f0 + ", " + value2.f0,
                    (value2.f1.isBefore(value1.f1) ? value2.f1 : value1.f1));
        }
    }
}
自定义WindowAssigner
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
/**
 * A {@link WindowAssigner} that places each element into exactly one event-time {@link TimeWindow}.
 *
 * <p>The window is first computed as a plain size-aligned window of {@code size} milliseconds.
 * If the start of that window has a local time of day strictly between {@code staticWindowStart}
 * and {@code staticWindowEnd}, the start is moved back in steps of {@code size} until its time of
 * day is no longer after {@code staticWindowStart}. Likewise, if the end has a time of day
 * strictly between the two boundaries, it is moved forward in steps of {@code size} until its time
 * of day is no longer before {@code staticWindowEnd}. All time-of-day conversions use the system
 * default time zone.
 *
 * @param <T> the type of elements this assigner accepts
 */
@PublicEvolving
public class CustomEventTimeWindowAssigner<T> extends WindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Base window size in milliseconds. */
    private final long size;

    /** Time of day at which the static (stretched) window period begins. */
    private final LocalTime staticWindowStart;

    /** Time of day at which the static (stretched) window period ends. */
    private final LocalTime staticWindowEnd;

    /**
     * Creates an assigner.
     *
     * @param size base window size in milliseconds; must be positive
     * @param staticWindowStart time of day at which the static period begins
     * @param staticWindowEnd time of day at which the static period ends
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    public CustomEventTimeWindowAssigner(
            long size, LocalTime staticWindowStart, LocalTime staticWindowEnd) {
        // A non-positive size would make the modulo-based alignment fail (size 0) or the
        // boundary loops walk in the wrong direction (negative size), so reject it up front.
        if (size <= 0) {
            throw new IllegalArgumentException(
                    "Window size must be positive, but was " + size + " ms.");
        }
        this.size = size;
        this.staticWindowStart = staticWindowStart;
        this.staticWindowEnd = staticWindowEnd;
    }

    /**
     * Assigns {@code element} to a single window derived from its event timestamp.
     *
     * @param element the element being assigned (not inspected)
     * @param timestamp event timestamp of the element in epoch milliseconds
     * @param context assigner context (not used)
     * @return a singleton collection holding the element's window
     * @throws RuntimeException if the element carries no timestamp ({@code Long.MIN_VALUE})
     */
    @Override
    public Collection<TimeWindow> assignWindows(
            T element, long timestamp, WindowAssignerContext context) {
        if (timestamp <= Long.MIN_VALUE) {
            throw new RuntimeException(
                    "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                            + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                            + "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
        // Compute the start once and derive the end from it.
        long start = getWindowStart(timestamp, size);
        long end = getWindowEndFromStart(start, size);
        return Collections.singletonList(new TimeWindow(start, end));
    }

    /**
     * Returns the start of the window for {@code timestamp}: the size-aligned start, moved back in
     * steps of {@code windowSize} when it falls strictly inside the static period.
     */
    private long getWindowStart(long timestamp, long windowSize) {
        long windowStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, windowSize);
        LocalTime alignedStart = timestampToLocalTime(windowStart);
        boolean insideStaticPeriod =
                alignedStart.isAfter(staticWindowStart) && alignedStart.isBefore(staticWindowEnd);
        if (insideStaticPeriod) {
            // NOTE(review): the comparison is on time of day only, which wraps at midnight. If the
            // size-aligned grid skips over staticWindowStart right after midnight, this loop could
            // continue into the previous day - confirm the configured boundaries avoid that.
            while (timestampToLocalTime(windowStart).isAfter(staticWindowStart)) {
                windowStart -= windowSize;
            }
        }
        return windowStart;
    }

    /**
     * Returns the end of the window that begins at {@code windowStart}: one {@code windowSize}
     * after the start, moved forward in steps of {@code windowSize} when it falls strictly inside
     * the static period.
     */
    private long getWindowEndFromStart(long windowStart, long windowSize) {
        long windowEnd = windowStart + windowSize;
        LocalTime alignedEnd = timestampToLocalTime(windowEnd);
        boolean insideStaticPeriod =
                alignedEnd.isAfter(staticWindowStart) && alignedEnd.isBefore(staticWindowEnd);
        if (insideStaticPeriod) {
            // NOTE(review): same midnight wrap-around caveat as for the window start - if no grid
            // point reaches staticWindowEnd before midnight, this loop could run into the next day.
            while (timestampToLocalTime(windowEnd).isBefore(staticWindowEnd)) {
                windowEnd += windowSize;
            }
        }
        return windowEnd;
    }

    /** Returns a trigger configured with this assigner's static period boundaries. */
    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // The trigger never inspects the element, so viewing Trigger<Object, ...> as
        // Trigger<T, ...> is safe.
        return (Trigger<T, TimeWindow>)
                new CustomEventTimeTrigger(staticWindowStart, staticWindowEnd);
    }

    @Override
    public String toString() {
        return "CustomDynamicEventTimeWindows()";
    }

    /**
     * Creates an assigner whose static period boundaries are given as epoch milliseconds; only
     * their time of day in the system default zone is kept.
     *
     * @param size base window size
     * @param staticWindowStartMs instant (epoch ms) whose time of day starts the static period
     * @param staticWindowEndMs instant (epoch ms) whose time of day ends the static period
     * @param <T> element type
     * @return the configured assigner
     */
    public static <T> CustomEventTimeWindowAssigner<T> of(
            Time size, long staticWindowStartMs, long staticWindowEndMs) {
        return new CustomEventTimeWindowAssigner<>(
                size.toMilliseconds(),
                timestampToLocalTime(staticWindowStartMs),
                timestampToLocalTime(staticWindowEndMs));
    }

    /**
     * Creates an assigner whose static period boundaries are given as times of day.
     *
     * @param size base window size
     * @param staticWindowStart time of day at which the static period begins
     * @param staticWindowEnd time of day at which the static period ends
     * @param <T> element type
     * @return the configured assigner
     */
    public static <T> CustomEventTimeWindowAssigner<T> of(
            Time size, LocalTime staticWindowStart, LocalTime staticWindowEnd) {
        return new CustomEventTimeWindowAssigner<>(
                size.toMilliseconds(), staticWindowStart, staticWindowEnd);
    }

    /** Converts epoch milliseconds to the time of day in the system default time zone. */
    private static LocalTime timestampToLocalTime(long timestampMs) {
        Instant instant = Instant.ofEpochMilli(timestampMs);
        return LocalDateTime.ofInstant(instant, ZoneId.systemDefault()).toLocalTime();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * The trigger for this custom window assigner. It fires a window when the event-time timer
     * registered at the window's max timestamp goes off, or immediately for an element outside the
     * static period whose window the watermark has already passed.
     *
     * <p>Declared static so the trigger carries only the two boundaries it needs rather than a
     * hidden reference to the enclosing assigner.
     */
    private static class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final LocalTime staticWindowStart;
        private final LocalTime staticWindowEnd;

        CustomEventTimeTrigger(LocalTime staticWindowStart, LocalTime staticWindowEnd) {
            this.staticWindowStart = staticWindowStart;
            this.staticWindowEnd = staticWindowEnd;
        }

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            LocalTime localTime = timestampToLocalTime(timestamp);
            boolean insideStaticPeriod =
                    localTime.isAfter(staticWindowStart) && localTime.isBefore(staticWindowEnd);

            // Only elements outside the static period fire immediately when the watermark is
            // already past the window; elements inside it always wait for the timer.
            if (!insideStaticPeriod && window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // onElement registers an event-time timer, so that is the timer to remove here.
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public String toString() {
            return "CustomEventTimeTrigger()";
        }
    }
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)