Flink SQL中的WindowOperator

举报
潇湘暮雨 发表于 2020/12/27 20:24:16 2020/12/27
【摘要】         在流处理的实际应用中,数据是连续不断的,因此我们不可能等待所有的数据都到了才开始处理。虽然我们可以对每来一条数据就处理一次,但是有一些场景我们需要使用一段时间内的数据进行计算,比如统计在一分钟内用户对某商品的浏览量,而这就是window窗口的使用场景。Flink sql中对于窗口的使用主要是通过TUMBLE、HOP和SESSION,而这些操作都是基于WindowOperato...

        在流处理的实际应用中,数据是连续不断的,因此我们不可能等待所有的数据都到了才开始处理。虽然我们可以对每来一条数据就处理一次,但是有一些场景我们需要使用一段时间内的数据进行计算,比如统计在一分钟内用户对某商品的浏览量,而这就是window窗口的使用场景。

Flink sql中对于窗口的使用主要是通过TUMBLEHOPSESSION,而这些操作都是基于WindowOperator最为窗口算子进行实现,因此本文主要简单介绍WindowOperator

public void open() throws Exception

public void processElement(StreamRecord<BaseRow> record) throws Exception

public void onEventTime(InternalTimer<K, W> timer) throws Exception

public void onProcessingTime(InternalTimer<K, W> timer) throws Exception

protected abstract void emitWindowResult(W window) throws Exception;

private void registerCleanupTimer(W window)

private long cleanupTime(W window)

WindowOperator主要有以上几个方法,其中open方法主要是进行一些逻辑的初始化,processElement方法是对每个数据的处理过程和逻辑,onEventTime是由事件时间触发的方法,onProcessingTime是由处理时间触发的方法,emitWindowResult是输出计算结果,是由实现WindowOperator接口的类实现,registerCleanupTimer方法是注册窗口状态的实际清除时间,cleanupTime是获取窗口的清理时间。

  • 首先是processElement方法。该方法是对每个数据的处理过程和处理逻辑的实现,其代码逻辑如下:
  • 首先根据实际数据的值和所使用的时间类型即事件时间和处理时间,获取数据的时间戳timestamp,作为后续处理的判断依据。若为事件时间,则从数据中根据索引进行提取,若为处理时间,则获取当前的时间。
        long timestamp;
		if (windowAssigner.isEventTime()) {
			timestamp = inputRow.getLong(rowtimeIndex);
		} else {
			timestamp = internalTimerService.currentProcessingTime();
		}
  • 其后,也就是最重要的逻辑。首先会根据数据的实际情况和时间戳为其分配相对应的窗口window,根据所配置的triggerContext中的触发器判断是否输出结果到下游,当满足触发条件时,则会输出计算结果。但无论结果怎样,都会调用registerCleanupTimer方法注册窗口的清理时间,其实现后续介绍。
                   // the actual window which the input row is belongs to

                   Collection<W> actualWindows = windowFunction.assignActualWindows(inputRow, timestamp);

                   for (W window : actualWindows) {

                            isElementDropped = false;

                            triggerContext.window = window;

                            boolean triggerResult = triggerContext.onElement(inputRow, timestamp);

                            if (triggerResult) {

                                     emitWindowResult(window);

                            }

                            // register a clean up timer for the window

                            registerCleanupTimer(window);

                   }
  • 之后是registerCleanupTimer方法,该方法是注册窗口状态的清理时间,清理前窗口的状态值可以使用计算,清理后则不存在。
    • 该方法首先会调用cleanupTime方法来获取真正的清理时间。因为事件时间可能存在数据延迟到达的情况,所以又根据事件时间和其他时间分为两种情况。如果是事件时间,则其清理时间一般为窗口的最大时间和允许迟到的时间之和(该允许迟到时间应该是使用watermark后再次允许的时间),至于为什么存在比0小的可能,作者尚未弄明白,尚待咨询。对于处理时间,因为其不存在迟到一说,所以其窗口清理时间就是窗口的最大值。
                   if (windowAssigner.isEventTime()) {

                            long cleanupTime = Math.max(0, window.maxTimestamp() + allowedLateness);

                            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;

                   } else {

                            return Math.max(0, window.maxTimestamp());

                   }
  • 再之后是onEventTime方法,该方法主要是所注册的事件时间到时时所调用的方法。由于WindowOperator实现了Triggerable接口,而且其获取InternalTimerService对象时所传的时间触发时的调用对象为其本身,所以对于triggerContext中的触发器无论是事件时间还是处理时间其时间到达触发条件时调用的都是WindowOperatoronEventTime方法。
internalTimerService = getInternalTimerService("window-timers", windowSerializer, this)
  • 首先,会根据InternalTimer中获取触发的窗口,然后调用triggerContext中的触发器的相应方法判断是否输出结果。
                    triggerContext.window = timer.getNamespace();

                   if (triggerContext.onEventTime(timer.getTimestamp())) {

                            // fire

                            emitWindowResult(triggerContext.window);

                   }
  • 其后,如果当前是事件时间,那说明窗口清理时间到了,需要清楚窗口状态。之所以是窗口清理时间是因为当前的实现逻辑中,窗口清理时间是根据事件时间进行注册的。可是是否清理,还要判断当前的触发时间和之前获取的清理时间是否相同,其具体逻辑在cleanWindowIfNeeded方法中。
                   if (windowAssigner.isEventTime()) {

                            windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());

                   }
  • 最后是onProcessingTime方法。该方法主要是所注册的处理时间到时时所调用的方法。该方法与OnEventTime方法相似,同样是获取当前window,并根据triggerContext中触发器的结果判断是否导出计算结果。不过,不同的是窗口的清理不同,这主要是因为在处理时间语义中,窗口的清理时间是由处理时间决定的。处理时间可能会设置多个触发时段,但是只有窗口的最大值才是清理窗口的触发时间,这个的判断逻辑在cleanWindowIfNeeded方法内,它主要是判断当前触发时间和注册的清理触发时间是否和之前获得的触发时间是否相等来判断是否触发清理窗口状态。
                  triggerContext.window = timer.getNamespace();

                   if (triggerContext.onProcessingTime(timer.getTimestamp())) {

                            // fire

                            emitWindowResult(triggerContext.window);

                   }

                   if (!windowAssigner.isEventTime()) {

                            windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());

                   }
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。