Flink高级API(三)
【摘要】 day03_Flink高级API 今日目标Flink的四大基石Flink窗口Window操作Flink时间TimeFlink水印Watermark机制Flink的state状态管理-keyed state 和 operator state Flink的四大基石Checkpoint 分布式一致性,解决数据丢失,故障恢复数据State 状态,分为Keyed State ,Operator St...
day03_Flink高级API
今日目标
- Flink的四大基石
- Flink窗口Window操作
- Flink时间Time
- Flink水印Watermark机制
- Flink的state状态管理-keyed state 和 operator state
Flink的四大基石
- Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
- State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
- Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
- Window窗口,TimeWindow 、 countwindow、 sessionwindow
Window操作
Window分类
- time
- 用的比较多 滚动窗口和滑动窗口
- count
如何使用
案例
- 需求
/**
* Author itcast
* Date 2021/5/7 9:13
* 有如下数据表示:
* 信号灯编号和通过该信号灯的车的数量
* 9,3
* 9,2
* 9,7
* 4,9
* 2,6
* 1,5
* 2,3
* 5,7
* 5,4
* 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
* 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
*/
-
分析
-
代码
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */ public class WindowDemo { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) .sum("count"); //统计 滑动窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .sum("count"); //4.打印输出 result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
-
需求2 - countwindow
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口
-
代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 */ public class WindowDemo02 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动计数窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .countWindow(5) .sum("count"); //统计 滑动计数窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .countWindow(10,5) .sum("count"); //4.打印输出 //result.printToErr(); result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
-
统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。
-
案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
-
代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 */ public class WindowDemo03 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 会话窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .sum("count"); //4.打印输出 //result.printToErr(); result.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
Time 时间
- EventTime的重要性
- 防止出现网络抖动,造成数据的乱序,数据统计的丢失
- 窗口: 开始时间-结束时间
watermark 水印时间
-
watermark 水印机制
- watermark 就是时间戳
- watermark = eventTime - maxDelayTime
-
触发计算 watermak >= 结束时间
watermark 案例
-
需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
基础版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); // 添加水印机制 最大允许延迟的时间为 3 秒 //orderDS.printToErr(); //分配水印机制 SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy //指定最大的延迟时间 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp); .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())) //统计每个用户对应 购买 金额 .keyBy(t -> t.getUserId()) //指定窗口,每5秒钟统计5秒钟之内的数据 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); sum.print(); // env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
扩展版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo02 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); DataStream<Order> WatermarkDS = orderDS .assignTimestampsAndWatermarks( new WatermarkStrategy<Order>() { @Override public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Order>() { FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss"); private int userId = 0; private long eventTime = 0L; private final long outOfOrdernessMillis = 3000; private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; @Override public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) { userId = event.userId; eventTime = event.eventTime; maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { //Watermark = 当前最大事件时间 - 最大允许的延迟时间或乱序时间 时间戳 Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1); System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp())); output.emitWatermark(watermark); } }; } }.withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 /* DataStream<Order> result = WatermarkDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money");*/ //开发中使用上面的代码进行业务计算即可 //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermark时间 DataStream<String> result = WatermarkDS .keyBy(Order::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) //把apply中的函数应用在窗口中的数据上 //WindowFunction<IN, OUT, KEY, W extends Window> .apply(new WindowFunction<Order, String, Integer, TimeWindow>() { FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss"); @Override public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception { //准备一个集合用来存放属于该窗口的数据的事件时间 List<String> eventTimeList = new ArrayList<>(); for (Order order : input) { Long eventTime = order.eventTime; eventTimeList.add(df.format(eventTime)); } String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s", key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList); out.collect(outStr); } }); // 添加水印机制 最大允许延迟的时间为 3 秒 //orderDS.printToErr(); result.printToErr(); env.execute(); //分配水印机制 } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
Allowed lateness
-
案例
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
package cn.itcast.sz22.day03;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author itcast
* Date 2021/5/7 14:51
* 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
*/
public class WatermarkDemo03 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//模拟实时订单数据(数据有延迟和乱序)
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String orderId = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(100);
//模拟数据延迟和乱序!
long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
ctx.collect(new Order(orderId, userId, money, eventTime));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
flag = false;
}
});
OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
//分配水印机制 eventTime 默认使用 maxDelay 3秒
SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
.keyBy(t -> t.getUserId())
//窗口设置 每隔5s,计算5秒内
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//实例化侧输出流 主要用于晚于最大延迟 3 秒的数据
.allowedLateness(Time.seconds(3))
.sideOutputLateData(oot)
//统计
.sum("money");
result.print("正常数据");
result.getSideOutput(oot).print("严重迟到的数据");
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)