Flink高级特性和新特性(八)v2
双流 JOIN
-
多个数据流 DataStream 之间进行 JOIN 操作
-
双流 JOIN 分为两大类: Window 窗口的join, Interval 的 join
-
Window窗口 分为 tumbling 窗口, sliding 窗口, session 窗口
-
Interval 包括 下届, 上届
-
需求
订单明细表和商品表每 5 秒中进行一个窗口 JOIN , 将结果落地并打印输出
-
开发步骤
package cn.itcast.flink.broadcast; import com.alibaba.fastjson.JSON; import lombok.Data; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/6/24 9:40 * Desc TODO */ public class JoinDemo { public static void main(String[] args) throws Exception { // 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 构建商品数据流 SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark()); // 构建订单明细数据流 SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark()); // 订单表 join 商品表 订单表.goodsId===商品表.goodsId DataStream<FactOrderItem> result = orderItemSource.join(goodsSource) .where(o -> o.goodsId) .equalTo(g -> g.goodsId) /// 窗口为滚动窗口 5 秒 .window(TumblingEventTimeWindows.of(Time.seconds(5))) /// apply 实现 (OrderItem first, Goods second) -> factOrderItem .apply((OrderItem first, Goods second) -> { FactOrderItem factOrderItem = new FactOrderItem(); factOrderItem.setGoodsId(first.goodsId); factOrderItem.setGoodsName(second.goodsName); factOrderItem.setCount(new BigDecimal(first.count)); factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice)); return factOrderItem; }); //打印输出 result.print(); //执行环境 env.execute(); } //商品类实体类 @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random(); GOODS_LIST = new ArrayList<>(); GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890))); GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000))); GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000))); GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800))); GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200))); GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500))); } public static Goods randomGoods() { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods() { } public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) { this.goodsId = goodsId; this.goodsName = goodsName; this.goodsPrice = goodsPrice; } @Override public String toString() { return JSON.toJSONString(this); } } //订单明细实体类 @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString() { return JSON.toJSONString(this); } } //关联结果,落地表的实体表 @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString() { return JSON.toJSONString(this); } } //构建一个商品Stream源(这个好比就是维表) public static class GoodsSource extends RichSourceFunction<Goods> { private Boolean isCancel; @Override public void open(Configuration parameters) throws Exception { isCancel = false; } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建订单明细Stream源 public static class OrderItemSource extends RichSourceFunction<OrderItem> { private Boolean isCancel; private Random r; @Override public void open(Configuration parameters) throws Exception { isCancel = false; r = new Random(); } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem(); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10) + 1); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111"); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建水印分配器(此处为了简单),直接使用系统时间了 public static class GoodsWatermark implements WatermarkStrategy<Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Goods>() { @Override public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<OrderItem>() { @Override public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } }
-
-
需求
将商品数据和订单明细数据进行关联,间隔,上届 0(不包含),下届 -1(包含),统计数据并落地
-
开发步骤
package cn.itcast.flink.broadcast; import com.alibaba.fastjson.JSON; import lombok.Data; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Desc */ public class JoinDemo02 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 构建商品数据流 DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark()); // 构建订单明细数据流 DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark()); // 进行关联查询 SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId()) .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId())) .between(Time.seconds(-1), Time.seconds(0)) //上届的开区间,排除掉上届 [-1,0) .upperBoundExclusive() .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() { @Override public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception { FactOrderItem factOrderItem = new FactOrderItem(); factOrderItem.setGoodsId(right.getGoodsId()); factOrderItem.setGoodsName(right.getGoodsName()); factOrderItem.setCount(new BigDecimal(left.getCount())); factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount()))); out.collect(factOrderItem); } }); factOrderItemDS.print(); env.execute("Interval JOIN"); } //商品类 @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random(); GOODS_LIST = new ArrayList<>(); GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890))); GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000))); GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000))); GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800))); GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200))); GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500))); } public static Goods randomGoods() { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods() { } public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) { this.goodsId = goodsId; this.goodsName = goodsName; this.goodsPrice = goodsPrice; } @Override public String toString() { return JSON.toJSONString(this); } } //订单明细类 @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString() { return JSON.toJSONString(this); } } //关联结果 @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString() { return JSON.toJSONString(this); } } //构建一个商品Stream源(这个好比就是维表) public static class GoodsSource11 extends RichSourceFunction { private Boolean isCancel; @Override public void open(Configuration parameters) throws Exception { isCancel = false; } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建订单明细Stream源 public static class OrderItemSource extends RichSourceFunction { private Boolean isCancel; private Random r; @Override public void open(Configuration parameters) throws Exception { isCancel = false; r = new Random(); } @Override public void run(SourceContext sourceContext) throws Exception { while(!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem(); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10) + 1); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111"); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isCancel = true; } } //构建水印分配器(此处为了简单),直接使用系统时间了 public static class GoodsWatermark implements WatermarkStrategy<Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Goods>() { @Override public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<OrderItem>() { @Override public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis())); } }; } } }
Streaming File Sink
-
Sink 落地
-
Sink 分类
- sink MySQL
- sink Kafka
- sink Redis
- sink 控制台
-
Sink 落地到分布式文件系统上 HDFS 上
-
Sink 到文件系统 Streaming File Sink 落地使用应用场景
- 实时数据仓库
- 小时级的数据分析 等
- 抽取数据
-
需求
通过在 socket 数据流中将数据定时 2秒钟写入到 hdfs 上。
-
开发步骤
package cn.itcast.flink.broadcast;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
/**
* Author itcast
* Date 2021/6/24 10:52
* Desc TODO
*/
public class StreamingFileSinkDemo {
public static void main(String[] args) throws Exception {
//1.初始化流计算运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.设置Checkpoint(10s)周期性启动 和 stateBackend 存储路径
// Sink保证仅一次语义使用 checkpoint 和 二段提交
env.enableCheckpointing(10000);
env.setStateBackend(new FsStateBackend("file:///d:/chk/"));
//4.接入socket数据源,获取数据
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
//5.创建Streamingfilesink对象
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("crm")
.withPartSuffix(".txt")
.build();
//5-1. 创建输出文件配置,指定输出路径 /FlinkStreamFileSink/parquet
StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
new SimpleStringEncoder<String>("UTF-8"))
// sink-kafka new FlinkKafkaProducer
//5-2.StreamingFileSink 行格式化 , withBucketAssigner->DateTimeBucketAssigner
.withBucketAssigner(new DateTimeBucketAssigner())
//withRollingPolicy -> 默认滚筒策略
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(128 * 1024 * 1024)
.withRolloverInterval(Time.seconds(2).toMilliseconds())
.withInactivityInterval(Time.seconds(2).toMilliseconds())
.build())
//withOutputFileConfig -> 输出文件的配置
.withOutputFileConfig(config)
.build();
//6.设置输出 sink
source.print();
source.addSink(sink).setParallelism(1);
//7.执行任务
env.execute();
}
}
Sink 行格式化 , withBucketAssigner->DateTimeBucketAssigner
.withBucketAssigner(new DateTimeBucketAssigner())
//withRollingPolicy -> 默认滚筒策略
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(128 * 1024 * 1024)
.withRolloverInterval(Time.seconds(2).toMilliseconds())
.withInactivityInterval(Time.seconds(2).toMilliseconds())
.build())
//withOutputFileConfig -> 输出文件的配置
.withOutputFileConfig(config)
.build();
//6.设置输出 sink
source.print();
source.addSink(sink).setParallelism(1);
//7.执行任务
env.execute();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)