DLI Flink OpenSource SQL中用户自定义sink的使用
参数 |
是否必选 |
说明 |
connector.type |
是 |
只能为user-defined,表示使用自定义的sink。 |
connector.class-name |
是 |
sink函数的全限定类名。 |
connector.class-parameter |
否 |
sink函数其构造函数的参数,只支持一个String类型的参数。 |
2.sink函数原理
- 重写open(Configuration)方法,该方法主要是进行相应的初始化操作,如hdfs sink中使用参数和相应环境的配置初始化
- 重写invoke(In, Context)方法,当每条数据到达时会调用该方法,主要在sink中用于对每条到达数据的处理
- 重写close()方法,该方法主要用于对资源等的释放
- 重写snapshotState(FunctionSnapshotContext)方法,该方法主要用于进行checkpoint时用户需要进行的相应操作,每当checkpoint执行的时候,snapshotState会被调用,通常用于flush、commit、synchronize外部系统
- 重写initializeState(FunctionInitializationContext)方法,该方法主要用于进行一些初始化操作,初始化的时候(第一次初始化或者从前一次checkpoint recover的时候)被调用,通常用来初始化state,以及处理state recovery的逻辑
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
其后,我们开始实现sink函数。根据上述的描述,如果我们需要将数据输出到日志中,则我们只需要在invoke方法中,将每条数据利用日志记录即可。每次到达的数据都是一个Tuple2类型的,它有两个字段,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,当为false时,可不进行任何操作。第二个值表示实际的数据值。基于此,我们做了进一步的改进,即当要插入或者更新该数据时,我们在其前面输出了insert or update字段。当要删除该数据时,我们在其前面加了delete字段已作区分,方便用于调试。因此,其具体实现如下述代码
package com.huawei.flink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefinedSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
Logger logger = LoggerFactory.getLogger(getClass());
// 初始化
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
// in包括两个值,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,
//当为false时,可不进行任何操作。第二个值表示实际的数据值
public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {
boolean flag = in.f0;
if (flag) {
logger.info("insert or update:" + in.f1.toString());
} else {
logger.info("delete:" + in.f1.toString());
}
}
@Override
public void close() throws Exception {
}
}
代码编写完成后,我们只需要使用mvn clean install命令进行打包即可。之后将该jar作为udf在Flink OpenSource SQL作业编辑页面中上传即可。之后即可通过用户自定义connector使用该sink。 这里,我们以kafka作为source,自定义sink作为sink,从kafa中读取数据,输出到日志中。当作业成功运行后,并且向kafka中插入多个字符串"defined-sinkTest"后,我们在taskManager的日志中可以看到类似于下图结果。
create table kafkaSource(
attr string
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topicName',
'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
'connector.properties.group.id' = 'test_definedsink',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'csv'
);
create table userSink(
attr string
) with (
'connector.type' = 'user-defined',
'connector.class-name' = 'com.huawei.flink.DefinedSink'
);
insert into userSink select *from kafkaSource;
- 点赞
- 收藏
- 关注作者
评论(0)