Flink时间与窗口

举报
米兰的小铁匠 发表于 2020/08/24 15:44:40 2020/08/24
【摘要】 在flink的世界里,时间和窗口是两个非常重要的性质,这二者是如何运转起来以及如何交互,本文将详细介绍这两个特性。基于时间本文会介绍接入时间、处理时间、事件时间、watermark等概念,基于窗口本文会介绍窗口类型、窗口分配器、窗口触发器、剔除器、迟到数据处理、窗口状态、窗口join、窗口整体执行流程等概念。

在flink的世界里,时间和窗口是两个非常重要的性质,这二者是如何运转起来以及如何交互,本文将详细介绍这两个特性。

1.0 时间

   flink定义了三种时间机制,事件发生时间 EventTime 事件处理时间 ProcessingTime 接入时间IngestionTime。

1.1 接入时间

  接入时间指的是事件接入flink系统的时间,其依赖于sourceFunction所在节点的系统时间。该语义下无法提供确切可复现的结果。

1.2 处理时间

   处理时间是指数据在算子上被计算时所在节点的系统时间,是事件被处理的时间,处理时间相对容易,延迟相对较低,但是结果一般不可复现,窗口算子使用该时间语义会产生一些不一致的结果。

1.3 事件时间

   事件时间(Event Time)是事件实际发生的时间,单调递增,每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中,时间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。数据世界的时间,根据数据携带的时间戳指定时间世界,也可以是一个整体递增可有一定程度的乱序的long型字段, 改时间语义下,结果确定 可重现, eventtime并不一定是一个真实时间戳,只要是一个整体递增的字段即可。可以视为模拟真实世界的时间。事件时间可以产生一致并且可重现的结果,只有依靠水位线指定的某个时间间隔内所有的时间戳都已到达,才会触发窗口。

1.4 watermark

   在事件时间语义下,watermark用于记录每个任务的当前的事件时间,算子会使用该时间来触发计算,flink可以基于Long型的整体基本有序递增字段来标识时间戳,水位线可以在结果的完整性和延迟之间做一个权衡,同时处理乱序的记录。

1.4.1 watermark生成

   使用事件处理时间时,必须显式的指定时间戳和生成watermark,flink内置三种方式实现

  1. 在sourceFunction中完成,为每条读出的记录分为时间戳和生成watermark

public interface SourceFunction<T> extends Function, Serializable {
 void collectWithTimestamp(T element, long timestamp);   //  为每条记录附加一个时间戳

/**
 * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
 * elements with a timestamp {@code t' <= t} will occur any more. If further such
 * elements will be emitted, those elements are considered <i>late</i>.
 * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
 * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
 * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
 * automatic ingestion time watermarks.
 */
 @PublicEvolving
 void emitWatermark(Watermark mark);    // 生成水位线并发送出去
 }		

 

  2. 周期性生成

   使用AssignerWithPeriodicWatermarks自定义函数,周期性的调用生成方法,其为现实时间驱动。

  3. 定点分配器

    使用AssignerWithPunctuatedWatermarks自定义函数,根据指定的输入事件的某个属性来生成水位线,其由数据驱动。

1.4.2 watermark传播

    watermark以广播的形式在算子间传播,通过算子的任务进行接收和发送,基本原则是单输入取其大,多输入取其小

    this figure shows how a task with four input partitions and three output partitions receives watermarks, updates its partition watermarks and event-time clock, and emits watermarks.此图描述了watermark的生成更新和发射过程。

1.5 事件时间和处理时间及watermark的关系

      其中横轴代表事件时间,纵轴代表处理时间, 在最理想情况下,事件时间和处理时间一样,虚线即是最理想情况,红色的线代表实际情况即实际watermark曲线。实际情况下处理时间永远比事件时间晚,红色线和虚线的垂直距离代表滞后时间,  红色线和虚线的水平距离代表了处理时间和事件时间之间的偏移,事件时间和处理时间之间的时间差并不是静态的,随着时间它会发生变动。

1.6 计时器与时间服务

     TimerService通常在Context和OnTimerContext对象中,普通的flink算子无法访问时间信息,但ProcessFunction系列算子可以访问问时间戳和watermark,还可以注册计时器,同时可以将数据输出到多个输出流。以ProcessFunction为例

