Flink Table & SQL (Part 7)

Posted by Maynor学长 on 2022/07/24 11:48:58

Dynamic Tables & Continuous Queries

  • A dynamic table is an unbounded table: data continuously flows in, and the results of a continuous query over it are continuously updated and emitted.

  • Requirement: query a DataStream in both the SQL style and the Table API style. The first example below uses Flink SQL to aggregate orders per user; the second uses the Table API for a word count.

    package cn.itcast.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.util.Arrays;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Author itcast
     * Date 2021/6/22 11:16
     * Desc TODO
     */
    public class FlinkSQLDemo {
        public static void main(String[] args) throws Exception {
            //1. Prepare the environments: stream execution environment and stream table environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //stream table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            //2. Source: build two order streams (only orderA is registered as a view below)
            DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                    new Order(1L, "beer", 3),
                    new Order(1L, "diaper", 4),
                    new Order(3L, "rubber", 2)));
    
            DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                    new Order(2L, "pen", 3),
                    new Order(2L, "rubber", 3),
                    new Order(4L, "beer", 1)));
            //3. Register orderA as a temporary view t_order
            tEnv.createTemporaryView("t_order",orderA,$("user"),$("product"),$("amount"));
            //4. Run the query: total order amount per user (`user` is escaped because it is a reserved keyword in Flink SQL)
            Table table = tEnv.sqlQuery(
                    "select `user`,sum(amount) as totalAmount " +
                            " from t_order " +
                            " group by `user` "
            );
            //5. Convert the result table to a retract stream (Tuple2<Boolean, Row>)
            DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(table, Row.class);
            //6. Print the result
            result.print();
            //7. Execute the job
            env.execute();
        }
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class Order {
            public Long user;
            public String product;
            public int amount;
        }
    }
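
    With parallelism 1, the retract stream printed above looks roughly as follows (the exact Row formatting differs slightly between Flink versions): true marks an inserted row and false marks a retracted one, so the running sum for user 1 is first emitted as 3 and then corrected to 7.

    (true,1,3)
    (false,1,3)
    (true,1,7)
    (true,3,2)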
    
  • Requirement

    Word count with the Flink Table API: print, as a data stream, the words whose occurrence count equals 2.

  • Development steps

    package cn.itcast.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Author itcast
     * Date 2021/6/22 11:29
     * Desc TODO
     */
    public class FlinkTableDemo {
        public static void main(String[] args) throws Exception {
            //1. Prepare the environments
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
            //2.Source
            DataStream<WC> input = env.fromElements(
                    new WC("Hello", 1),
                    new WC("World", 1),
                    new WC("Hello", 1)
            );
            //3. Convert the DataStream into a Table
            Table table = tEnv.fromDataStream(input, $("word"), $("frequency"));
    
            //4. Group, aggregate and filter with the Table API; equivalent SQL:
            // select word,count(frequency) as frequency
            // from table
            // group by word
            // having count(frequency)=2;
            Table filter = table
                    .groupBy($("word"))
                    .select($("word"),
                            $("frequency").count().as("frequency"))
                    .filter($("frequency").isEqual(2));
    
            //5. Convert the result table into a retract DataStream
            DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(filter, Row.class);
            //6. Print
            result.print();
            //7. Execute the job
            env.execute();
        }
        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class WC {
            public String word;
            public long frequency;
        }
    }
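
    The Table API chain above is equivalent to the SQL shown in the comment. As a rough sketch (not part of the original listing), the same query could be issued through sqlQuery after registering the stream under an illustrative view name such as t_wc:

    // Hypothetical SQL formulation of the same "count equals 2" query; t_wc is an illustrative name.
    tEnv.createTemporaryView("t_wc", input, $("word"), $("frequency"));
    Table havingResult = tEnv.sqlQuery(
            "SELECT word, COUNT(frequency) AS frequency " +
            "FROM t_wc GROUP BY word HAVING COUNT(frequency) = 2");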
    
  • Requirement

    Use Flink SQL to compute, within 5-second windows, each user's total order count, maximum order amount and minimum order amount.

    That is, every 5 seconds compute the order count, maximum amount and minimum amount per user over the last 5 seconds.

    This requirement could also be solved with a time-based tumbling window in the DataStream API.

    Here it is implemented with the Flink Table & SQL API instead.

  • Development steps

    package cn.itcast.flink.SQL;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Author itcast
     * Date 2021/6/23 8:42
     * Desc TODO
     */
    public class FlinkTableWindow {
        public static void main(String[] args) throws Exception {
            //1. Prepare the environments: stream execution environment and stream table environment
            //stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //Flink Table environment settings (Blink planner, streaming mode)
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            //stream table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
            //2. Source: custom Order source that emits one record per second
            DataStreamSource<Order> source = env.addSource(new MyOrder());
            //3. Transformation: assign timestamps and a 2-second out-of-orderness watermark
            SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                    Duration.ofSeconds(2)
            ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
    
            //4. Register a temporary view and declare createTime as the event-time (rowtime) attribute
            tEnv.createTemporaryView("t_order",
                    watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
            //5. SQL: tumbling-window aggregation by userId over createTime, computing order count, max and min amount
            String sql="SELECT userId,count(orderId) totalCount,max(money) maxMoney,min(money) minMoney " +
                    "FROM t_order " +
                    "group by userId," +
                    "tumble(createTime,interval '5' second)";
            //6. Run the query and get the result table
            Table resultTable = tEnv.sqlQuery(sql);
            //7. Sink: toRetractStream marks each emitted row as an insert (true) or a retraction (false)
            DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
            //8. Print
            result.print();
            //9. Execute the job
            env.execute();
        }
    
        public static class MyOrder extends RichSourceFunction<Order> {
            Random rm = new Random();
            boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while(flag) {
                    String oid = UUID.randomUUID().toString();
                    int uid = rm.nextInt(3);
                    int money = rm.nextInt(101);
                    long createTime = System.currentTimeMillis();
                    //emit the generated order
                    ctx.collect(new Order(oid, uid, money, createTime));
                    Thread.sleep(1000);
                }
            }
    
            @Override
            public void cancel() {
                flag = false;
            }
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
            //order id
            private String orderId;
            //user id
            private Integer userId;
            //order amount
            private Integer money;
            //event time (epoch millis)
            private Long createTime;
        }
    }
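
    If the window boundaries are needed in the output, the group-window SQL above can additionally project TUMBLE_START and TUMBLE_END. A sketch based on the same t_order view (variable names are illustrative):

    String sqlWithWindow =
            "SELECT userId, " +
            "  TUMBLE_START(createTime, INTERVAL '5' SECOND) AS winStart, " +
            "  TUMBLE_END(createTime, INTERVAL '5' SECOND) AS winEnd, " +
            "  count(orderId) AS totalCount, max(money) AS maxMoney, min(money) AS minMoney " +
            "FROM t_order " +
            "GROUP BY userId, TUMBLE(createTime, INTERVAL '5' SECOND)";
    Table windowedResult = tEnv.sqlQuery(sqlWithWindow);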
    
  • Requirement: use the Flink Table API to compute the total order count, maximum amount and minimum amount per user id (the same windowed aggregation as above, now with the Table API).

  • Development steps

    package cn.itcast.flink.SQL;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    
    /**
     * Author itcast
     * Date 2021/6/23 9:20
     * Desc TODO
     */
    public class FlinkTableAPIWindow {
        public static void main(String[] args) throws Exception {
            //1. Prepare the environments: stream execution environment and stream table environment
            //stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //Flink Table environment settings (Blink planner, streaming mode)
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            //stream table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
            //2. Source: custom Order source that emits one record per second
            DataStreamSource<Order> source = env.addSource(new MyOrder());
            //3. Transformation: assign timestamps and a 2-second out-of-orderness watermark
            SingleOutputStreamOperator<Order> watermarkDS = source.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
                    Duration.ofSeconds(2)
            ).withTimestampAssigner((element, recordTimestamp) -> element.createTime));
    
            //4. Register a temporary view and declare createTime as the event-time (rowtime) attribute
            tEnv.createTemporaryView("t_order",
                    watermarkDS,$("orderId"),$("userId"),$("money"),$("createTime").rowtime());
            //5. Table API query
            // obtain a Table handle for the registered view
            Table t_order = tEnv.from("t_order");
            //6. Table API aggregation: per userId, compute order count, max and min amount
    
            Table resultTable = t_order
                    //6.1 define a tumbling event-time window of 5 seconds
                    .window(Tumble.over(lit(5).second())
                    .on($("createTime")).as("tumbleWindow"))
                    //6.2 group by the window and userId
                    .groupBy($("tumbleWindow"), $("userId"))
                    //6.3 select order count, max amount and min amount
                    .select($("userId"), $("orderId").count().as("totalCount")
                            , $("money").max().as("maxMoney")
                            , $("money").min().as("minMoney"));
    
            //7. Sink: toRetractStream marks each emitted row as an insert (true) or a retraction (false)
            DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(resultTable, Row.class);
            //8. Print
            result.print();
            //9. Execute the job
            env.execute();
        }
    
        public static class MyOrder extends RichSourceFunction<Order> {
            Random rm = new Random();
            boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while(flag) {
                    String oid = UUID.randomUUID().toString();
                    int uid = rm.nextInt(3);
                    int money = rm.nextInt(101);
                    long createTime = System.currentTimeMillis();
                    //emit the generated order
                    ctx.collect(new Order(oid, uid, money, createTime));
                    Thread.sleep(1000);
                }
            }
    
            @Override
            public void cancel() {
                flag = false;
            }
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
            //order id
            private String orderId;
            //user id
            private Integer userId;
            //order amount
            private Integer money;
            //event time (epoch millis)
            private Long createTime;
        }
    }
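
    The window alias can also expose its start and end timestamps through the Table API. A sketch continuing from the t_order table above (names are illustrative):

    Table withBounds = t_order
            .window(Tumble.over(lit(5).second()).on($("createTime")).as("tumbleWindow"))
            .groupBy($("tumbleWindow"), $("userId"))
            .select($("userId"),
                    $("tumbleWindow").start().as("winStart"),
                    $("tumbleWindow").end().as("winEnd"),
                    $("orderId").count().as("totalCount"),
                    $("money").max().as("maxMoney"),
                    $("money").min().as("minMoney"));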
    
  • Requirement: map the JSON messages in a Kafka topic to a Flink table, filter that table, and write the result into another Kafka-backed table.

    Could this be done with the plain Flink DataStream API instead of Flink Table? Yes; the steps are listed below (a rough sketch follows the list).

    1. Read from Kafka with FlinkKafkaConsumer
    2. Convert each JSON string into a Java bean
    3. Filter with Flink's filter operator: .filter(t -> "success".equals(t.status))
    4. Map the object back to a JSON string with JSON.toJSONString
    5. Write to Kafka with FlinkKafkaProducer
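
    A rough DataStream-only sketch of these five steps (the class name, the Event POJO and the fastjson dependency are illustrative assumptions; broker and topic names follow the listing below):

    package cn.itcast.flink.SQL;

    import com.alibaba.fastjson.JSON;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class DataStreamKafkaFilter {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "node1:9092");
            props.setProperty("group.id", "default");

            env.addSource(new FlinkKafkaConsumer<>("input_kafka", new SimpleStringSchema(), props)) //1. read
                    .map(json -> JSON.parseObject(json, Event.class))                               //2. JSON -> bean
                    .filter(e -> "success".equals(e.status))                                        //3. keep status = "success"
                    .map(e -> JSON.toJSONString(e))                                                 //4. bean -> JSON
                    .addSink(new FlinkKafkaProducer<>("output_kafka", new SimpleStringSchema(), props)); //5. write

            env.execute();
        }

        //assumed POJO matching the JSON messages
        public static class Event {
            public Long user_id;
            public Long page_id;
            public String status;
        }
    }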

    Below, the same filter (status = 'success') is implemented with Flink Table & SQL instead.

  • Development steps

    package cn.itcast.flink.SQL;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    /**
     * Author itcast
     * Date 2021/6/23 9:46
     * Desc TODO
     */
    public class FlinkTableKafka {
        public static void main(String[] args) throws Exception {
            //1. Prepare the environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //create the stream table environment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
            //2.Source
            //map the Kafka topic directly to an input table
            TableResult inputTable = tEnv.executeSql(
                    "CREATE TABLE input_kafka (\n" +
                            "  `user_id` BIGINT,\n" +
                            "  `page_id` BIGINT,\n" +
                            "  `status` STRING\n" +
                            ") WITH (\n" +
                            "  'connector' = 'kafka',\n" +  //连接的数据源是 kafka
                            "  'topic' = 'input_kafka',\n" + //映射的主题topic
                            "  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',\n" + //kafka地址
                            "  'properties.group.id' = 'default',\n" + //kafka消费的消费组
                            "  'scan.startup.mode' = 'latest-offset',\n" + //从最新的位置扫描
                            "  'format' = 'json'\n" +  //扫描的数据是格式:json格式
                            ")"
            );
            //map another Kafka topic as the output table
            TableResult outputTable = tEnv.executeSql(
                    "CREATE TABLE output_kafka (\n" +
                            "  `user_id` BIGINT,\n" +
                            "  `page_id` BIGINT,\n" +
                            "  `status` STRING\n" +
                            ") WITH (\n" +
                            "  'connector' = 'kafka',\n" +
                            "  'topic' = 'output_kafka',\n" +
                            "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                            "  'format' = 'json',\n" +
                            "  'sink.partitioner' = 'round-robin'\n" +  //分区的方式,轮训
                            ")"
            );
    
            String sql = "select " +
                    "user_id," +
                    "page_id," +
                    "status " +
                    "from input_kafka " +
                    "where status = 'success'";
    
            Table ResultTable = tEnv.sqlQuery(sql);
    
            DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
            resultDS.print();
    
            //write the records with status = 'success' into the output_kafka sink table
            tEnv.executeSql("insert into output_kafka select * from "+ResultTable);
    
            //7. Execute the job
            env.execute();
        }
    }
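
    For a quick manual test (the messages below are illustrative), publishing {"user_id":1,"page_id":10,"status":"success"} to the input_kafka topic should produce a corresponding row in output_kafka, while a message such as {"user_id":2,"page_id":11,"status":"fail"} is dropped by the filter and never reaches the sink.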
    
  • Summary

    1. Create a temporary table input_kafka on top of the input_kafka topic.
    2. Create the output_kafka table on top of the output_kafka topic.
    3. Read every record and keep only those with status = "success".
    4. insert into output_kafka select * from the filtered result table.
    5. Consume the data directly from the output_kafka topic.
  • Kafka connector options

    connector (required, String, default: none): Specify what connector to use; for Kafka use 'kafka'.
    topic (required for sink, String, default: none): Topic name(s) to read data from when the table is used as a source. It also supports a topic list for sources, separated by semicolons like 'topic-1;topic-2'. Note that only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as a sink, the topic name is the topic to write data to; topic lists are not supported for sinks.
    topic-pattern (optional, String, default: none): Regular expression for the topic names to read from. All topics matching the expression are subscribed when the job starts. Only one of "topic-pattern" and "topic" can be specified for sources.
    properties.bootstrap.servers (required, String, default: none): Comma-separated list of Kafka brokers.
    properties.group.id (required by source, String, default: none): The consumer group id for the Kafka source; optional for the Kafka sink.
    properties.* (optional, String, default: none): Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka configuration documentation. Flink removes the "properties." prefix and passes the transformed key and value to the underlying Kafka client. For example, automatic topic creation can be disabled via 'properties.allow.auto.create.topics' = 'false'. Some configurations cannot be set because Flink overrides them, e.g. 'key.deserializer' and 'value.deserializer'.
    format (required, String, default: none): The format used to deserialize and serialize the value part of Kafka messages. See the formats page for details. Note: either this option or 'value.format' is required.
    key.format (optional, String, default: none): The format used to deserialize and serialize the key part of Kafka messages. Note: if a key format is defined, the 'key.fields' option is required as well; otherwise the Kafka records will have an empty key.
    key.fields (optional, List<String>, default: []): An explicit list of physical columns from the table schema that configure the data type of the key format. By default the list is empty and thus a key is undefined. The list should look like 'field1;field2'.
    key.fields-prefix (optional, String, default: none): A custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' work with prefixed names. When constructing the data type of the key format, the prefix is removed and the non-prefixed names are used within the key format. This option requires 'value.fields-include' to be set to 'EXCEPT_KEY'.
    value.format (required, String, default: none): The format used to deserialize and serialize the value part of Kafka messages. Note: either this option or the 'format' option is required.
    value.fields-include (optional, Enum [ALL, EXCEPT_KEY], default: ALL): Strategy for handling key columns in the data type of the value format. With 'ALL', all physical columns of the table schema are included in the value format, meaning key columns appear in the data type for both the key and value format.
    scan.startup.mode (optional, String, default: group-offsets): Startup mode for the Kafka consumer; valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'.
    scan.startup.specific-offsets (optional, String, default: none): Offsets for each partition when using the 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'.
    scan.startup.timestamp-millis (optional, Long, default: none): Epoch timestamp (milliseconds) to start from when using the 'timestamp' startup mode.
    scan.topic-partition-discovery.interval (optional, Duration, default: none): Interval for the consumer to periodically discover dynamically created Kafka topics and partitions.
    sink.partitioner (optional, String, default: 'default'): Output partitioning from Flink's partitions into Kafka's partitions. Valid values are 'default' (use the Kafka default partitioner), 'fixed' (each Flink partition ends up in at most one Kafka partition), 'round-robin' (a Flink partition is distributed to Kafka partitions sticky round-robin; only works when record keys are not specified), or a custom FlinkKafkaPartitioner subclass, e.g. 'org.mycompany.MyPartitioner'.
    sink.semantic (optional, String, default: at-least-once): Delivery semantics for the Kafka sink. Valid values are 'at-least-once', 'exactly-once' and 'none'.
    sink.parallelism (optional, Integer, default: none): Parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator.
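
    As an illustration of the startup options above, a source table can also begin reading from a fixed epoch timestamp instead of the latest offset. A sketch (the table name and timestamp value are made up):

    tEnv.executeSql(
            "CREATE TABLE input_kafka_ts (\n" +
            "  `user_id` BIGINT,\n" +
            "  `page_id` BIGINT,\n" +
            "  `status` STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'input_kafka',\n" +
            "  'properties.bootstrap.servers' = 'node1:9092',\n" +
            "  'scan.startup.mode' = 'timestamp',\n" +
            "  'scan.startup.timestamp-millis' = '1624406400000',\n" +
            "  'format' = 'json'\n" +
            ")");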

Common Flink SQL Operators

  • Common operator: Join (full join)

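    A minimal Flink SQL sketch of a full outer join (view and column names are illustrative; assumes two views t_left(id, name) and t_right(id, score) are registered in tEnv):

    Table fullJoin = tEnv.sqlQuery(
            "SELECT l.id, l.name, r.score " +
            "FROM t_left l FULL OUTER JOIN t_right r ON l.id = r.id");
    tEnv.toRetractStream(fullJoin, Row.class).print();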

WordCount in Other Languages

  • WordCount implemented in Scala

    package cn.itcast.flink.scala.demo
    
    import cn.itcast.flink.SQL.WordCountData
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig
    
    object HelloWorld {
      def main(args: Array[String]): Unit = {
        //1. Create the stream execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //read the program arguments
        val params: ParameterTool = ParameterTool.fromArgs(args)
        //make the parameters available as global job parameters
        env.getConfig.setGlobalJobParameters(params)
        //enable checkpointing every second
        env.enableCheckpointing(1000)
        //use a filesystem state backend for checkpoints
        env.setStateBackend(new FsStateBackend("file:///d:/chk"))
        //retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.getCheckpointConfig.setCheckpointInterval(60000)
        //restart strategy: 3 attempts with a 3-second delay
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000))
        //parallelism
        env.setParallelism(1)
        //2. Read the source data
        val source: DataStream[String] = env.fromElements(WordCountData.WORDS: _*)
        //3. Transformations: split, map to (word, 1), key by word, sum
        val result: DataStream[(String, Int)] = source.flatMap(_.split("\\W+"))
          .map((_, 1))
          .keyBy(_._1)
          .sum(1)
        //4. Print the result
        result.print()
        //5. Execute the streaming job
        env.execute()
      }
    }
    

Troubleshooting

  • Problems creating modules and importing packages

  • Operator state example: Checkpoint

    If the source is not throttled to one record per second, no state gets saved and the job starts from the beginning every time.

    Reason: a checkpoint takes about 1 second, but an exception is thrown after every 5 records; by the time those 5 records have been produced, no complete checkpoint backup exists, so every restart begins consuming from scratch.
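
    A minimal sketch of the configuration involved in this scenario (values follow the description above; the failing source itself is not shown):

    //checkpoint once per second; restart up to 3 times with a 3-second delay
    env.enableCheckpointing(1000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
    //if the source fails before any checkpoint completes, the restarted job has no state
    //to restore and starts consuming from the beginning again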


  • Kafka Tool cannot connect to the node1:9092 cluster

    1. Map node1, node2 and node3 in the Windows HOSTS file:
    # Windows HOSTS file
    192.168.88.161 node1 node1.itcast.cn
    192.168.88.162 node2 node2.itcast.cn
    192.168.88.163 node3 node3.itcast.cn
    
    2. Check that the Windows and Linux firewalls and any antivirus software are turned off.
    3. In the Kafka broker configuration file (server.properties), set:

    advertised.listeners=192.168.88.161:9092

    4. Restart ZooKeeper and Kafka.
  • Flink SQL fragments concatenated in Java must be separated by spaces, otherwise the parser reports a syntax error (for example, without the trailing space the concatenated string becomes "... amount>2union all ...", which cannot be parsed):

    Table result = tEnv.sqlQuery("" +
                    "select * from " + orderTableA + " where amount>2 " +
                    "union all " +
                    "select * from orderTableB where amount<2");
    
  • Flink Table & SQL: Table.printSchema()

    Prints the schema of the current table: field names and field types.
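
    A sketch of what printSchema() emits for the WC table from the earlier example (exact type names can vary slightly between Flink versions):

    table.printSchema();
    // root
    //  |-- word: STRING
    //  |-- frequency: BIGINT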
