2021年大数据Flink(二十三):Watermaker案例演示
【摘要】
目录
Watermaker案例演示
需求
API
代码实现-1-开发版-掌握
代码实现-2-验证版-了解
Watermaker案例演示
需求
有订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程...
目录
Watermaker案例演示
需求
有订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark(水印)来解决一定程度上的数据延迟和数据乱序问题。
API
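注意:旧版本中一般直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor;它基于的旧版接口AssignerWithPeriodicWatermarks自Flink 1.11起已标记为过时,新版本推荐使用WatermarkStrategy.forBoundedOutOfOrderness(下面的代码即采用这种方式)。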
代码实现-1-开发版-掌握
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
-
package cn.itcast.watermaker;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-
import org.apache.flink.streaming.api.windowing.time.Time;
-
-
import java.time.Duration;
-
import java.util.Random;
-
import java.util.UUID;
-
import java.util.concurrent.TimeUnit;
-
-
/**
-
* Author itcast
-
* Desc
-
* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
-
* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
-
*/
-
public class WatermakerDemo01_Develop {
-
public static void main(String[] args) throws Exception {
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
//模拟实时订单数据(数据有延迟和乱序)
-
DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
-
private boolean flag = true;
-
-
@Override
-
public void run(SourceContext<Order> ctx) throws Exception {
-
Random random = new Random();
-
while (flag) {
-
String orderId = UUID.randomUUID().toString();
-
int userId = random.nextInt(3);
-
int money = random.nextInt(100);
-
//模拟数据延迟和乱序!
-
long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
-
ctx.collect(new Order(orderId, userId, money, eventTime));
-
-
TimeUnit.SECONDS.sleep(1);
-
}
-
}
-
-
@Override
-
public void cancel() {
-
flag = false;
-
}
-
});
-
-
//3.Transformation
-
//-告诉Flink要基于事件时间来计算!
-
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
-
//-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
-
/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
-
new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间
-
@Override
-
public long extractTimestamp(Order element) {
-
return element.eventTime;
-
//指定事件时间是哪一列,Flink底层会自动计算:
-
//Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
-
}
-
});*/
-
DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
-
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
-
);
-
-
//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
-
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
DataStream<Order> result = watermakerDS
-
.keyBy(Order::getUserId)
-
//.timeWindow(Time.seconds(5), Time.seconds(5))
-
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
.sum("money");
-
-
-
//4.Sink
-
result.print();
-
-
//5.execute
-
env.execute();
-
}
-
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class Order {
-
private String orderId;
-
private Integer userId;
-
private Integer money;
-
private Long eventTime;
-
}
-
}
代码实现-2-验证版-了解
-
-
package cn.itcast.watermaker;
-
-
import lombok.AllArgsConstructor;
-
import lombok.Data;
-
import lombok.NoArgsConstructor;
-
import org.apache.commons.lang3.time.FastDateFormat;
-
import org.apache.flink.api.common.eventtime.*;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-
import org.apache.flink.streaming.api.windowing.time.Time;
-
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
import org.apache.flink.util.Collector;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
import java.util.Random;
-
import java.util.UUID;
-
import java.util.concurrent.TimeUnit;
-
-
/**
-
* Author itcast
-
* Desc
-
* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
-
* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
-
*/
-
public class WatermakerDemo02_Check {
-
public static void main(String[] args) throws Exception {
-
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
-
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
//模拟实时订单数据(数据有延迟和乱序)
-
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
-
private boolean flag = true;
-
-
@Override
-
public void run(SourceContext<Order> ctx) throws Exception {
-
Random random = new Random();
-
while (flag) {
-
String orderId = UUID.randomUUID().toString();
-
int userId = random.nextInt(3);
-
int money = random.nextInt(100);
-
//模拟数据延迟和乱序!
-
long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
-
System.out.println("发送的数据为: "+userId + " : " + df.format(eventTime));
-
ctx.collect(new Order(orderId, userId, money, eventTime));
-
TimeUnit.SECONDS.sleep(1);
-
}
-
}
-
-
@Override
-
public void cancel() {
-
flag = false;
-
}
-
});
-
-
//3.Transformation
-
/*DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
-
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
-
);*/
-
-
//开发中直接使用上面的即可
-
//学习测试时可以自己实现
-
DataStream<Order> watermakerDS = orderDS
-
.assignTimestampsAndWatermarks(
-
new WatermarkStrategy<Order>() {
-
@Override
-
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
-
return new WatermarkGenerator<Order>() {
-
private int userId = 0;
-
private long eventTime = 0L;
-
private final long outOfOrdernessMillis = 3000;
-
private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
-
-
@Override
-
public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
-
userId = event.userId;
-
eventTime = event.eventTime;
-
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
-
}
-
-
@Override
-
public void onPeriodicEmit(WatermarkOutput output) {
-
//Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间
-
Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
-
System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
-
output.emitWatermark(watermark);
-
}
-
};
-
}
-
}.withTimestampAssigner((event, timestamp) -> event.getEventTime())
-
);
-
-
-
//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
-
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
-
/* DataStream<Order> result = watermakerDS
-
.keyBy(Order::getUserId)
-
//.timeWindow(Time.seconds(5), Time.seconds(5))
-
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
.sum("money");*/
-
-
//开发中使用上面的代码进行业务计算即可
-
//学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间
-
DataStream<String> result = watermakerDS
-
.keyBy(Order::getUserId)
-
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
//把apply中的函数应用在窗口中的数据上
-
//WindowFunction<IN, OUT, KEY, W extends Window>
-
.apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
-
@Override
-
public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
-
//准备一个集合用来存放属于该窗口的数据的事件时间
-
List<String> eventTimeList = new ArrayList<>();
-
for (Order order : input) {
-
Long eventTime = order.eventTime;
-
eventTimeList.add(df.format(eventTime));
-
}
-
String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",
-
key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
-
out.collect(outStr);
-
}
-
});
-
//4.Sink
-
result.print();
-
-
//5.execute
-
env.execute();
-
}
-
-
@Data
-
@AllArgsConstructor
-
@NoArgsConstructor
-
public static class Order {
-
private String orderId;
-
private Integer userId;
-
private Integer money;
-
private Long eventTime;
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116279701
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)