2021年大数据Flink(三十七):Table与SQL 案例四
【摘要】
目录
案例四
需求
代码实现
案例四
需求
从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka
{"user_id": "1", "page_id":"1", "status": "success"} {"user_id": "1", "page_id":"1", "...
目录
案例四
需求
从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka
-
-
{"user_id": "1", "page_id":"1", "status": "success"}
-
-
{"user_id": "1", "page_id":"1", "status": "success"}
-
-
{"user_id": "1", "page_id":"1", "status": "success"}
-
-
{"user_id": "1", "page_id":"1", "status": "success"}
-
-
{"user_id": "1", "page_id":"1", "status": "fail"}
-
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
-
-
-
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
-
-
-
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
-
-
-
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
代码实现
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
-
package cn.itcast.sql;
-
-
import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.table.api.Table;
-
import org.apache.flink.table.api.TableResult;
-
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
import org.apache.flink.types.Row;
-
-
/**
-
* Author itcast
-
* Desc
-
*/
-
public class FlinkSQL_Table_Demo06 {
-
public static void main(String[] args) throws Exception {
-
//1.准备环境
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-
//2.Source
-
TableResult inputTable = tEnv.executeSql(
-
"CREATE TABLE input_kafka (\n" +
-
" `user_id` BIGINT,\n" +
-
" `page_id` BIGINT,\n" +
-
" `status` STRING\n" +
-
") WITH (\n" +
-
" 'connector' = 'kafka',\n" +
-
" 'topic' = 'input_kafka',\n" +
-
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
-
" 'properties.group.id' = 'testGroup',\n" +
-
" 'scan.startup.mode' = 'latest-offset',\n" +
-
" 'format' = 'json'\n" +
-
")"
-
);
-
TableResult outputTable = tEnv.executeSql(
-
"CREATE TABLE output_kafka (\n" +
-
" `user_id` BIGINT,\n" +
-
" `page_id` BIGINT,\n" +
-
" `status` STRING\n" +
-
") WITH (\n" +
-
" 'connector' = 'kafka',\n" +
-
" 'topic' = 'output_kafka',\n" +
-
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
-
" 'format' = 'json',\n" +
-
" 'sink.partitioner' = 'round-robin'\n" +
-
")"
-
);
-
-
String sql = "select " +
-
"user_id," +
-
"page_id," +
-
"status " +
-
"from input_kafka " +
-
"where status = 'success'";
-
-
Table ResultTable = tEnv.sqlQuery(sql);
-
-
DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
-
resultDS.print();
-
-
tEnv.executeSql("insert into output_kafka select * from "+ResultTable);
-
-
-
//7.excute
-
env.execute();
-
}
-
-
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116357577
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)