Flink高级API(四)

举报
Maynor学长 发表于 2022/07/22 22:30:33 2022/07/22
【摘要】 day04_Flink高级API 今日目标Flink的四大基石Flink窗口Window操作Flink时间 - TimeFlink水印 - Watermark机制Flink的state状态管理-keyed state 和 operator state Flink的四大基石Checkpoint - 检查点, 分布式一致性,解决数据丢失,故障恢复数据, 存储的是全局的状态, 持久化HDFS分布...

day04_Flink高级API

今日目标

  • Flink的四大基石
  • Flink窗口Window操作
  • Flink时间 - Time
  • Flink水印 - Watermark机制
  • Flink的state状态管理-keyed state 和 operator state

Flink的四大基石

  • Checkpoint - 检查点, 分布式一致性,解决数据丢失,故障恢复数据, 存储的是全局的状态, 持久化HDFS分布式文件系统中
  • State - 状态,分为Managed state(托管状态) 和 Rawed state (原始状态); 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
  • Time - 时间 , EventTime事件时间、Ingestion摄取时间、Process处理时间
  • Window - 窗口,时间窗口 和 计数窗口, TimeWindow 、 countwindow、 sessionwindow

Window操作

  • 为什么需要 Window - 窗口

    数据是动态的, 无界的, 需要窗口划定范围,将无界数据转换成有界、静态的数据进行计算。

Window分类

  • time - 时间进行分类

    • 时间的窗口级别, 一天,一小时,一分钟

    • 用的比较多 滚动窗口 - tumbling window 和 滑动窗口 - sliding window

    • 滚动窗口 ,窗口时间和滑动时间一样就是滚动时间

    • 滑动窗口, 滑动的时间小于窗口的时间;

    • 会话窗口 - session windows

  • count - 计数进行分类

    • 滚动计数窗口
    • 滑动计数窗口

如何使用

image-20210507090957187

windows的案例

时间窗口需求

  • 每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口
  • 每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Date 2021/6/18 15:00
 * 开发步骤
 * 1. 将 字符串 9,3 转换成 CartInfo
 * 2. 使用 滚动窗口, 滑动窗口
 * 3. 分组和聚合
 * 4. 打印输出
 * 5. 执行环境
 */
public class WindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<CartInfo> mapDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        //4.按照 sensorId 分组并划分滚动窗口为5秒,在窗口上求和
        // Tumbling(滚动)Processing(处理)TimeWindows(时间窗口)
        //需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量
        SingleOutputStreamOperator<CartInfo> result1 = mapDS.keyBy(t -> t.sensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        //需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量
        SingleOutputStreamOperator<CartInfo> result2 = mapDS.keyBy(t -> t.sensorId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .sum("count");
        //5.打印输出
        //result1.print();
        result2.print();
        //6.execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

计数窗口需求

  • 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口

  • 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口

package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/18 15:46
 * Desc TODO
 */
public class CountWindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
            @Override
            public WindowDemo01.CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //        //countWindow(long size, long slide)
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");

        //打印输出
        //result1.print();
        result2.print();
        //执行环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/18 15:46
 * Desc TODO
 */
public class CountWindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
            @Override
            public WindowDemo01.CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //        //countWindow(long size, long slide)
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");

        //打印输出
        //result1.print();
        result2.print();
        //执行环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

Flink - Time 和 watermark

Time - 时间

image-20210618160007265

水印机制 - watermark

  • 主要解决数据延迟问题

  • 水印(时间戳) = 事件时间 - 允许最大的延时时间

  • 窗口触发条件

    水印时间 >= 窗口的结束时间 触发计算

image-20210618161943842

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s, 计算5秒内,每个用户的订单总金额

并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时 3 秒)问题。

package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Date 2021/6/18 16:54
 * Desc TODO
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                            System.currentTimeMillis() - rm.nextInt(5) * 1000
                    ));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        DataStream<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
        )
                //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    //创建订单类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
  • 自定义重写接口实现水印机制
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Date 2021/6/18 16:54
 * Desc TODO
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                            System.currentTimeMillis() - rm.nextInt(5) * 1000
                    ));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        DataStream<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
        )
                //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    //创建订单类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
  • 一秒钟生成一条订单数据, 根据用户进行统计总金额,每5秒钟计算5秒中窗口, 允许最大的延时时间是 3秒; 如果超过了 3秒时间,再来的数据存储到 outputTag ,打印输出正常的数据和严重迟到的数据

    package cn.itcast.flink.basestone;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Author itcast
     * Date 2021/6/18 17:47
     * Desc TODO
     */
    public class LatenessWatermarkDemo {
        public static void main(String[] args) throws Exception {
            //此时间的将严重延迟的数据保存到 outputTag 中。
            /// 创建 outputTag 用于存储严重延迟的数据 数据类型为 Order
    
            //4.Sink 打印正常和严重延迟的数据
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
            //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
            DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
                boolean flag = true;
                Random rm = new Random();
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    while (flag) {
                        ctx.collect(new Order(
                                UUID.randomUUID().toString(),
                                rm.nextInt(3),
                                rm.nextInt(101),
                                //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                                System.currentTimeMillis() - rm.nextInt(10) * 1000
                        ));
                        //Thread.sleep(1000);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            //定义一个允许最大的延时存储的数据 tag
            OutputTag<Order> seriousLateOrder = new OutputTag<>("SeriousLateOrder", TypeInformation.of(Order.class));
    
            //3.Transformation
            //-告诉Flink要基于事件时间来计算!
            //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
            SingleOutputStreamOperator<Order> result = source.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                            .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
            )
                    //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                    //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                    //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                    .keyBy(t -> t.userId)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.seconds(3))
                    .sideOutputLateData(seriousLateOrder)
                    .sum("money");
            //4.Sink
            result.print("正常数据和不严重迟到的数据");
            result.getSideOutput(seriousLateOrder).print();
            //5.execute
            env.execute();
        }
        //创建订单类
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order{
            private String orderId;
            private Integer userId;
            private Integer money;
            private Long eventTime;
        }
    }
    
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。