Flink实战(四)Flink 的WaterMark机制
4、Flink 的WaterMark机制
Watermarks(水位线)机制
是event time处理进度的标志
表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )
基于watermark来进行窗口触发计算的判断
1 什么是乱序
-
当数据是一条一条规规矩矩的按照流程发送,经过MQ传输,Flink接受后处理,这个时候,就是有序的处理。
-
当出现异常,有些数据延迟了,排在后面的数据出现在前面了,这就出现了乱序。
-
思考:我们应该以哪个时间类型来判定乱序呢?
2 WaterMark的概念
- WaterMark(水位线)主要用来处理乱序事件,而正确地处理乱序事件,通常用WaterMark机制结合窗口来实现。
- 从流处理原始设备产生事件,到Flink程序读取数据,再到Flink多个算子处理数据,在这个过程中由于网络或者系统等外部因素影响下,导致数据是乱序的,为了保证计算结果的正确性,需要等待数据,这就带来了计算的延迟。
- 对于延迟太久的数据,不能无限期的等下去,所以必须有一个机制,来保证特定的时间后一定会触发窗口进行计算,这个触发机制就是WaterMark。
3 Watermark的原理
-
在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。
- 乱序会导致各种统计结果有问题。比如一个Time Window本应该计算1、2、3,结果3迟到了,那么这个窗口统计就丢失数据了,结果就不准确了。
-
这种情况下就需要用到水位线(WaterMark)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。
-
当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。
-
那么 Flink 是怎么计算 WaterMark 的值呢?
- ==Watermark = 进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t)==
-
==那么有 Watermark 的 Window 是怎么触发窗口函数的呢?==
(1) watermark >= window的结束时间
(2) 该窗口必须有数据 注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间
4 生活场景理解WaterMark
-
经常户外徒步的同学应该知道徒步小队通常会有一个正两副领队,队首队尾各一名副队,队伍前面的由一名副领队开路,队伍后面由一名副领队收队,正队长在队伍中穿插协调。
-
队尾的领队叫后队领队,它的职责是要保证所有队员都在前面,也就是说后领队是整个队伍的队尾。当收队的时候,看见队尾的领队,那就说明整个队伍都已经完全到达了。
-
这个
WaterMark
就相当于给整个数据流设置一个后领队。但是窗口不知道具体要来多少数据,所有只能设置一个时间上的限制,以此来推测当前窗口最后一条数据是否已经到达。假设窗口大小为10秒,watermark为进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t) -
接下来它会进行以下处理
- 每来一条数据,取当前窗口内所有数据最大的事件发生时间
- 用最大的事件发生时间扣减指定乱序时间
- 看看是否符合触发窗口关闭计算的条件
- 如果不符合,则继续进数据
- 如果符合,则关闭窗口开始计算
-
你看,多像户外徒步
-
每来一个人,就问问他出发的时是多少号,然后确认所有已到队员的最大号码
-
用最大的号码对比一下后领队的号码
-
如果比后领队的号码小,就不收队,如果号码大于等于后领队的号码,就收队!!!
5 Watermark 使用的三种情况
-
(1)顺序数据流中的watermark
-
在某些情况下,基于Event Time的数据流是有序的(相对event time)。在有序流中,watermark就是一个简单的周期性标记。
如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置乱序时间了,那么 t 就是 0。 所以 watermark= maxEventtime-0 = maxEventtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。
-
-
(2)乱序数据流中的watermark
-
现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。
比如下图,假设窗口大小为1小时,延迟时间设为10分钟。明显,数据09:38已经迟到,但它依然会被正确计算,只有当有数据时间大于10:10的数据到达之后(即对应的watermark大于等于10:10-10min) 09:00~10:00的窗口才会执行计算。
-
-
(3)并行数据流中的 Watermark
-
对应并行度大于1的source task,它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子,并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时,就会立马触发窗口操作。
在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。
-
6、使用EventTime来作为基准时间处理本地json数据
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
public class FlinkSQLEvent {
public static void main(String[] args) {
//1.创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2.创建source table,这种方式会自动注册表
tEnv.executeSql("CREATE TABLE userbase ("+
" id Integer,"+
" name STRING,"+
" email STRING,"+
" date_time TIMESTAMP(3),"+
" WATERMARK FOR date_time AS date_time - INTERVAL '10' SECOND"+
") WITH ("+
" 'connector' = 'filesystem',"+
" 'path' = 'input/userbase.json',"+
" 'format' = 'json'"+
")");
//3.Flink SQL 查询
Table resultTable = tEnv.sqlQuery("select * from userbase");
resultTable.printSchema();
//4.执行Flink SQL
resultTable.execute().print();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)