flink技术之Time和Watermark
时间类型
|
定义(生成时机)
|
核心特点
|
业务适用性
|
EventTime
|
事件 / 数据真实产生时的时间(如日志生成时间、订单创建时间)
|
反映事件本质,与数据本身强绑定,不依赖处理系统
|
需精准统计 “事件实际发生时段” 的场景(如按订单创建时间统计日销售额、按日志生成时间统计故障数),是生产环境首选
|
IngestionTime
|
数据到达 Flink 系统的时间(如进入 Source 算子的时间)
|
介于 EventTime 与 ProcessingTime 之间,无需用户指定时间戳
|
对时间精度要求不高、无需处理数据乱序的简单场景(如临时数据监控、非核心指标统计),实际应用较少
|
ProcessingTime
|
数据被 Flink 处理时的系统时间(如进入 Window 算子的时间)
|
依赖处理节点的系统时钟,计算简单但易受延迟影响
|
对时间准确性无要求、追求高吞吐的场景(如实时数据抽样、非时序相关的计算),不适用于核心业务统计
|
- 外卖支付场景:支付事件实际发生于 11:59(EventTime),因网络延迟 12:01 才被处理(ProcessingTime)。若统计 12 点前订单金额,需以 EventTime 为准才能纳入统计,符合业务本质。
2.故障日志统计:日志生成于 23:59:58(EventTime),23:59:59 进入 Flink(IngestionTime),00:00:01 到达 Window(ProcessingTime)。统计当日故障数时,只有 EventTime 能准确归属到 “当天”,避免跨天统计误差。
3.用户抢购场景:A 用户 11:01 操作(EventTime)但日志延迟,B 用户 11:02 操作(EventTime)日志先到达。按业务逻辑 A 应优先抢购成功,需基于 EventTime 排序;若按 ProcessingTime 则会误判 B 成功,体现 EventTime 对业务正确性的关键作用。
- 窗口提前触发,延迟数据无法被统计,造成结果缺失;
- 若等待所有延迟数据,会导致窗口无限期阻塞,影响实时性。
- 定义:Watermark 是为数据额外添加的 “时间戳标记”,本质是 “当前流中最大 EventTime 减去允许的最大延迟时间”,公式为:
- 窗口触发规则:仅当同时满足以下两个条件时,EventTime 窗口才会触发计算:
- 窗口内有数据(避免空窗口计算);
2.Watermark >= 窗口的结束时间(确保 “允许延迟时间内的数据已基本到达”)。
- 当流中出现 EventTime=11:00:04 的数据时,当前最大 EventTime=11:00:04,Watermark=11:00:01(11:00:04-3 秒),此时 Watermark < 窗口结束时间 11:00:05,窗口不触发;
- 当流中出现 EventTime=11:00:08 的数据时,当前最大 EventTime=11:00:08,Watermark=11:00:05(11:00:08-3 秒),此时 Watermark >= 窗口结束时间 11:00:05,窗口触发计算;
- 若后续有 EventTime=11:00:03 的延迟数据(在允许延迟 3 秒内,即 Watermark=11:00:05 前到达),仍会被纳入[11:00:00, 11:00:05)窗口;若数据延迟超过 3 秒(如 EventTime=11:00:01 但在 Watermark=11:00:05 后到达),则会被窗口丢弃(可通过 “侧输出流” 收集避免丢失)。
Time的分类
EventTime:事件时间,是事件/数据真真正正发生时/产生时的时间
IngestionTime:摄入时间,是事件/数据到达流处理系统的时间
ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间
EventTime的重要性
假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分
问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准
一条错误日志的内容为:
2020-11:11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11:11 23:59:59 --摄入时间
到达Window的时间为2020-11:12 00:00:01 --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质!
某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
A用户在 11:01:00 对 App 进行操作,B用户在 11:02:00 操作了 App,
但是A用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B用户的消息,然后再接受到A用户的消息,消息乱序了。
问题:
如果这个是一个根据用户操作先后顺序,进行抢购的业务,那么是A用户成功还是B用户成功?
答案:
应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,可能直接按B成功算
也就是说,实际开发中希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序,按照事件时间处理起来有难度!
在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。我们先来设想一下下面这个场景:
原本应该被该窗口计算的数据因为网络延迟等原因晚到了,就有可能丢失了
小结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决!
Watermark是什么?
Watermark就是给数据额外添加的一列时间戳!
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
Watermark能解决什么问题,如何解决的?
有了Watermark 就可以在一定程度上
解决数据乱序或延迟达到问题!
有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:
1.窗口有数据
2.Watermark >= 窗口的结束时间
满足以上条件则触发窗口计算!
以前窗口触发:系统时间到了窗口结束时间就触发
现在窗口触发:Watermark >= 窗口的结束时间
而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的
需要记住:
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
窗口触发时机 : Watermark >= 窗口的结束时间
Watermark图解原理
总结:
1.Watermark 是一个单独计算出来的时间戳
2.Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
3.Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
4.Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
5.延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!
Watermark代码演示
需求
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
代码演示-开发版
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/event-time/generating_watermarks/
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
*
* Desc 演示Flink-EventTime+Watermark在一定程度上解决数据乱序/延迟到达问题
* 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
*/
public class WatermarkDemo01 {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source-加载数据
DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random ran = new Random();
while (flag){
String id = UUID.randomUUID().toString();
int uid = ran.nextInt(3);
int money = ran.nextInt(101);
//模拟数据延迟
long createTime = System.currentTimeMillis() - (1000 * ran.nextInt(3));//事件时间
ctx.collect(new Order(id,uid,money,createTime));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}).setParallelism(1);//方便观察
//TODO 3.transformation-数据转换处理
//====下面的需要掌握======
//https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
//-1.告诉Flink使用EventTime来进行窗口计算
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//过期API,新版本中不需要设置,默认就是
//-2.告诉Flink最大允许的延迟时间/乱序时间为多少
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
/*public interface TimestampAssigner<T> {
long extractTimestamp(T element, long recordTimestamp);
}*/
//-3.告诉Flink哪一列是事件时间
.withTimestampAssigner((order, time) -> order.getCreateTime())
);
//====上面的需要掌握======
//每隔5s计算每隔用户最近5s的订单总金额
KeyedStream<Order, Integer> keyedDS = orderDSWithWatermark.keyBy(order -> order.getUserId());
SingleOutputStreamOperator<Order> resultDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");
//TODO 4.sink-数据输出
resultDS.print();
//TODO 5.execute-执行
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
private String id;
private Integer userId;
private Integer money;
private Long createTime;
}
}
关键注意事项与进阶优化
- 最大延迟时间选择:需结合业务场景评估(如日志延迟通常 10 秒内,支付数据延迟可能达 1 分钟),过短会导致数据丢失,过长会影响窗口触发实时性;
- 并行度影响:多并行度下,每个并行子任务独立计算 Watermark,Flink 会取所有子任务中最小的 Watermark 作为全局 Watermark,需注意数据分区均衡性,避免个别子任务拖慢全局窗口触发。
- 侧输出流(Side Output):对超过最大延迟时间的重要数据,通过侧输出流单独收集,后续可离线补算或告警,避免核心数据丢失;
- 窗口 allowedLateness:在窗口定义时额外设置允许延迟时间(如window(...).allowedLateness(Time.seconds(2))),与 Watermark 配合形成 “双重延迟保障”,但需注意窗口状态会持续保留,可能增加内存开销。
- Flink 1.13 + 版本中,TimeCharacteristic.EventTime已成为默认值,无需显式调用env.setStreamTimeCharacteristic();低版本需手动设置,避免默认使用 ProcessingTime 导致业务错误。
核心总结
1.时间分类优先级:EventTime > IngestionTime > ProcessingTime,生产环境优先使用 EventTime,确保业务统计符合事件本质;
2.Watermark 核心价值:通过 “动态调整窗口触发时机”,在实时性与数据完整性间平衡,解决部分乱序和延迟问题,公式Watermark = 当前最大EventTime - 最大允许延迟时间需牢记;
3.代码落地关键:掌握WatermarkStrategy配置、EventTime 字段提取、窗口定义三步法,结合业务场景优化最大延迟时间与延迟数据处理方案,确保结果准确且实时。
- 点赞
- 收藏
- 关注作者
评论(0)