Flink高级特性和新特性(八)v2

举报
Maynor学长 发表于 2022/07/24 11:51:37 2022/07/24
【摘要】 双流 JOIN: 多个数据流 DataStream 之间进行 JOIN 操作。双流 JOIN 分为两大类: Window 窗口的 join 和 Interval 的 join。Window 窗口分为 tumbling 窗口, sliding 窗口, session 窗口; Interval 包括下界和上界。需求: 订单明细表和商品表每 5 秒钟进行一个窗口 JOIN, 将结果落地并打印输出。开发步骤: pack...

双流 JOIN

  • 多个数据流 DataStream 之间进行 JOIN 操作

  • 双流 JOIN 分为两大类: Window 窗口的join, Interval 的 join

  • Window窗口 分为 tumbling 窗口, sliding 窗口, session 窗口

  • Interval 包括下界 (lower bound) 和上界 (upper bound)

    image-20210624093501057

  • 需求

    订单明细表和商品表每 5 秒钟进行一个窗口 JOIN, 将结果落地并打印输出

    • 开发步骤

      package cn.itcast.flink.broadcast;
      
      import com.alibaba.fastjson.JSON;
      import lombok.Data;
      import org.apache.flink.api.common.eventtime.*;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      
      import java.math.BigDecimal;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Random;
      import java.util.UUID;
      import java.util.concurrent.TimeUnit;
      
      /**
       * Window-join demo: joins a stream of order items with a stream of goods on
       * {@code goodsId} inside 5-second tumbling event-time windows and prints the
       * joined records.
       *
       * <p>Both streams come from synthetic sources defined in this class that emit
       * once per second. Timestamps and watermarks are taken from the system clock
       * to keep the example short.
       *
       * Author itcast
       * Date 2021/6/24 9:40
       */
      public class JoinDemo {
          /**
           * Builds the join pipeline and runs it until the job is cancelled.
           *
           * @param args unused
           * @throws Exception if the job fails
           */
          public static void main(String[] args) throws Exception {
              // Create the streaming execution environment.
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // Goods stream.
              SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());
              // Order-item stream.
              SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());
              // Join order items with goods on orderItem.goodsId == goods.goodsId.
              DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)
                      .where(o -> o.goodsId)
                      .equalTo(g -> g.goodsId)
                      // 5-second tumbling event-time window.
                      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                      // Map each matching (OrderItem, Goods) pair to a FactOrderItem.
                      .apply((OrderItem first, Goods second) -> {
                          FactOrderItem factOrderItem = new FactOrderItem();
                          factOrderItem.setGoodsId(first.goodsId);
                          factOrderItem.setGoodsName(second.goodsName);
                          factOrderItem.setCount(new BigDecimal(first.count));
                          // totalMoney = quantity * unit price.
                          factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));
                          return factOrderItem;
                      });
              // Print the joined records.
              result.print();
              // Submit the job.
              env.execute();
          }

          /** Goods entity; also holds the fixed catalogue used by both demo sources. */
          @Data
          public static class Goods {
              private String goodsId;
              private String goodsName;
              private BigDecimal goodsPrice;

              /** Fixed catalogue of six goods, filled in the static initializer. */
              public static List<Goods> GOODS_LIST;
              /** Random generator used by {@link #randomGoods()}. */
              public static Random r;

              static  {
                  r = new Random();
                  GOODS_LIST = new ArrayList<>();
                  GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
                  GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
                  GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
                  GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
                  GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
                  GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
              }

              /**
               * Picks one entry of {@link #GOODS_LIST} at a random index.
               *
               * @return the selected catalogue entry (the shared instance, not a copy)
               */
              public static Goods randomGoods() {
                  int rIndex = r.nextInt(GOODS_LIST.size());
                  return GOODS_LIST.get(rIndex);
              }

              /** No-argument constructor. */
              public Goods() {
              }

              /**
               * Creates a goods record.
               *
               * @param goodsId    goods identifier (the join key)
               * @param goodsName  display name
               * @param goodsPrice unit price
               */
              public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
                  this.goodsId = goodsId;
                  this.goodsName = goodsName;
                  this.goodsPrice = goodsPrice;
              }

              /** @return this object serialized by fastjson */
              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          /** Order-item entity. */
          @Data
          public static class OrderItem {
              private String itemId;
              private String goodsId;
              private Integer count;

              /** @return this object serialized by fastjson */
              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          /** Join result: one order item enriched with goods name and total amount. */
          @Data
          public static class FactOrderItem {
              private String goodsId;
              private String goodsName;
              private BigDecimal count;
              private BigDecimal totalMoney;

              /** @return this object serialized by fastjson */
              @Override
              public String toString() {
                  return JSON.toJSONString(this);
              }
          }

          /**
           * Goods source (plays the role of a dimension table): emits the whole
           * catalogue once per second until cancelled.
           */
          public static class GoodsSource extends RichSourceFunction<Goods> {
              // cancel() is invoked from a different thread than run() (see Flink's
              // SourceFunction contract), so the flag must be volatile for the loop to
              // observe it. It starts as false and is never reset, so a cancel request
              // cannot be lost.
              private volatile boolean isCancel;

              /**
               * Emits every catalogue entry, sleeps one second, and repeats.
               *
               * @param sourceContext collector for the emitted goods
               * @throws Exception if the sleep is interrupted
               */
              @Override
              public void run(SourceContext<Goods> sourceContext) throws Exception {
                  while (!isCancel) {
                      for (Goods goods : Goods.GOODS_LIST) {
                          sourceContext.collect(goods);
                      }
                      TimeUnit.SECONDS.sleep(1);
                  }
              }

              /** Requests the emit loop in {@link #run} to stop. */
              @Override
              public void cancel() {
                  isCancel = true;
              }
          }

          /**
           * Order-item source: once per second emits one order item for a random
           * catalogue entry, plus one item with the goods id "111", which is not in
           * the catalogue and therefore never finds a join partner.
           */
          public static class OrderItemSource extends RichSourceFunction<OrderItem>
          {
              // See GoodsSource: written by cancel(), polled by run().
              private volatile boolean isCancel;
              private Random r;

              /** Creates the random generator used for order quantities. */
              @Override
              public void open(Configuration parameters) throws Exception {
                  r = new Random();
              }

              /**
               * Emits a matching and a non-matching order item, sleeps one second, and repeats.
               *
               * @param sourceContext collector for the emitted order items
               * @throws Exception if the sleep is interrupted
               */
              @Override
              public void run(SourceContext<OrderItem> sourceContext) throws Exception {
                  while (!isCancel) {
                      Goods goods = Goods.randomGoods();
                      int count = r.nextInt(10) + 1;
                      String itemId = UUID.randomUUID().toString();
                      sourceContext.collect(newOrderItem(itemId, goods.getGoodsId(), count));
                      // Emit a separate instance rather than mutating the record that
                      // was just collected; downstream may still hold a reference to it.
                      sourceContext.collect(newOrderItem(itemId, "111", count));
                      TimeUnit.SECONDS.sleep(1);
                  }
              }

              /** Builds a fresh order item from the given field values. */
              private static OrderItem newOrderItem(String itemId, String goodsId, int count) {
                  OrderItem orderItem = new OrderItem();
                  orderItem.setGoodsId(goodsId);
                  orderItem.setCount(count);
                  orderItem.setItemId(itemId);
                  return orderItem;
              }

              /** Requests the emit loop in {@link #run} to stop. */
              @Override
              public void cancel() {
                  isCancel = true;
              }
          }

          /**
           * Watermark strategy for goods. For simplicity both the event timestamp and
           * every watermark are the current system time.
           *
           * NOTE(review): the watermark is the wall clock rather than a value derived
           * from event timestamps, so a record that is delayed before reaching the
           * join could already be behind the watermark - presumably acceptable for a
           * demo; confirm before reusing.
           */
          public static class GoodsWatermark implements WatermarkStrategy<Goods> {

              /** @return an assigner that stamps every element with the current system time */
              @Override
              public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                  return (element, recordTimestamp) -> System.currentTimeMillis();
              }

              /** @return a generator that emits the current system time on every event and on every periodic call */
              @Override
              public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                  return new WatermarkGenerator<Goods>() {
                      @Override
                      public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }

                      @Override
                      public void onPeriodicEmit(WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                  };
              }
          }

          /**
           * Watermark strategy for order items; same wall-clock approach as
           * {@link GoodsWatermark}.
           */
          public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
              /** @return an assigner that stamps every element with the current system time */
              @Override
              public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                  return (element, recordTimestamp) -> System.currentTimeMillis();
              }

              /** @return a generator that emits the current system time on every event and on every periodic call */
              @Override
              public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                  return new WatermarkGenerator<OrderItem>() {
                      @Override
                      public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                      @Override
                      public void onPeriodicEmit(WatermarkOutput output) {
                          output.emitWatermark(new Watermark(System.currentTimeMillis()));
                      }
                  };
              }
          }
      }
      
  • 需求

    使用 Interval JOIN 将商品数据和订单明细数据进行关联: 时间间隔的上界为 0 秒(不包含), 下界为 -1 秒(包含), 即区间 [-1s, 0s), 统计数据并落地

  • 开发步骤

    package cn.itcast.flink.broadcast;
    
    import com.alibaba.fastjson.JSON;
    import lombok.Data;
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    import java.math.BigDecimal;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Interval-join demo: joins each order item with the goods records that share
     * its {@code goodsId} and whose timestamp lies within one second before the
     * order item's timestamp, then prints the joined records.
     *
     * <p>Both streams come from synthetic sources defined in this class that emit
     * once per second. Timestamps and watermarks are taken from the system clock
     * to keep the example short.
     *
     * Author itcast
     */
    public class JoinDemo02 {
        /**
         * Builds the interval-join pipeline and runs it until the job is cancelled.
         *
         * @param args unused
         * @throws Exception if the job fails
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Goods stream.
            DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());
            // Order-item stream.
            DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());

            // Interval join of order items (left) with goods (right), keyed by goodsId.
            SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                    .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                    .between(Time.seconds(-1), Time.seconds(0))
                    // Make the upper bound exclusive, i.e. the interval is [-1s, 0s).
                    .upperBoundExclusive()
                    .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                        @Override
                        public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                            FactOrderItem factOrderItem = new FactOrderItem();
                            factOrderItem.setGoodsId(right.getGoodsId());
                            factOrderItem.setGoodsName(right.getGoodsName());
                            factOrderItem.setCount(new BigDecimal(left.getCount()));
                            // totalMoney = unit price * quantity.
                            factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));

                            out.collect(factOrderItem);
                        }
                    });

            factOrderItemDS.print();

            env.execute("Interval JOIN");
        }

        /** Goods entity; also holds the fixed catalogue used by both demo sources. */
        @Data
        public static class Goods {
            private String goodsId;
            private String goodsName;
            private BigDecimal goodsPrice;

            /** Fixed catalogue of six goods, filled in the static initializer. */
            public static List<Goods> GOODS_LIST;
            /** Random generator used by {@link #randomGoods()}. */
            public static Random r;

            static  {
                r = new Random();
                GOODS_LIST = new ArrayList<>();
                GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
                GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
                GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
                GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
                GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
                GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
            }

            /**
             * Picks one entry of {@link #GOODS_LIST} at a random index.
             *
             * @return the selected catalogue entry (the shared instance, not a copy)
             */
            public static Goods randomGoods() {
                int rIndex = r.nextInt(GOODS_LIST.size());
                return GOODS_LIST.get(rIndex);
            }

            /** No-argument constructor. */
            public Goods() {
            }

            /**
             * Creates a goods record.
             *
             * @param goodsId    goods identifier (the join key)
             * @param goodsName  display name
             * @param goodsPrice unit price
             */
            public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
                this.goodsId = goodsId;
                this.goodsName = goodsName;
                this.goodsPrice = goodsPrice;
            }

            /** @return this object serialized by fastjson */
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        /** Order-item entity. */
        @Data
        public static class OrderItem {
            private String itemId;
            private String goodsId;
            private Integer count;

            /** @return this object serialized by fastjson */
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        /** Join result: one order item enriched with goods name and total amount. */
        @Data
        public static class FactOrderItem {
            private String goodsId;
            private String goodsName;
            private BigDecimal count;
            private BigDecimal totalMoney;

            /** @return this object serialized by fastjson */
            @Override
            public String toString() {
                return JSON.toJSONString(this);
            }
        }

        /**
         * Goods source (plays the role of a dimension table): emits the whole
         * catalogue once per second until cancelled.
         */
        public static class GoodsSource11 extends RichSourceFunction<Goods> {
            // cancel() is invoked from a different thread than run() (see Flink's
            // SourceFunction contract), so the flag must be volatile for the loop to
            // observe it. It starts as false and is never reset, so a cancel request
            // cannot be lost.
            private volatile boolean isCancel;

            /**
             * Emits every catalogue entry, sleeps one second, and repeats.
             *
             * @param sourceContext collector for the emitted goods
             * @throws Exception if the sleep is interrupted
             */
            @Override
            public void run(SourceContext<Goods> sourceContext) throws Exception {
                while (!isCancel) {
                    for (Goods goods : Goods.GOODS_LIST) {
                        sourceContext.collect(goods);
                    }
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            /** Requests the emit loop in {@link #run} to stop. */
            @Override
            public void cancel() {
                isCancel = true;
            }
        }

        /**
         * Order-item source: once per second emits one order item for a random
         * catalogue entry, plus one item with the goods id "111", which is not in
         * the catalogue and therefore never finds a join partner.
         */
        public static class OrderItemSource extends RichSourceFunction<OrderItem> {
            // See GoodsSource11: written by cancel(), polled by run().
            private volatile boolean isCancel;
            private Random r;

            /** Creates the random generator used for order quantities. */
            @Override
            public void open(Configuration parameters) throws Exception {
                r = new Random();
            }

            /**
             * Emits a matching and a non-matching order item, sleeps one second, and repeats.
             *
             * @param sourceContext collector for the emitted order items
             * @throws Exception if the sleep is interrupted
             */
            @Override
            public void run(SourceContext<OrderItem> sourceContext) throws Exception {
                while (!isCancel) {
                    Goods goods = Goods.randomGoods();
                    int count = r.nextInt(10) + 1;
                    String itemId = UUID.randomUUID().toString();
                    sourceContext.collect(newOrderItem(itemId, goods.getGoodsId(), count));
                    // Emit a separate instance rather than mutating the record that
                    // was just collected; downstream may still hold a reference to it.
                    sourceContext.collect(newOrderItem(itemId, "111", count));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            /** Builds a fresh order item from the given field values. */
            private static OrderItem newOrderItem(String itemId, String goodsId, int count) {
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goodsId);
                orderItem.setCount(count);
                orderItem.setItemId(itemId);
                return orderItem;
            }

            /** Requests the emit loop in {@link #run} to stop. */
            @Override
            public void cancel() {
                isCancel = true;
            }
        }

        /**
         * Watermark strategy for goods. For simplicity both the event timestamp and
         * every watermark are the current system time.
         *
         * NOTE(review): the watermark is the wall clock rather than a value derived
         * from event timestamps, so a record that is delayed before reaching the
         * join could already be behind the watermark - presumably acceptable for a
         * demo; confirm before reusing.
         */
        public static class GoodsWatermark implements WatermarkStrategy<Goods> {

            /** @return an assigner that stamps every element with the current system time */
            @Override
            public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (element, recordTimestamp) -> System.currentTimeMillis();
            }

            /** @return a generator that emits the current system time on every event and on every periodic call */
            @Override
            public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Goods>() {
                    @Override
                    public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                };
            }
        }

        /**
         * Watermark strategy for order items; same wall-clock approach as
         * {@link GoodsWatermark}.
         */
        public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
            /** @return an assigner that stamps every element with the current system time */
            @Override
            public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (element, recordTimestamp) -> System.currentTimeMillis();
            }

            /** @return a generator that emits the current system time on every event and on every periodic call */
            @Override
            public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<OrderItem>() {
                    @Override
                    public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(System.currentTimeMillis()));
                    }
                };
            }
        }
    }
    

