实时即未来,大数据项目车联网之FlinkWatermark(水位线)(14)

举报
Maynor学长 发表于 2022/10/31 12:32:21 2022/10/31
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第14天,点击查看活动详情 1 Flink Watermark(水位线) 1 事件时间(event time)为了使用事件时间,Flink任务需要知道事件的时间戳(标记),这意味着流中的每个元素都需要分配其事件时间戳/水位线(watermark)。通常,水位线的分配是通过从元素的某个字段提...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第14天,点击查看活动详情

1 Flink Watermark(水位线)

1 事件时间(event time)

为了使用事件时间,Flink任务需要知道事件的时间戳(标记),这意味着流中的每个元素都需要分配其事件时间戳/水位线(watermark)。通常,水位线的分配是通过从元素的某个字段提取时间戳来完成的。时间戳分配与水位线生成齐头并进,水位线告诉系统事件时间的进展,时间类型分别为:EventTime、IngestionTime、ProcessingTime。

时间类别 描述
EventTime 事件发生的时间,例如:车辆数据中的服务器时间
IngestionTime 某个Flink节点的source operator接收到数据的时间,例如:某个source消费kafka中的数据
ProcessingTime 某个Flink节点执行某个operation的时间,例如:timeWindow接收到数据的时间

img

设置方式

img

水位线(watermark)可以理解为时间戳,是为了解决数据乱序问题,车辆上报的数据(企业收集上传数据),往往会存在数据上报乱序问题

生成水位线(分配时间)的两种方法:

  • 直接在数据流的source中

u 示例(未设置检查点数据源)

将时间戳分配为数据源中的元素,必须实现SourceFuntion中的run方法,使用collectWithTimestamp方法,设置水位线时间。生成水位线,数据源必须调用emitWatermark()函数

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
  while (/* condition */) {
    MyType next = getNext();
    ctx.collectWithTimestamp(next, next.getEventTimestamp());if (next.hasWatermarkTime()) {
      ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
    }
  }}
}
  • 需要在特定事件表明可能会生成新的水位线时生成水位线(AssignerWithPunctuatedWatermarks)

img

  • 当需要在kafka数据源中,生成完美的水位线,则使用递增时间戳水位线生成器(AscendingTimestampExtractor)

img

  • 通过时间戳定期分配时间戳并生成水位线(AssignerWithPeriodicWatermarks ):在flink任务中,需定义要发送的水位线(项目采用此种方式\)

u 水位线生成器通常在数据源之后立即指定,但并非严格要求这么做,常见的情况是在FilterFunction和MapFunction之后指定水位线生成器

u 在数据源经过FilterFunction之后指定:

DataStreamSource<String> dataStreamSource = env.addSource(kafkaConsumer);
dataStreamSource.assignTimestampsAndWatermarks(new MyWatermarks());

u 在窗口应用时指定:

WindowedStream<ItcastDataObj, String, TimeWindow> drivingDstream = itcastJsonDStream
        .assignTimestampsAndWatermarks(new TripDriveWatermark())
        .keyBy(ItcastDataObj :: getVin)
        .window(EventTimeSessionWindows.withGap(Time.minutes(15)));

2 Flink Window Assigners(窗口分配器)

窗口分配器定义了如何将元素分配给窗口。这是通过WindowAssigner 在window()(针对key排序)或windowAll()(针对非key排序)调用中指定选择的选项来完成。窗口分配器将负责将每个传入元素分配给一个或多个窗口,Flink带有针对最常见用例的预定义窗口分配器,即滚动窗口, 滑动窗口,会话窗口和全局窗口。您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)均基于时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。

基于时间的窗口具有端(开始时间戳)到端(结束时间戳),共同描述窗口大小。在驾驶行程中,驾驶行程任务时间影响因为设置为基于事件时间(TimeCharacteristic.EventTime),车辆15分钟划定一个窗口,假设车辆最早的一条数据终端时间为2020年7月1日17点,则第一个窗口的端点由17点到17点15分内的车辆数据终端时间最早的一条数据与最晚的一条数据为起始端点和结束端点\。

2.1 Tumbling Windows(翻滚窗口)

翻滚分配器受让人的每个元素到指定的窗口的窗口大小。滚动窗口具有固定的大小,并且不重叠。例如,如果您指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示:(紫色圆圈表示流的元素,这些元素由某个键(在这种情况下为用户1,用户2和用户3)划分。x轴显示时间进度)

img

2.2 Sliding Windows(滑动窗口)

滑动窗口分配器让消息以固定长度的窗口进行计算。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口启动的频率。因此,窗口与滑动步长存在以下三种关系:

当窗口等于步长:就是翻滚窗口,不会重复消费数据也不会丢失数据。

当窗口小于步长:会造成丢失数据。

当窗口大于步长:会造成重复消费。

例如,您可以设置10分钟的窗口,而滑动步长为5分钟。这样,您每隔5分钟就会得到一个窗口,其中包含最近10分钟内到达的事件,如下图所示:

img

2.3 Session Windows(会话窗口)

会话窗口通过session活动来对元素进行分组。与滚动窗口和滑动窗口相比,没有窗口重叠且没有固定的开始和结束时间。相反,当它在一个固定的时间周期内不再收到元素,即会话窗口就会关闭。

会话窗口通过一个间隔时间(gap)来配置,这个间隔定义了非活跃周期的长度。当该时间段到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。

img

2.4 Global Windows(全局窗口)

全局窗口指的是每次都是对全局的数据进行一次计算。仅当指定自定义触发器(Trigger)时,此窗口方案才有用。否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然终点。

img

2.5 窗口函数(Window Function)

定义窗口分配器后,需要指定要在每个窗口上执行的计算,这是窗口函数的职责,一旦窗口准备好进行数据处理,就可以处理每个(key控制)窗口的元素。

l ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction,前两个可以更有效地执行,因为Flink可以在每个窗口到达时以增量方式聚合它们。ProcessWindowFunction获取Iterable窗口中包含的所有元素的,以及有关元素所属窗口的其他元信息,因此会在内部缓存窗口的所有元素。

l WindowFunction与ProcessWindowFunction对比

n WindowFunction是简化版或者说旧版的ProcessWindowFunction

n ProcessWindowFunction有高级功能,获取窗口键状态等。

  • 在可以使用WindowFunction的场景,一定也可以用ProcessWindowFunction
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。