MRS Flink使用SQL-Client对接Hive

举报
yangxiao_mrs 发表于 2021/09/29 17:38:40 2021/09/29
【摘要】 一、准备环境1.根据产品文档安装Flink客户端;2.将sql-client-defaults.yaml放入/opt/client/Flink/flink/conf中3.将jaas.conf放入/opt/client/Flink/flink/conf中Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=...

一、准备环境

1.根据产品文档安装Flink客户端;

2.sql-client-defaults.yaml放入/opt/client/Flink/flink/conf

3.jaas.conf放入/opt/client/Flink/flink/conf

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
debug=false;
};

4.添加sql-client.sh中添加在JVM_ARGS参数:

JVM_ARGS="-Djava.security.auth.login.config=/opt/client/Flink/flink/conf/jaas.conf $JVM_ARGS"

二、启动Flink集群

例如:yarn-session.sh -t ssl -d

三、启动SQL-Client

./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml

四、运行SQL

CREATE TABLE kafkaSourceTable (
  order_id VARCHAR,
  shop_id VARCHAR,
  member_id VARCHAR,
  trade_amt DOUBLE
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_sql',
  'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
  'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
  'connector.properties.group.id' = 'test-consumer-group',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
);

CREATE TABLE kafkaSinkTable(shop_id VARCHAR, member_id VARCHAR) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_sql',
  'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
  'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
  'update-mode' = 'append',
  'format.type' = 'json'
);
INSERT INTO
  kafkaSinkTable
SELECT
  shop_id,
  member_id
FROM
  kafkaSourceTable;
SELECT
  shop_id,
  member_id
FROM
  kafkaSourceTable;

五、对接Hive

1)修改sql-client-defaults.yaml

catalogs:
    - name: myhive
     type: hive
    hive-conf-dir: /opt/clienrc5/Hive/config
    hive-version: 3.1.0

2)在/opt/clienrc5/Hive/config/hive-site.xml添加配置

<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>

3)启动sql-client

use catalog myhive;

SET table.sql-dialect=hive;

CREATE TABLE IF NOT EXISTS hive_dialect_tbl (
`id` int ,
`name` string ,
`age` int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

SET table.sql-dialect=default;

CREATE TABLE datagen (
`id` int ,
`name` string ,
`age` int
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);

INSERT INTO hive_dialect_tbl SELECT * FROM datagen;

select * from hive_dialect_tbl;

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。