Flink自定义WindowAssigner

举报
yangxiao_mrs 发表于 2024/01/20 11:54:48 2024/01/20
【摘要】 通过自定义 WindowAssigner，实现灵活开窗。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * Example of grouped stream windowing with {@link CustomEventTimeWindowAssigner}: 3-minute
 * event-time windows, except that elements whose window falls into the static band between 11:29
 * and 13:02 (JVM default time zone) are gathered into one longer window.
 */
public class EventTimeCustomWindowExample {

    /** Thread-safe formatter for window bounds, rendered in the JVM's default time zone. */
    private static final DateTimeFormatter WINDOW_BOUND_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    /**
     * Builds and runs the example job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the date once so every sample record lands on the same day, even if the
        // program happens to start right at midnight.
        final LocalDate today = LocalDate.now();
        List<Tuple2<String, LocalDateTime>> source =
            Arrays.asList(
                Tuple2.of("a", LocalDateTime.of(today, LocalTime.of(11, 22))),

                Tuple2.of("b", LocalDateTime.of(today, LocalTime.of(11, 24))),
                Tuple2.of("c", LocalDateTime.of(today, LocalTime.of(11, 25))),

                Tuple2.of("d", LocalDateTime.of(today, LocalTime.of(11, 29))),
                Tuple2.of("e", LocalDateTime.of(today, LocalTime.of(11, 30))),
                Tuple2.of("f", LocalDateTime.of(today, LocalTime.of(11, 31))),
                Tuple2.of("g", LocalDateTime.of(today, LocalTime.of(11, 33))),
                Tuple2.of("h", LocalDateTime.of(today, LocalTime.of(11, 50))),
                Tuple2.of("i", LocalDateTime.of(today, LocalTime.of(13, 1))),

                Tuple2.of("j", LocalDateTime.of(today, LocalTime.of(13, 3))),
                Tuple2.of("k", LocalDateTime.of(today, LocalTime.of(13, 4))));

        // Event time is taken from the record's LocalDateTime, interpreted in the JVM default zone.
        // NOTE(review): with noWatermarks() the windows presumably fire only at end of input, when
        // Flink emits its final watermark for the bounded source - confirm for your Flink version.
        SingleOutputStreamOperator<Tuple2<String, LocalDateTime>> stream =
            env.fromCollection(source)
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, LocalDateTime>>noWatermarks()
                        .withTimestampAssigner(
                            (record, ts) ->
                                record.f1
                                    .atZone(ZoneId.systemDefault())
                                    .toInstant()
                                    .toEpochMilli()));

        // All records share one key so that a single window sequence is produced.
        // Alternative to .apply(...): .reduce(new JoiningReducer()).print() emits one joined
        // record per window instead of printing each element.
        stream.keyBy(value -> 1)
            .window(
                CustomEventTimeWindowAssigner.of(
                    Time.minutes(3), LocalTime.of(11, 29), LocalTime.of(13, 2)))
            .apply(new WindowFunction<Tuple2<String, LocalDateTime>, Object, Integer, TimeWindow>() {
                @Override
                public void apply(Integer integer, TimeWindow timeWindow,
                    Iterable<Tuple2<String, LocalDateTime>> iterable, Collector<Object> collector) {
                    System.out.println();
                    System.out.println("timeWindow.getStart() " + formatEpochMillis(timeWindow.getStart()));

                    for (Tuple2<String, LocalDateTime> element : iterable) {
                        System.out.println(element);
                    }

                    // getEnd() is exclusive: the last timestamp in the window is getEnd() - 1.
                    System.out.println("timeWindow.getEnd() " + formatEpochMillis(timeWindow.getEnd()));
                }
            });

        env.execute();
    }

    /**
     * Formats an epoch-millisecond instant as {@code yyyy-MM-dd HH:mm:ss} in the JVM default zone.
     *
     * @param epochMillis milliseconds since the epoch
     * @return the formatted local date-time
     */
    private static String formatEpochMillis(long epochMillis) {
        return WINDOW_BOUND_FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    /**
     * Reduces two records into one whose name is the comma-joined names and whose time is the
     * earlier of the two times.
     */
    private static class JoiningReducer implements ReduceFunction<Tuple2<String, LocalDateTime>> {

        @Override
        public Tuple2<String, LocalDateTime> reduce(
            Tuple2<String, LocalDateTime> value1, Tuple2<String, LocalDateTime> value2) {
            return new Tuple2<>(
                value1.f0 + ", " + value2.f0,
                (value2.f1.isBefore(value1.f1) ? value2.f1 : value1.f1));
        }
    }
}

自定义WindowAssigner

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;


@PublicEvolving
public class CustomEventTimeWindowAssigner<T> extends WindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final LocalTime staticWindowStart;
    private final LocalTime staticWindowEnd;