public interface TimerService {
   当前处理时间
	long currentProcessingTime();
	水位线
	long currentWatermark();
    注册处理时间计时器
	void registerProcessingTimeTimer(long time);
      事件时间计时器
	void registerEventTimeTimer(long time);
}

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
	对每一条记录都会调用该函数, 可以通过Context访问时间戳和TimerService,
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
     回调函数,在计时器被触发时调用
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
	public abstract class Context {
	public abstract TimerService timerService();
      多流发送
	public abstract <X> void output(OutputTag<X> outputTag, X value);
	}
}

任务内部的时间服务会维护一些计时器,他们通过接收水位线来激活,再用timeservice去遍历触发,timeservice遍历是单线程的,timer在队列中是排好序存在优先队列中,
以HeapInternalTimerService示例如下

/**
 * {@link InternalTimerService} that stores timers on the Java heap.  // 保存在 java heap中
 */
public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
	private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;   
	private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;   Timer 被存在优先队列中,遍历第一个即可
	}
	}

当任务接到watermark时会基于watermark的时间戳来更新事件时间,同时任务将更新后的事件时间将watermark发出。


2.0 窗口

窗口是将数据集分割成成一个个有限长度数据区间的机制,即在数据流中增加的临时处理边界。其为前闭后开的区间。看下timewindow的定义。

先看timewindow的属性

public class TimeWindow extends Window {
	private final long start;
	private final long end;
	public long getStart() {
		return start;
	}
	public long getEnd() {
		return end;
	}
	@Override
	public long maxTimestamp() {
		return end - 1; 后面为开区间
	}

首先抛出一些概念, watermark水位线(上一章节已讲)、窗口类型、windowAssigner窗口分配器、Trigger窗口触发器、Evictor剔除器、Lateness延迟数据处理、outputtag输出标签、窗口算子(增量处理函数、全量处理函数),windowoperator、窗口状态、基于窗口的join。后续会详细讲解。

接下来抛出几个问题

1. what results are calculated  计算什么结果

2. where in event time are results calculated  事件时间到达何处计算结果

3. when in processing time are results materialized 什么时间输出结果

4. how do refinements of results relate  如何更新结果


窗口是基于流的计算,常见用法如下

      stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

2. 1窗口类型

flink的窗口分为滚动窗口、滑动窗口和会话窗口以及全局窗口。

滚动窗口按照固定时间或大小切分数据,窗口之间的元素没有重合。

滑动窗口是滚动窗口上增加滑动步长,滑动步长小于窗口长度窗口之间的元素有重叠,滑动步长大于窗口长度,一些元素不会回分配给任何窗口并被丢弃。

会话窗口是将一定时间内活跃度较高的数据分成一个窗口,长度可变,不重叠。其只需要定义sessiongap即可。

全局窗口将相同的key聚合到一起,没有起点和终点,需要自己去定义触发器。其他三种窗口flink都自带默认的触发器。四大窗口都有默认对应的窗口分配器,触发器和窗口分配器下文中会有讲到。同时也可以区分为timewindow和countwindow,其中countwindow是属于globalwindow的范畴。

2.2WindowAssigner 窗口分配器

Flink为常见的窗口内置了窗口分配器,窗口分配器会基于事件的某个属性将事件分配到一个或多个窗口,上一节介绍的四种类型窗口flink有内置有对应的窗口分配器,如基于时间的分配器根据时间戳或处理时间将时间分配到一个或多个窗口,

public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    可重写此方法将数据流中的元素分配至0-n个窗口中
	public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
	
	Returns {@code true} if elements are assigned to windows based on event time,
	public abstract boolean isEventTime();
	}
}

flink自带的窗口分配器如下:

你也可以继承windowassigner去实现窗口分配器,也可以实现动态窗口。

2.3 Trigger触发器

对于Trigger,用来指定如何触发窗口,窗口满足触发条件的话就会触发定义的windowfunction计算,不同的windowAssiger对应不同的trigger。

