2021年大数据Flink(三十八):​​​​​​​Table与SQL ​​​​​​案例五 FlinkSQL整合Hive

举报
Lansonli 发表于 2021/09/28 22:57:06 2021/09/28
【摘要】 目录 案例五 FlinkSQL整合Hive 介绍 ​​​​​​​集成Hive的基本方式 ​​​​​​​准备工作 1.添加hadoop_classpath 2.下载jar并上传至flink/lib目录 3.修改hive配置 4.启动hive元数据服务 ​​​​​​​SQL CLI 1.修改flinksql配置 2.启...

目录

案例五 FlinkSQL整合Hive

介绍

​​​​​​​集成Hive的基本方式

​​​​​​​准备工作

1.添加hadoop_classpath

2.下载jar并上传至flink/lib目录

3.修改hive配置

4.启动hive元数据服务

​​​​​​​SQL CLI

1.修改flinksql配置

2.启动flink集群

3.启动flink-sql客户端

4.执行sql:

​​​​​​​代码演示


案例五 FlinkSQL整合Hive

介绍

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/

https://zhuanlan.zhihu.com/p/338506408

使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,接下来将以最新的Flink1.12版本为例,实现Flink集成Hive

 

​​​​​​​集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据

Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。

  • 利用 Flink 来读写 Hive 的表

Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

 

​​​​​​​准备工作

1.添加hadoop_classpath

vim /etc/profile

增加如下配置

export HADOOP_CLASSPATH=`hadoop classpath`

刷新配置

source /etc/profile

 

2.下载jar并上传至flink/lib目录

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/

 

 

3.修改hive配置

vim /export/server/hive/conf/hive-site.xml
 

  
  1. <property>
  2.         <name>hive.metastore.uris</name>
  3.         <value>thrift://node3:9083</value>
  4. </property>

4.启动hive元数据服务

nohup /export/server/hive/bin/hive --service metastore &
 

 

​​​​​​​SQL CLI

1.修改flinksql配置

vim /export/server/flink/conf/sql-client-defaults.yaml
 

增加如下配置


  
  1. catalogs:
  2.    - name: myhive
  3.      type: hive
  4.      hive-conf-dir: /export/server/hive/conf
  5.      default-database: default

 

2.启动flink集群

/export/server/flink/bin/start-cluster.sh
 

 

3.启动flink-sql客户端

/export/server/flink/bin/sql-client.sh embedded
 

 

4.执行sql:


  
  1. show catalogs;
  2. use catalog myhive;
  3. show tables;
  4. select * from person;

 

 

​​​​​​​代码演示


  
  1. package cn.itcast.extend;
  2. import org.apache.flink.table.api.EnvironmentSettings;
  3. import org.apache.flink.table.api.Table;
  4. import org.apache.flink.table.api.TableEnvironment;
  5. import org.apache.flink.table.api.TableResult;
  6. import org.apache.flink.table.catalog.hive.HiveCatalog;
  7. /**
  8.  * Author itcast
  9.  * Desc
  10.  */
  11. public class HiveDemo {
  12.     public static void main(String[] args){
  13.         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
  14.         TableEnvironment tableEnv = TableEnvironment.create(settings);
  15.         String name            = "myhive";
  16.         String defaultDatabase = "default";
  17.         String hiveConfDir = "./conf";
  18.         HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
  19.         //注册catalog
  20.         tableEnv.registerCatalog("myhive", hive);
  21.         //使用注册的catalog
  22.         tableEnv.useCatalog("myhive");
  23.         //向Hive表中写入数据
  24.         String insertSQL = "insert into person select * from person";
  25.         TableResult result = tableEnv.executeSql(insertSQL);
  26.         System.out.println(result.getJobClient().get().getJobStatus());
  27.     }
  28. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116357629

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。