Streaming File Sink

  • Sink 落地

  • Sink 分类

    1. sink MySQL
    2. sink Kafka
    3. sink Redis
    4. sink 控制台
  • Sink 落地到分布式文件系统上 HDFS 上

  • Sink 到文件系统 Streaming File Sink 落地使用应用场景

    1. 实时数据仓库
    2. 小时级的数据分析 等
    3. 抽取数据
  • 需求

    从 socket 数据流中读取数据, 每 2 秒钟滚动生成一个文件, 将数据写入到 HDFS 上。

  • 开发步骤

package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

/**
 * StreamingFileSink demo: reads text lines from a socket and writes them as
 * row-format (plain text) part files to HDFS, bucketed by date-time and rolled
 * every two seconds.
 *
 * Author itcast
 * Date 2021/6/24 10:52
 */
public class StreamingFileSinkDemo {
    /**
     * Builds the socket-to-HDFS pipeline and runs it until the job is cancelled.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Initialise the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Checkpoint every 10 s and set the state backend storage path.
        //    The sink relies on checkpointing and two-phase commit for its
        //    exactly-once guarantee.
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));

        // 3. Read text lines from the socket source.
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 4. Output file naming: part files get the prefix "crm" and the suffix ".txt".
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("crm")
                .withPartSuffix(".txt")
                .build();
        // 5. Build the row-format sink writing UTF-8 lines under
        //    /FlinkStreamFileSink/parquet. All generic types are spelled out so the
        //    builder chain stays typed and the result is a StreamingFileSink<String>.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // Bucket the output by date-time.
                .withBucketAssigner(new DateTimeBucketAssigner<String>())
                // Default rolling policy: roll at 128 MiB, after 2 s, or after 2 s of inactivity.
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(128 * 1024 * 1024)
                        .withRolloverInterval(Time.seconds(2).toMilliseconds())
                        .withInactivityInterval(Time.seconds(2).toMilliseconds())
                        .build())
                // Apply the part-file prefix/suffix configured above.
                .withOutputFileConfig(config)
                .build();
        // 6. Print the input for inspection and attach the file sink.
        source.print();
        source.addSink(sink).setParallelism(1);
        // 7. Submit the job.
        env.execute();
    }
}

Sink 行格式化 , withBucketAssigner->DateTimeBucketAssigner
.withBucketAssigner(new DateTimeBucketAssigner())
//withRollingPolicy -> 默认滚筒策略
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(128 * 1024 * 1024)
.withRolloverInterval(Time.seconds(2).toMilliseconds())
.withInactivityInterval(Time.seconds(2).toMilliseconds())
.build())
//withOutputFileConfig -> 输出文件的配置
.withOutputFileConfig(config)
.build();
//6.设置输出 sink
source.print();
source.addSink(sink).setParallelism(1);
//7.执行任务
env.execute();
}
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。