public abstract class Trigger<T, W extends Window> implements Serializable {
        // 对接入窗口的每个元素触发操作
	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; 
	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
	public void onMerge(W window, OnMergeContext ctx) throws Exception  // 对窗口和状态进行合并
	public abstract void clear(W window, TriggerContext ctx) throws Exception; // 清除方法

下面看一下触发窗口的返回结果定义

public enum TriggerResult {
    当前不触发计算
	CONTINUE(false, false),
    // 触发计算 并清除数据
	FIRE_AND_PURGE(true, true),
      触发计算 保留数据
	FIRE(true, false), 
       不触发计算、清除数据
	PURGE(false, true);
}

窗口触发可以是时间数量单次触发 多次触发,flink内置实现的trigger如下图,

同时你可以继承该类自定义一些触发器的实现来满足你的业务需求。

2.4 Evictor 清除器 

在触发器触发后,窗口函数执行前后清除窗口内的元素,注意使用此方法后不可以使用增量计算函数

public interface Evictor<T, W extends Window> extends Serializable {
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); // 窗口函数之前前清理元素
	void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); // 窗口函数执行后清理元素
}

flink提供了三种清理器如下图,

同时你可以实现该接口自定义清除器。

2.5 Lateness 延迟数据处理

水位线可以在结果的完整性和延迟之前做个平衡,对于算子watermark已经超出结束时间后到来的数据,视为迟到数据,对于迟到数据有三种处理方式

1. 直接丢弃

2. 将迟到数据重定向到单独的数据流

3. 将迟到数据更新并发出结果

重定向迟到数据可以通过sideOutputLateData 将迟到数据发送至副输出。

更新迟到数据可以使用allowedLateness(Time.()),配置该属性的窗口算子可以在watermark超出结束时间戳后不会立即删除窗口,窗口会继续保留该迟到时间的长度,水位线超出窗口endTime + latenessTime 后窗后才会被删除。

2.6 输出标签 outputTag

窗口的迟到数据可以创建OutputTag,再通过标签从窗口结果中将迟到数据筛选出来(getSideOutput()),

你只需要创建一个OutputTag,并使用sideOutputLateData(new OutputTag("lateTag"))。

同时在processFunction系列的函数中也可以使用该功能。

2.7 windowFunction

窗口将无界数据流上划分出有界数据区间,对窗口中元素做计算的函数即为窗口函数,按计算方式的不同可以划分为增量函数和全量函数。

2.7.1 增量函数

使用增量函数时窗口只维护中间结果的状态值,对于加入窗口的每个元素,窗口都会基于该状态值计算结果并更新状态,不会保存原始数据,ReduceFunction、AggregateFunction、FlodFunction均属于增量聚合函数。

2.7.2 全量函数

增量函数不能满足业务逻辑时,可以使用全量函数,如求窗口的中位数,使用全量函数时窗口会缓存属于该窗口的所有元素,所有元素收集完成后在执行遍历计算。全量聚合函数有ProcessWindowsFunction,该函数功能十分强大,其内部类Context可以获取窗口的元数据,以及操作窗口状态(其和ProcessFunction都是继承自RichFunction),同时支持多输出。

2.7.3 增量函数与全量函数的整合

增量聚合函数虽然性能强大,但很多时候需要获取一些窗口元数据信息,如key、窗口边界之类的信息,此时就可以结合增量函数和全量函数使用,增量函数聚合结果,窗口触发后,将聚合后的一条结果输出到全量函数中取获取窗口元数据以及状态数据。(This allows it to incrementally compute windows while having access to the additional window meta information of the ProcessWindowFunction.)

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

增量全量函数整合的工作流程如下:


2.8 窗口状态

     除了带剔除器的窗口,每个窗口都会有状态,状态在windowoperator初始化使生成,在窗口结束后销毁,AggregateFunction对应AggregatingState,FlodFunction对应FoldingState,ReduceFunction对应ReducingState,它们都存储增量计算的中间结果,ProcessWindowFunction 对应listState,数据到来时,会放在listState中,窗口触发时遍历全量计算,这四种状态均继承于AppendingState,都可以做Merge操作。除了通过RichFunction或者继承CheckpointedFunction来操作状态外,ProcessWindowFunction 也可以操作状态,每个窗口实例保存对应的状态数据,可以通过Context对象获取state数据。

