DLI Flink OpenSource SQL中用户自定义sink的使用
【摘要】 在DLI的Flink OpenSource SQL语句中针对用户不同的使用场景,为用户提供了较多的connector以供用户使用,如DWS connector、kafka connector、RDS connector、Hbase connector和elasticsearch connector等。虽然这些connector足够应付大多数主流使用场景,但是仍然有可能不能够满...
在DLI的Flink OpenSource SQL语句中针对用户不同的使用场景,为用户提供了较多的connector以供用户使用,如DWS connector、kafka connector、RDS connector、Hbase connector和elasticsearch connector等。虽然这些connector足够应付大多数主流使用场景,但是仍然有可能不能够满足部分用户的使用场景,比如用户需要将输出结果输出到日志中,或者DLI没有提供相应connector的服务中。然而,DLI虽然想要提供所有的connector以满足所有的用户使用场景,但是由于人力等问题,无法实现。因此,为了满足这些无法使用DLI提供connector的用户的使用场景,我们提供了用户自定义connector。用户使用自定义connector时,只需要实现相应的sink或source函数即可,从而可以实现更大的灵活性。这里我们以自定义sink为例进行介绍。
1.自定义sink的用法
我们首先介绍自定义sink的用法。我们从DLI的官网可以参考https://support.huaweicloud.com/sqlref-flink-dli/dli_08_0347.html,其具体内容如下表所示。从表中我们可以看到,该connector在with中只支持三个属性。第一个字段为connector.type,其为必选,且值只能为user-defined。第二个字段属性为connector.class-name,它也为必选,这个字段的值是用户提供的sink函数的全限定类名。而最后一个字段则为connector.class-parameter,这个值不是必选,主要是作为用户定义的sink函数的构造函数传入。而同时,如果用户的sink函数的构造函数需要传入该参数值,则构造函数的参数只能为一个,且只能为字符串类型,因此用户如果需要传入多个值,则需要拼接成一个字符串并作为connector.class-parameter字段的值,之后再在sink函数的构造函数中进行分解、转换,从而得到需要的类型参数。了解了自定义sink的用法,我们知道首先需要做的就是编写自己需要的sink函数,下面我们将以sink函数的编写为重点进行介绍。
参数 |
是否必选 |
说明 |
connector.type |
是 |
只能为user-defined,表示使用自定义的sink。 |
connector.class-name |
是 |
sink函数的全限定类名。 |
connector.class-parameter |
否 |
sink函数其构造函数的参数,只支持一个String类型的参数。 |
2.sink函数原理
为编写支持upsert流的sink函数,则该sink函数需要继承RichSinkFunction(该类必须继承或其他提供相应功能的Flink类),并实现CheckpointedFunction(该接口可由用户选择是否使用)接口,RichSinkFunction主要用于对到达数据的处理和写入,CheckpointedFunction主要用于进行checkpoint时进行相应的处理。这两个类和接口的继承关系如下:
3.具体实现方式
3.1 RichSinkFunction中以下几个方法需要重点重写
- 重写open(Configuration)方法,该方法主要是进行相应的初始化操作,如hdfs sink中使用参数和相应环境的配置初始化
- 重写invoke(In, Context)方法,当每条数据到达时会调用该方法,主要在sink中用于对每条到达数据的处理
- 重写close()方法,该方法主要用于对资源等的释放
3.2 CheckpointedFunction中以下几个方法需要重点重写(该接口不是必须实现)
- 重写snapshotState(FunctionSnapshotContext)方法,该方法主要用于进行checkpoint时用户需要进行的相应操作,每当checkpoint执行的时候,snapshotState会被调用,通常用于flush、commit、synchronize外部系统
- 重写initializeState(FunctionInitializationContext)方法,该方法主要用于进行一些初始化操作,初始化的时候(第一次初始化或者从前一次checkpoint recover的时候)被调用,通常用来初始化state,以及处理state recovery的逻辑
4.使用示例
基于上面的介绍,我们实现一个简单的输出数据到日志中的sink函数,该函数名为DefinedSink,假设这个类在包com.huawei.flink下。
首先,我们需要在pom文件中添加相应的依赖。
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
其后,我们开始实现sink函数。根据上述的描述,如果我们需要将数据输出到日志中,则我们只需要在invoke方法中,将每条数据利用日志记录即可。每次到达的数据都是一个Tuple2类型的,它有两个字段,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,当为false时,可不进行任何操作。第二个值表示实际的数据值。基于此,我们做了进一步的改进,即当要插入或者更新该数据时,我们在其前面输出了insert or update字段。当要删除该数据时,我们在其前面加了delete字段已作区分,方便用于调试。因此,其具体实现如下述代码
package com.huawei.flink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefinedSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
Logger logger = LoggerFactory.getLogger(getClass());
// 初始化
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
// in包括两个值,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,
//当为false时,可不进行任何操作。第二个值表示实际的数据值
public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {
boolean flag = in.f0;
if (flag) {
logger.info("insert or update:" + in.f1.toString());
} else {
logger.info("delete:" + in.f1.toString());
}
}
@Override
public void close() throws Exception {
}
}
代码编写完成后,我们只需要使用mvn clean install命令进行打包即可。之后将该jar作为udf在Flink OpenSource SQL作业编辑页面中上传即可。之后即可通过用户自定义connector使用该sink。 这里,我们以kafka作为source,自定义sink作为sink,从kafa中读取数据,输出到日志中。当作业成功运行后,并且向kafka中插入多个字符串"defined-sinkTest"后,我们在taskManager的日志中可以看到类似于下图结果。
create table kafkaSource(
attr string
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topicName',
'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
'connector.properties.group.id' = 'test_definedsink',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'csv'
);
create table userSink(
attr string
) with (
'connector.type' = 'user-defined',
'connector.class-name' = 'com.huawei.flink.DefinedSink'
);
insert into userSink select *from kafkaSource;
5.可能存在的问题
若出现Caused by: com.huawei.flink.userdefined.error.UserDefinedParameterException: The class of user defined construct fail.,请确认sink函数的构造函数是否正确。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)