图数据库的易用性—GES与Flink的对接
数字化时代,业务的实时处理需求越来越迫切,实时预警、实时风控、实时推荐等,Flink作为新一代流批统一的计算引擎,具有独特的天然流式计算特性和更为先进的架构设计的特点,它可以从不同的第三方存储引擎中读取数据,进行处理,然后再写出到另外的存储引擎中。
GES拥抱变化,开发了与Flink的对接工具GES-Flink-Connector。GES-Flink-Connector是一款自定义的离线/实时数据同步Flink连接器(connector),用于外部数据源与GES图数据库的数据同步。Connector的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。GES-Flink-Connector具备流批统一的能力,对于离线计算与流计算的数据都可以写入GES图数据库中。利用Flink连接器机制,只要实现了数据源的Source Connector读取数据,就可以通过GES-Flink-Connector将数据进行自定义转换并导入到GES图数据库中。
GES-Flink-Connector的架构图如下所示:
功能介绍
GES-Flink-Connector具备如下能力:
- 流批统一,支持流数据与批数据
- 数据导入支持三种提交模式,批量提交、间隔提交、混合提交
- 利用Flink提供的Checkpoint机制,具备一定的容错能力
- 具备导入失败处理能力,批导入失败转单条导入,单条导入失败转存储
- 具备脏数据发现能力,验证属性数量是否符合要求,验证label是否存在
- 具备脏数据和错误数据存储能力,可将数据存储到LOCAL、OBS、HDFS
- 具备错误数据限制能力,当错误率达到一定上限时,停止任务
使用案例介绍
将离线数据导入GES
以向GES中导入JDBC离线数据为例,操作步骤如下:
1. 将GES-Flink-Connector jar包打入本地maven仓库
mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar
2. 添加相关maven依赖(flink版本需高于1.7.2)
<dependency>
<groupId>com.huawei.ges</groupId>
<artifactId>ges-flink-connector</artifactId>
<version>1.0.0</version>
</dependency>
3. 配置相关参数
4. 编写数据转换方法
// T is your data type
public class GraphStringDataConverter implements GraphDataConverter<T> {
/**
* Your convert method.
* Separate your data fields with commas
* e.g.
* vertex
* id, label, property 1, property 2,…
* edge
* id 1, id 2, label, property 1, property 2, …
*
* @param t your data
* @return format string
*/
@Override
public String convert(T t) {
// Implement your transformation method
String s = ...
return s;
}
}
5. 创建flink任务
// ------------------------flink环境创建----------------------------------
// 创建flink流数据环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
environment.setParallelism(CONCURRENT_COUNT);
// 开启checkpoint 设置checkpoint时间间隔与checkpoint模式
environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
// -------------------------数据源获取-------------------------------------
// table schema
TypeInformation[] fieldTypes = new TypeInformation[]{
// id
BasicTypeInfo.INT_TYPE_INFO,
// label
BasicTypeInfo.STRING_TYPE_INFO,
// property 1
BasicTypeInfo.STRING_TYPE_INFO
// ...
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// query sql
String querySql = "select * from {$your_table_name}";
// 数据源获取,JDBCInputFormat 读出来数据为flink Row类型
DataStream<Row> dataSource =
environment.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("your_mysql_jdbc_url")
.setUsername("you_mysql_username")
.setPassword("you_mysql_password")
.setQuery(querySql)
.setRowTypeInfo(rowTypeInfo)
.finish());
// -------------------------输出源配置---------------------------------------
// 读取配置文件
Properties gesProp = new Properties();
InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties");
gesProp.load(in);
// 创建flink Row数据转为要求的逗号分隔字符串的策略
GraphDataConverter<Row> graphRowDataConvert = new GraphRowDataConvert();
GraphDataConvertStrategy<Row, GraphDataConverter<Row>> rowConvertStrategy =
new GraphDataConvertStrategy<>(graphRowDataConvert);
// 创建batch输出方法,并添加转化策略与配置文件
GraphBatchOutputFormat<Row> outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp);
// 创建sink输出方法
GraphSinkFunction<Row> sinkFunction = new GraphSinkFunction<>(outputFormat);
// 为数据源添加输出方法
dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT);
// 启动flink
environment.execute();
通过DLI与云上数据源交互
GES-Flink-Connector-DLI版本用于云上DLI Flink队列,采用Flink SQL的方式完成数据到GES的导入,操作步骤如下:
1. 修改jar包内config.properties参数配置
2. 将jar包导入OBS
3. DLI创建程序包(数据管理-程序包管理-创建程序包)
4. DLI购买队列并创建Flink作业
5. 创建DLI Flink队列与GES图服务的对等连接(跨源连接-创建连接)
将vpc设置为GES图引擎服务的同一个vpc,并测试地址连通性。
6. 编辑Flink SQL
# SOURCE表示数据源,可以是DLI支持的任意数据源
CREATE SOURCE STREAM v_labels (
id STRING,
label STRING,
uuid STRING,
d1 STRING,
d2 STRING
) WITH (
type = "obs",
bucket = "your bucket",
region = "your region",
object_name = "your file",
row_delimiter = "\n",
field_delimiter = ","
);
# SINK表示输出源 为GES图数据库
CREATE SINK STREAM ges_sink (
id STRING,
label STRING,
uuid STRING,
d1 STRING,
d2 STRING
) WITH (
type = "user_defined",
type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction
type_class_parameter = ""
);
# Some data processing
...
# 执行数据由输入源导入输出源
INSERT INTO
ges_sink
SELECT
* -- 选择想要输出的字段
FROM
v_labels;
- 点赞
- 收藏
- 关注作者
评论(0)