    public CustomEventTimeWindowAssigner(
        long size, LocalTime staticWindowStart, LocalTime staticWindowEnd) {
        this.size = size;
        this.staticWindowStart = staticWindowStart;
        this.staticWindowEnd = staticWindowEnd;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
        T element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            long start = getWindowStart(timestamp, size);
            long end = getWindowEnd(timestamp, size);
            return Collections.singletonList(new TimeWindow(start, end));
        } else {
            throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
                    + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
                    + "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    private long getWindowStart(long timestamp, long windowSize) {
        long windowStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, windowSize);
        LocalTime windowStartLocalTime = timestampToLocalTime(windowStart);
        if (windowStartLocalTime.isAfter(staticWindowStart)
            && windowStartLocalTime.isBefore(staticWindowEnd)) {
            while (timestampToLocalTime(windowStart).isAfter(staticWindowStart)) {
                windowStart -= windowSize;
            }
        }
        return windowStart;
    }

    private long getWindowEnd(long timestamp, long windowSize) {
        long windowEnd = getWindowStart(timestamp, windowSize) + windowSize;
        LocalTime windowEndLocalTime = timestampToLocalTime(windowEnd);
        if (windowEndLocalTime.isAfter(staticWindowStart)
            && windowEndLocalTime.isBefore(staticWindowEnd)) {
            while (timestampToLocalTime(windowEnd).isBefore(staticWindowEnd)) {
                windowEnd += windowSize;
            }
        }
        return windowEnd;
    }

    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) new CustomEventTimeTrigger();
    }

    @Override
    public String toString() {
        return "CustomDynamicEventTimeWindows()";
    }

    public static <T> CustomEventTimeWindowAssigner<T> of(
        Time size, long staticWindowStartMs, long staticWindowEndMs) {
        return new CustomEventTimeWindowAssigner<>(
            size.toMilliseconds(),
            timestampToLocalTime(staticWindowStartMs),
            timestampToLocalTime(staticWindowEndMs));
    }

    public static <T> CustomEventTimeWindowAssigner<T> of(
        Time size, LocalTime staticWindowStart, LocalTime staticWindowEnd) {
        return new CustomEventTimeWindowAssigner<>(
            size.toMilliseconds(), staticWindowStart, staticWindowEnd);
    }

    private static LocalTime timestampToLocalTime(long timestampMs) {
        Instant instant = Instant.ofEpochMilli(timestampMs);
        return LocalDateTime.ofInstant(instant, ZoneId.systemDefault()).toLocalTime();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /** The trigger for this custom window assigner. */
    private class CustomEventTimeTrigger extends Trigger<Object, TimeWindow> {

        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            LocalTime localTime = timestampToLocalTime(timestamp);
            if (localTime.isAfter(staticWindowStart) && localTime.isBefore(staticWindowEnd)) {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            } else {
                if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                    // if the watermark is already past the window fire immediately
                    return TriggerResult.FIRE;
                } else {
                    ctx.registerEventTimeTimer(window.maxTimestamp());
                    return TriggerResult.CONTINUE;
                }
            }
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }

        @Override
        public String toString() {
            return "CustomEventTimeTrigger()";
        }
    }
}



【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。