2021年大数据Flink(二十四):Allowed Lateness案例演示
【摘要】
Allowed Lateness案例演示
需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
并使用OutputTag+allowedLateness解决数据丢失问题
...
Allowed Lateness案例演示
需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
并使用OutputTag+allowedLateness解决数据丢失问题
API
-
package cn.itcast.watermaker;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-
import org.apache.flink.api.common.typeinfo.TypeInformation;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-
import org.apache.flink.streaming.api.windowing.time.Time;
-
import org.apache.flink.util.OutputTag;
-
-
import java.time.Duration;
-
import java.util.Random;
-
import java.util.UUID;
-
-
/**
-
* Author itcast
-
* Desc
-
* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
-
* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
-
*/
-
public class WatermakerDemo03_AllowedLateness {
-
public static void main(String[] args) throws Exception {
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
//模拟实时订单数据(数据有延迟和乱序)
-
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
-
private boolean flag = true;
-
@Override
-
public void run(SourceContext<Order> ctx) throws Exception {
-
Random random = new Random();
-
while (flag) {
-
String orderId = UUID.randomUUID().toString();
-
int userId = random.nextInt(3);
-
int money = random.nextInt(100);
-
//模拟数据延迟和乱序!
-
long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
-
ctx.collect(new Order(orderId, userId, money, eventTime));
-
-
//TimeUnit.SECONDS.sleep(1);
-
}
-
}
-
@Override
-
public void cancel() {
-
flag = false;
-
}
-
});
-
-
-
//3.Transformation
-
DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
-
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
-
);
-
-
//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
-
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));
-
-
SingleOutputStreamOperator<Order> result = watermakerDS
-
.keyBy(Order::getUserId)
-
//.timeWindow(Time.seconds(5), Time.seconds(5))
-
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
.allowedLateness(Time.seconds(5))
-
.sideOutputLateData(outputTag)
-
.sum("money");
-
-
DataStream<Order> result2 = result.getSideOutput(outputTag);
-
-
//4.Sink
-
result.print("正常的数据和迟到不严重的数据");
-
result2.print("迟到严重的数据");
-
-
//5.execute
-
env.execute();
-
}
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class Order {
-
private String orderId;
-
private Integer userId;
-
private Integer money;
-
private Long eventTime;
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116279739
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)