Flink实战(四)Flink 的WaterMark机制

举报
Maynor学长 发表于 2022/10/27 15:20:30 2022/10/27
【摘要】 4、Flink 的WaterMark机制Watermarks(水位线)机制是event time处理进度的标志表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )基于watermark来进行窗口触发计算的判断 1 什么是乱序当数据是一条一条规规矩矩的按照流程发送,经过MQ传输,Flink接受后处理,这个时候,就是有序的处理。当出现异常,有些数据延迟了,排在后面的...

4、Flink 的WaterMark机制

Watermarks(水位线)机制

是event time处理进度的标志

表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )

基于watermark来进行窗口触发计算的判断

1 什么是乱序

  • 当数据是一条一条规规矩矩的按照流程发送,经过MQ传输,Flink接受后处理,这个时候,就是有序的处理。

    image-20201105113906832

  • 当出现异常,有些数据延迟了,排在后面的数据出现在前面了,这就出现了乱序。

    image-20201105114136227

  • 思考:我们应该以哪个时间类型来判定乱序呢?

2 WaterMark的概念

  • WaterMark(水位线)主要用来处理乱序事件,而正确地处理乱序事件,通常用WaterMark机制结合窗口来实现。
  • 从流处理原始设备产生事件,到Flink程序读取数据,再到Flink多个算子处理数据,在这个过程中由于网络或者系统等外部因素影响下,导致数据是乱序的,为了保证计算结果的正确性,需要等待数据,这就带来了计算的延迟。
  • 对于延迟太久的数据,不能无限期的等下去,所以必须有一个机制,来保证特定的时间后一定会触发窗口进行计算,这个触发机制就是WaterMark。

3 Watermark的原理

  • 在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。

    image-20201105120912885

    • 乱序会导致各种统计结果有问题。比如一个Time Window本应该计算1、2、3,结果3迟到了,那么这个窗口统计就丢失数据了,结果就不准确了。
  • 这种情况下就需要用到水位线(WaterMark)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。

  • 当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。

  • 那么 Flink 是怎么计算 WaterMark 的值呢?

    • ==Watermark = 进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t)==
  • ==那么有 Watermark 的 Window 是怎么触发窗口函数的呢?==

(1) watermark >= window的结束时间
(2) 该窗口必须有数据 注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间

4 生活场景理解WaterMark

  • 经常户外徒步的同学应该知道徒步小队通常会有一个正两副领队,队首队尾各一名副队,队伍前面的由一名副领队开路,队伍后面由一名副领队收队,正队长在队伍中穿插协调。

    image-20201105135625952

  • 队尾的领队叫后队领队,它的职责是要保证所有队员都在前面,也就是说后领队是整个队伍的队尾。当收队的时候,看见队尾的领队,那就说明整个队伍都已经完全到达了。

  • 这个WaterMark就相当于给整个数据流设置一个后领队。但是窗口不知道具体要来多少数据,所有只能设置一个时间上的限制,以此来推测当前窗口最后一条数据是否已经到达。假设窗口大小为10秒,watermark为进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t)

  • 接下来它会进行以下处理

    • 每来一条数据,取当前窗口内所有数据最大的事件发生时间
    • 用最大的事件发生时间扣减指定乱序时间
    • 看看是否符合触发窗口关闭计算的条件
    • 如果不符合,则继续进数据
    • 如果符合,则关闭窗口开始计算
  • 你看,多像户外徒步

  • 每来一个人,就问问他出发的时是多少号,然后确认所有已到队员的最大号码

  • 用最大的号码对比一下后领队的号码

  • 如果比后领队的号码小,就不收队,如果号码大于等于后领队的号码,就收队!!!

5 Watermark 使用的三种情况

  • (1)顺序数据流中的watermark

    • 在某些情况下,基于Event Time的数据流是有序的(相对event time)。在有序流中,watermark就是一个简单的周期性标记。

      	如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置乱序时间了,那么 t 就是 0。
        	所以 watermark= maxEventtime-0 = maxEventtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。
      

  • (2)乱序数据流中的watermark

    • 现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。

        	比如下图,假设窗口大小为1小时,延迟时间设为10分钟。明显,数据09:38已经迟到,但它依然会被正确计算,只有当有数据时间大于10:10的数据到达之后(即对应的watermark大于等于10:10-10min) 09:00~10:00的窗口才会执行计算。
      

  • (3)并行数据流中的 Watermark

    • 对应并行度大于1的source task,它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子,并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时,就会立马触发窗口操作。

      在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。

    image-20201117161802773

6、使用EventTime来作为基准时间处理本地json数据


import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSQLEvent {
    public static void main(String[] args) {
        //1.创建TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //2.创建source table,这种方式会自动注册表
        tEnv.executeSql("CREATE TABLE userbase ("+
                " id Integer,"+
                " name STRING,"+
                " email STRING,"+
                " date_time TIMESTAMP(3),"+
                " WATERMARK FOR date_time AS date_time - INTERVAL '10' SECOND"+
                ") WITH ("+
                " 'connector' = 'filesystem',"+
                " 'path' = 'input/userbase.json',"+
                " 'format' = 'json'"+
                ")");
        //3.Flink SQL 查询
        Table resultTable = tEnv.sqlQuery("select * from userbase");

        resultTable.printSchema();

        //4.执行Flink SQL
        resultTable.execute().print();
    }
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。