FusionInsight MRS FlinkServer之FlinkSQL对接组件实例
MRS 3.1.2 版本中FlinkServer支持通过FlinkSQL对接的组件有:Kafka、Redis、Hudi、ClickHouse、Hive、HBase、HDFS。
一、对接Kafka【Kafka可以用作Source、Sink表】
-- Source: unbounded datagen table emitting one random INT row per second.
CREATE TABLE datagenkafka (a INT) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Sink: Kafka topic `input`, JSON-encoded, on the secure (Kerberos) port 21007.
-- NOTE(review): 'scan.startup.mode' only takes effect when this table is read
-- as a source; it is ignored by the INSERT below.
CREATE TABLE kafkasink (a INT) WITH (
  'connector' = 'kafka',
  'topic' = 'input',
  'properties.bootstrap.servers' = 'x.x.x.x:21007',
  'properties.group.id' = 'group1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  -- The three properties below are required only for the secure port 21007;
  -- omit them when connecting to the non-secure port 21005.
  -- 'properties.kerberos.domain.name' is the Kafka domain name of the cluster.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);

-- Stream every generated row into Kafka.
INSERT INTO kafkasink
SELECT a
FROM datagenkafka;
其中对接kafka 21007安全端口时需要添加properties.sasl.kerberos.service.name、properties.security.protocol、properties.kerberos.domain.name三个参数,properties.kerberos.domain.name为kafka的域名称;对接21005端口不需要添加这三个参数。
二、对接Redis【Redis可以用作Sink、维表】
-- Source: datagen table producing one (a, b) INT pair per second.
CREATE TABLE datagenRedis (a INT, b INT) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
-- Redis sink. NOTE(review): "CREATE SINK STREAM" is an MRS FlinkServer /
-- vendor-specific dialect, not standard Flink SQL DDL — do not port as-is.
-- PRIMARY KEY(account) designates the Redis key column.
-- 'isSafeMode': whether the Redis cluster is a secure (authenticated) cluster.
-- 'isSSLMode': whether SSL is enabled on the Redis cluster.
-- 'redistype' = 'String': values are written using the Redis String type.
CREATE SINK STREAM redis_sink(account int, costs int, PRIMARY KEY(account)) WITH (
'isSafeMode' = 'true',
'clusterAddress' = '8.5.213.12:22400,8.5.213.13:22400,8.5.213.14:22400',
'redistype' = 'String',
'type' = 'Redis',
'isSSLMode' = 'false'
);
-- Stream every generated (account, costs) pair into Redis.
insert into
redis_sink
select
*
from
datagenRedis;
其中isSafeMode表示Redis集群是不是安全集群,isSSLMode表示Redis有没有打开SSL。
三、对接Hudi【Hudi可以用作Source、Sink表】
-- Hudi sink table, partitioned by `p`, stored under /tmp/hudi/t1 on HDFS.
-- NOTE(review): no 'table.type' is set, so the connector default (presumably
-- COPY_ON_WRITE — confirm against the Hudi connector docs) applies; for
-- MERGE_ON_READ tables, checkpointing must be enabled on the Flink job.
CREATE TABLE hudisink (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/t1'
);