public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
			AggregateFunction<T, ACC, V> aggregateFunction,
			ProcessWindowFunction<V, R, K, W> windowFunction,
			TypeInformation<ACC> accumulatorType,
			TypeInformation<V> aggregateResultType,
			TypeInformation<R> resultType) {
		//clean the closures
		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
		aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
	        获得key生成器
		KeySelector<T, K> keySel = input.getKeySelector();
		OneInputStreamOperator<T, R> operator;
		生成窗口对应的状态描述符
		AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
					aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
		获取opetrator传入windowassigner、key生成器、状态描述符、窗口函数、触发器、延迟处理方式以及迟到数据输出tag
		operator = new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalSingleValueProcessWindowFunction<>(windowFunction),
					trigger,
					allowedLateness,
					lateDataOutputTag);
		return input.transform(opName, resultType, operator);
	}
	@Override
	public void open() throws Exception {
		super.open();
		this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
		timestampedCollector = new TimestampedCollector<>(output);
		internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
		triggerContext = new Context(null, null);
		processContext = new WindowContext(null);
		// NOTE - the state may be null in the case of the overriding evicting window operator
		状态初始化,带evictor的窗口除外
		if (windowStateDescriptor != null) {
			windowState = (InternalAppendingState<K, W, IN, ACC, ACC>)
				getContextStateHelper().getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
		}

		}
	}

      public abstract class Context implements java.io.Serializable {
		/** Returns the current processing time. */
		public abstract long currentProcessingTime();

		/** Returns the current event-time watermark. */
		public abstract long currentWatermark();

		该状态仅用于当前窗口
		public abstract KeyedStateStore windowState();

		该状态可以跨窗口
		public abstract KeyedStateStore globalState();
		
		Emits a record to the side output identified by the {@link OutputTag}.
		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}

操作状态时一定要主要状态泄露问题,flink系列下一期会写一篇flink状态与容错详细讲解状态。

2.9 windowOperator

接下来结合代码讲下window的工作流程,

public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
	 1.获得窗口对应的windows,由windowAssigner完成
	  final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);
      2.获得数据对应的key,此值为keyby时有KeySelector生成
	  final K key = this.<K>getKeyContext().getCurrentKey();
		
	 for (W window: elementWindows) {
	    // drop if the window is already late, 如果是迟到数据,则抛弃
	    if (isWindowLate(window)) {
		continue;
	   }
	   isSkippedElement = false;
	   windowState.setCurrentNamespace(stateWindow);
           将元素存入windowState
	   windowState.add(element.getValue()); 
      判断该元素是否触发trigger
	   TriggerResult triggerResult = triggerContext.onElement(element);
      如果窗口被触发,则拿到窗口状态,放入窗口函数中计算
           if (triggerResult.isFire()) {
		ACC contents = windowState.get();
		emitWindowContents(actualWindow, contents);
	    }
      清理窗口状态
	    if (triggerResult.isPurge()) {
		windowState.clear();
	    }
	    注册计时器,触发时清理窗口
	    registerCleanupTimer(actualWindow);
	    }
	    if(isSkippedElement && isElementLate(element)) {
		为迟到数据打上标签
		if (lateDataOutputTag != null){
		  sideOutput(element);
		} 
		}
	  }
	private void emitWindowContents(W window, ACC contents) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
		processContext.window = window;
		窗口函数在此处被调用
		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
	}

2.10 窗口join

      窗口可以按照相同的条件对两个流进行关联操作,生成JoinedStream,where为第一个流指定key,equalTo为第二个流指定key,并通过window方法指定windowassigner,最后指定窗口函数。(A window join joins the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a window assigner and are evaluated on elements from both of the streams.),流同时也可以基于Interval进行join操作、

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

基于窗口的join操作流程图如下:

总结,回答上文四个问题, function将解决what问题,window解决where问题,Trigger、watermark、lateness解决when的问题,Lateness三种处理方式可以是how的解决方式之一。

3. 后记

所以如果要体验或使用云上flink服务的话,请来华为云 DLI-FLINK serverless云服务。


参考:

https://ci.apache.org/projects/flink/flink-docs-stable/

https://github.com/apache/flink/tree/blink

https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/

https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

https://www.oreilly.com/library/view/stream-processing-with/9781491974285/


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。