2021 Big Data Flink (37): Table & SQL Case 4

Posted by Lansonli on 2021/09/29

Table of Contents

Case 4

Requirement

Code Implementation


Case 4

Requirement

Consume data from Kafka, keep only the records whose status is "success", and write them back to Kafka.


  
Sample input data:

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}

  
Prepare the Kafka topics and console tools:

# Create the input and output topics
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka

# Produce test records to input_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka

# Consume the filtered results from output_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
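
If the five sample records above are sent through the console producer, only the four success records should reach output_kafka. A sketch of what the console consumer would print (exact field formatting depends on the JSON serializer; since the DDL below declares user_id and page_id as BIGINT, they come out as numbers):

{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}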

 

Code Implementation

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
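
The program below needs the Kafka SQL connector and the JSON format on the classpath. A minimal sketch of the Maven dependencies, assuming Flink 1.12.0 and Scala 2.12 (match the versions to your cluster):

<!-- Kafka SQL connector, used by 'connector' = 'kafka' in the DDL below -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- JSON format, used by 'format' = 'json' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.12.0</version>
</dependency>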

 


  
package cn.itcast.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author itcast
 * Desc Consume JSON records from the input_kafka topic, keep only the rows
 * whose status is 'success', and write them to the output_kafka topic.
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: declare the input Kafka topic as a table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );

        //3. Sink: declare the output Kafka topic as a table
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );

        //4. Transformation: keep only the success records
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table resultTable = tEnv.sqlQuery(sql);

        //5. Print the intermediate result for debugging
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        //6. Sink: concatenating the Table into the SQL string registers it
        //   in the catalog under a generated name
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);

        //7. Execute (still required here because of the DataStream print() above)
        env.execute();
    }
}
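
As a side note, step 6 relies on Table.toString() registering the table under a generated name. An equivalent, slightly more direct sink (a sketch; Table.executeInsert has been part of the Table API since Flink 1.11) would be:

        // Insert the filtered table straight into the registered sink table
        resultTable.executeInsert("output_kafka");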

 

Source: lansonli.blog.csdn.net, author Lansonli. Copyright belongs to the original author; please contact the author for reprint permission.

Original link: lansonli.blog.csdn.net/article/details/116357577