-- Source: datagen table whose schema matches the sink, one row per second.
CREATE TABLE datagenhudi (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  p VARCHAR(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Copy the generated rows into the Hudi table.
INSERT INTO hudisink
SELECT uuid, name, age, ts, p
FROM datagenhudi;
当Hudi Sink表为MOR类型时需要在FlinkServer作业定义界面中打开CheckPoint,否则数据无法Sink到Hudi表中。
四、对接ClickHouse【ClickHouse可以用作Source、Sink表】
-- ClickHouse sink via the JDBC connector. The target table ('test' in the
-- 'default' database) must already exist in ClickHouse before writing.
-- SECURITY(review): credentials are hard-coded here; prefer an external
-- secret mechanism over committing passwords in SQL.
CREATE TABLE CKT (`a` INT) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://8.5.213.12:21422/default?ssl=true&sslmode=none',
  'username' = 'flinkuser',
  'password' = 'Bigdata_2013',
  'table-name' = 'test',
  'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
  -- Writes are batched; batching is controlled by sink.buffer-flush.max-rows
  -- and sink.buffer-flush.interval.
  'sink.buffer-flush.max-rows' = '10'
);

-- Source: unbounded datagen table (no rows-per-second cap configured).
CREATE TABLE datagen (a INT) WITH ('connector' = 'datagen');

-- Stream every generated row into ClickHouse.
INSERT INTO CKT
SELECT a
FROM datagen;
通过FlinkSQL向ClickHouse中写入数据前需要先在ClickHouse中创建好对应的表。其中攒批写入由sink.buffer-flush.max-rows、sink.buffer-flush.interval两个参数控制。
五、对接Hive【Hive可以用作Source、Sink、维表】
-- Source: Kafka topic 'zw_tset_kafka', JSON-encoded, read from latest offset
-- on the secure port 21007.
CREATE TABLE test_kafka (
user_id varchar,
item_id varchar,
cat_id varchar,
zw_test timestamp
) WITH (
'properties.bootstrap.servers' = '10.244.225.192:21007',
'format' = 'json',
'topic' = 'zw_tset_kafka',
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
-- Hive catalog. 'cluster.name' is the name of the cluster connection created
-- in FlinkServer; the Hive configuration is obtained through that connection.
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-version' = '3.1.0',
'default-database' = 'default',
'cluster.name' = 'flink_hive'
);
use catalog myhive;
-- Switch to the Hive dialect so the CREATE TABLE below uses Hive DDL syntax.
-- The table is an ORC table partitioned by day/hour/minute; partitions are
-- committed on processing time with no delay, writing both metastore entries
-- and _SUCCESS files.
set table.sql-dialect = hive;create table user_behavior_hive_tbl_no_partition (
user_id STRING,
item_id STRING,
cat_id STRING,
ts timestamp
) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as orcfile TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
'sink.partition-commit.trigger' = 'process-time',
'sink.partition-commit.delay' = '0S',
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Stream Kafka rows into the Hive table, deriving the dy/ho/mi partition
-- columns from the event timestamp. The source table lives in the default
-- catalog, hence the fully qualified name.
INSERT into
user_behavior_hive_tbl_no_partition
SELECT
user_id,
item_id,
cat_id,
zw_test,
DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
DATE_FORMAT(zw_test, 'HH'),
DATE_FORMAT(zw_test, 'mm')
FROM
default_catalog.default_database.test_kafka;
对接Hive之前需要先在FlinkServer中创建好集群连接,因为Hive的相关配置是通过集群连接获取的,其中cluster.name是集群连接名称。
当Hive为Sink表时需要在FlinkServer作业定义界面中打开CheckPoint,否则数据无法Sink到Hive表中。
六、对接HBase【HBase可以用作Sink、维表】
-- Source: datagen table producing one (a, b) STRING pair per second.
-- (Fixed garbled type casing: was `sTRING`.)
CREATE TABLE datagenHBase (a STRING, b STRING) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- HBase sink: 'rowkey' is the HBase row key; ROW field f1.b maps to column
-- family 'f1', qualifier 'b'. The target table must already exist in HBase.
CREATE TABLE Hbasesink (
  rowkey STRING,
  f1 ROW < b STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'dim_province',
  'zookeeper.quorum' = '8.5.213.13:24002,8.5.213.12:24002,8.5.213.14:24002'
);

-- Write each generated pair as one HBase row.
INSERT INTO Hbasesink
SELECT
  a AS rowkey,
  ROW(b) AS f1
FROM datagenHBase;
通过FlinkSQL向HBase中写入数据前需要先在HBase中创建好对应的表。
七、对接HDFS【HDFS可以用作Source、Sink表】
-- Source: datagen table emitting one random INT row per second.
CREATE TABLE datagenhdfs (a INT) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- HDFS sink writing CSV files under /tmp/hdfs.result.
-- NOTE(review): this uses the legacy filesystem connector option style
-- ('connector.type'/'connector.path'/'format.type'), unlike the
-- 'connector' = '...' style used by the other examples.
CREATE TABLE hdfssink (a INT) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = 'hdfs:///tmp/hdfs.result',
  'format.type' = 'csv'
);

-- Stream every generated row into HDFS.
INSERT INTO hdfssink
SELECT a
FROM datagenhdfs;
- 点赞
- 收藏
- 关注作者
评论(0)