图数据库的易用性—GES与Flink的对接

举报
你好_TT 发表于 2021/12/29 18:00:35 2021/12/29
【摘要】 GES-Flink-Connector是一款自定义的离线/实时数据同步Flink连接器(connector),用于外部数据源与GES图数据库的数据同步。

数字化时代业务的实时处理需求越来越迫切实时预警实时风控实时推荐Flink作为新一代流批统一的计算引擎,具有独特的天然流式计算特性和更为先进的架构设计的特点它可以从不同的第三方存储引擎中读取数据,进行处理,然后再写出到另外的存储引擎中。

GES拥抱变化,开发了与Flink的对接工具GES-Flink-ConnectorGES-Flink-Connector是一款自定义的离线/实时数据同步Flink连接器(connector),用于外部数据源与GES图数据库的数据同步。Connector的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。GES-Flink-Connector具备流批统一的能力,对于离线计算与流计算的数据都可以写入GES图数据库中。利用Flink连接器机制,只要实现了数据源的Source Connector读取数据,就可以通过GES-Flink-Connector将数据进行自定义转换并导入到GES图数据库中。

GES-Flink-Connector的架构图如下所示

功能介绍

GES-Flink-Connector具备如下能力:

  • 流批统一,支持流数据与批数据
  • 数据导入支持三种提交模式,批量提交、间隔提交、混合提交
  • 利用Flink提供的Checkpoint机制,具备一定的容错能力
  • 具备导入失败处理能力,批导入失败转单条导入,单条导入失败转存储
  • 具备脏数据发现能力,验证属性数量是否符合要求,验证label是否存在
  • 具备脏数据和错误数据存储能力,可将数据存储到LOCAL、OBS、HDFS
  • 具备错误数据限制能力,当错误率达到一定上限时,停止任务

使用案例介绍

将离线数据导入GES

以向GES中导入JDBC离线数据为例,操作步骤如下:

1. 将GES-Flink-Connector jar包打入本地maven仓库

mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar

2. 添加相关maven依赖(flink版本需高于1.7.2)

<dependency>
    <groupId>com.huawei.ges</groupId>
    <artifactId>ges-flink-connector</artifactId>
    <version>1.0.0</version>
</dependency>

3. 配置相关参数

4. 编写数据转换方法

// T is your data type
public class GraphStringDataConverter implements GraphDataConverter<T> {
    /**
     * Your convert method.
     * Separate your data fields with commas
     * e.g.
     * vertex
     * id, label, property 1, property 2,…
     * edge
     * id 1, id 2, label, property 1, property 2, …
     *
     * @param t your data
     * @return format string
     */
    @Override
    public String convert(T t) {
        // Implement your transformation method
        String s = ...
        return s;
    }
}

5. 创建flink任务

// ------------------------flink环境创建----------------------------------
// 创建flink流数据环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
environment.setParallelism(CONCURRENT_COUNT);
// 开启checkpoint 设置checkpoint时间间隔与checkpoint模式
environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);

// -------------------------数据源获取-------------------------------------
// table schema
TypeInformation[] fieldTypes = new TypeInformation[]{
    // id
    BasicTypeInfo.INT_TYPE_INFO,
    // label
    BasicTypeInfo.STRING_TYPE_INFO,
    // property 1
    BasicTypeInfo.STRING_TYPE_INFO
   	// ...
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// query sql
String querySql = "select * from {$your_table_name}";

// 数据源获取,JDBCInputFormat 读出来数据为flink Row类型
DataStream<Row> dataSource =
    environment.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("your_mysql_jdbc_url")
    .setUsername("you_mysql_username")
    .setPassword("you_mysql_password")
    .setQuery(querySql)
    .setRowTypeInfo(rowTypeInfo)
    .finish());

// -------------------------输出源配置---------------------------------------
// 读取配置文件
Properties gesProp = new Properties();
InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties");
gesProp.load(in);

// 创建flink Row数据转为要求的逗号分隔字符串的策略
GraphDataConverter<Row> graphRowDataConvert = new GraphRowDataConvert();
GraphDataConvertStrategy<Row, GraphDataConverter<Row>> rowConvertStrategy =
    new GraphDataConvertStrategy<>(graphRowDataConvert);
// 创建batch输出方法,并添加转化策略与配置文件
GraphBatchOutputFormat<Row> outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp);
// 创建sink输出方法
GraphSinkFunction<Row> sinkFunction = new GraphSinkFunction<>(outputFormat);
// 为数据源添加输出方法
dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT);
// 启动flink
environment.execute();

flink_web.png

通过DLI与云上数据源交互

GES-Flink-Connector-DLI版本用于云上DLI Flink队列,采用Flink SQL的方式完成数据到GES的导入,操作步骤如下:

1. 修改jar包内config.properties参数配置

2. 将jar包导入OBS

obs.png

3. DLI创建程序包数据管理-程序包管理-创建程序包

chengxubao.png

4. DLI购买队列并创建Flink作业

zuoye.png

5. 创建DLI Flink队列与GES图服务的对等连接跨源连接-创建连接

kuayuan.png

vpc设置为GES图引擎服务的同一个vpc,并测试地址连通性。

6. 编辑Flink SQL

# SOURCE表示数据源,可以是DLI支持的任意数据源
CREATE SOURCE STREAM v_labels (
  id STRING,
  label STRING,
  uuid STRING,
  d1 STRING,
  d2 STRING
) WITH (
  type = "obs",
  bucket = "your bucket",
  region = "your region",
  object_name = "your file",
  row_delimiter = "\n",
  field_delimiter = ","
);

# SINK表示输出源 为GES图数据库
CREATE SINK STREAM ges_sink (
  id STRING,
  label STRING,
  uuid STRING,
  d1 STRING,
  d2 STRING
) WITH (
  type = "user_defined",
  type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction
  type_class_parameter = ""
);

# Some data processing
...

# 执行数据由输入源导入输出源
INSERT INTO
  ges_sink
SELECT
  * -- 选择想要输出的字段
FROM
  v_labels;

dli.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。