FusionInsight MRS FlinkServer之FlinkSQL对接组件实例

yangxiao_mrs 发表于 2021/10/29 22:11:21 2021/10/29
【摘要】 为了方便业务开发人员开发FlinkSQL作业、提交FlinkJar作业,以及方便运维人员对Flink作业的管理,FusionInsight MRS研发了FlinkServer可视化开发平台。

MRS 3.1.2 版本中FlinkServer支持通过FlinkSQL对接的组件有:KafkaRedisHudiClickHouseHiveHbaseHDFS


一、对接Kafka【Kafka可以用作Source、Sink表】

CREATE TABLE datagenkafka (a INT) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1'

);

create table kafkasink(a INT) with(

  'connector' = 'kafka',

  'topic' = 'input',

  'properties.bootstrap.servers' = 'x.x.x.x:21007',

  'properties.group.id' = 'group1',

  'scan.startup.mode' = 'earliest-offset',

  'format' = 'json',

  'properties.sasl.kerberos.service.name' = 'kafka',

  'properties.security.protocol' = 'SASL_PLAINTEXT',

  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'

);

insert into

  kafkasink

select

  *

from

  datagenkafka;

 

其中对接kafka 21007安全端口时需要添加properties.sasl.kerberos.service.name、properties.security.protocol、properties.kerberos.domain.name三个参数,properties.kerberos.domain.name为kafka的域名称;对接21005端口不需要添加这三个参数。

二、对接Redis【Redis可以用作Sink、维表】

CREATE TABLE datagenRedis (a INT, b INT) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1'

);

CREATE SINK STREAM redis_sink(account int, costs int, PRIMARY KEY(account)) WITH (

  'isSafeMode' = 'true',

  'clusterAddress' = '8.5.213.12:22400,8.5.213.13:22400,8.5.213.14:22400',

  'redistype' = 'String',

  'type' = 'Redis',

  'isSSLMode' = 'false'

);

insert into

  redis_sink

select

  *

from

  datagenRedis;

 

其中isSafeMode表示Redis集群是不是安全集群,isSSLMode表示Redis有没有打开SSL。

三、对接Hudi【Hudi可以用作Source、Sink表】

CREATE TABLE hudisink(

  uuid VARCHAR(20),

  name VARCHAR(10),

  age INT,

  ts timestamp(3),

  `p` VARCHAR(20)

) PARTITIONED BY (`p`) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hacluster/tmp/hudi/t1'

);

CREATE TABLE datagenhudi (

  uuid varchar(20),

  name varchar(10),

  age int,

  ts timestamp(3),

  p varchar(20)

) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1'

);

insert into

  hudisink

select

  *

from

  datagenhudi;

 

当Hudi Sink表为MOR类型时需要在FlinkServer作业定义界面中打开CheckPoint,否则数据无法Sink到Hudi表中。


四、对接ClickHouseClickHouse可以用作Source、Sink表】

CREATE TABLE CKT (`a` int) WITH (

  'connector' = 'jdbc',

  'url' = 'jdbc:clickhouse://8.5.213.12:21422/default?ssl=true&sslmode=none',

  'username' = 'flinkuser',

  'password' = 'Bigdata_2013',

  'table-name' = 'test',

  'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',

  'sink.buffer-flush.max-rows' = '10'

);

CREATE TABLE datagen (a INT) WITH ('connector' = 'datagen');

Insert into

  CKT

select

  *

from

  datagen;

 

通过FlinkSQL向ClickHouse中写入数据前需要现在ClickHouse创建好对应的表。其中攒批写入由sink.buffer-flush.max-rows、sink.buffer-flush.interval两个参数控制。


五、对接HiveHive可以用作Source、Sink、维表】

CREATE TABLE test_kafka (
user_id varchar,
item_id varchar,
cat_id varchar,
zw_test timestamp
) WITH (
'properties.bootstrap.servers' = '10.244.225.192:21007',
'format' = 'json',
'topic' = 'zw_tset_kafka',
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-version' = '3.1.0',
'default-database' = 'default',
'cluster.name' = 'flink_hive'
);
use catalog myhive;
set table.sql-dialect = hive;create table user_behavior_hive_tbl_no_partition (
user_id STRING,
item_id STRING,
cat_id STRING,
ts timestamp
) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as orcfile TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
'sink.partition-commit.trigger' = 'process-time',
'sink.partition-commit.delay' = '0S',
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
INSERT into
user_behavior_hive_tbl_no_partition
SELECT
user_id,
item_id,
cat_id,
zw_test,
DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
DATE_FORMAT(zw_test, 'HH'),
DATE_FORMAT(zw_test, 'mm')
FROM
default_catalog.default_database.test_kafka;


对接Hive之前需要先在FlinkServer中创建好集群连接,因为Hive的相关配置是通过集群连接获取的,其中cluster.name是集群连接命令。

当Hive为Sink表时需要在FlinkServer作业定义界面中打开CheckPoint,否则数据无法Sink到Hive表中。


六、对接HBaseHBase可以用作Sink、维表】

CREATE TABLE datagenHBase (a sTRING, b sTRING) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1'

);

CREATE TABLE Hbasesink (

  rowkey STRING,

  f1 ROW < b STRING >,

  PRIMARY KEY (rowkey) NOT ENFORCED

) WITH (

  'connector' = 'hbase-2.2',

  'table-name' = 'dim_province',

  'zookeeper.quorum' = '8.5.213.13:24002,8.5.213.12:24002,8.5.213.14:24002'

);

INSERT INTO

  Hbasesink

SELECT

  a as rowkey,

  ROW(b) as f1

FROM

  datagenHBase;  


通过FlinkSQL向HBase中写入数据前需要现在HBase创建好对应的表。

七、对接HDFSHDFS可以用作SourceSink表】

CREATE TABLE datagenhdfs (a INT) WITH (

  'connector' = 'datagen',

  'rows-per-second' = '1'

);

CREATE TABLE hdfssink(a int) WITH(

  'connector.type' = 'filesystem',

  'connector.path' = 'hdfs:///tmp/hdfs.result',

  'format.type' = 'csv'

);

insert into

  hdfssink

select

  *

from

  datagenhdfs;

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。