DLI Flink OpenSource SQL中用户自定义sink的使用

举报
潇湘暮雨 发表于 2021/09/30 09:05:56 2021/09/30
【摘要】         在DLI的Flink OpenSource SQL语句中针对用户不同的使用场景,为用户提供了较多的connector以供用户使用,如DWS connector、kafka connector、RDS connector、Hbase connector和elasticsearch connector等。虽然这些connector足够应付大多数主流使用场景,但是仍然有可能不能够满...
        在DLI的Flink OpenSource SQL语句中针对用户不同的使用场景,为用户提供了较多的connector以供用户使用,如DWS connector、kafka connector、RDS connector、Hbase connector和elasticsearch connector等。虽然这些connector足够应付大多数主流使用场景,但是仍然有可能不能够满足部分用户的使用场景,比如用户需要将输出结果输出到日志中,或者DLI没有提供相应connector的服务中。然而,DLI虽然想要提供所有的connector以满足所有的用户使用场景,但是由于人力等问题,无法实现。因此,为了满足这些无法使用DLI提供connector的用户的使用场景,我们提供了用户自定义connector。用户使用自定义connector时,只需要实现相应的sink或source函数即可,从而可以实现更大的灵活性。这里我们以自定义sink为例进行介绍。
1.自定义sink的用法       
        我们首先介绍自定义sink的用法。我们从DLI的官网可以参考https://support.huaweicloud.com/sqlref-flink-dli/dli_08_0347.html,其具体内容如下表所示。从表中我们可以看到,该connector在with中只支持三个属性。第一个字段为connector.type,其为必选,且值只能为user-defined。第二个字段属性为connector.class-name,它也为必选,这个字段的值是用户提供的sink函数的全限定类名。而最后一个字段则为connector.class-parameter,这个值不是必选,主要是作为用户定义的sink函数的构造函数传入。而同时,如果用户的sink函数的构造函数需要传入该参数值,则构造函数的参数只能为一个,且只能为字符串类型,因此用户如果需要传入多个值,则需要拼接成一个字符串并作为connector.class-parameter字段的值,之后再在sink函数的构造函数中进行分解、转换,从而得到需要的类型参数。了解了自定义sink的用法,我们知道首先需要做的就是编写自己需要的sink函数,下面我们将以sink函数的编写为重点进行介绍。
表1 参数说明

参数

是否必选

说明

connector.type

只能为user-defined,表示使用自定义的sink。

connector.class-name

sink函数的全限定类名。

connector.class-parameter

sink函数其构造函数的参数,只支持一个String类型的参数。


2.sink函数原理

      为编写支持upsert流的sink函数,则该sink函数需要继承RichSinkFunction(该类必须继承或其他提供相应功能的Flink类),并实现CheckpointedFunction(该接口可由用户选择是否使用)接口,RichSinkFunction主要用于对到达数据的处理和写入,CheckpointedFunction主要用于进行checkpoint时进行相应的处理。这两个类和接口的继承关系如下:

1632915714711-jnx.png

1632915719500-n2u.png

3.具体实现方式
3.1    RichSinkFunction中以下几个方法需要重点重写
  • 重写open(Configuration)方法,该方法主要是进行相应的初始化操作,如hdfs sink中使用参数和相应环境的配置初始化
  • 重写invoke(In, Context)方法,当每条数据到达时会调用该方法,主要在sink中用于对每条到达数据的处理
  • 重写close()方法,该方法主要用于对资源等的释放
3.2    CheckpointedFunction中以下几个方法需要重点重写(该接口不是必须实现)
  • 重写snapshotState(FunctionSnapshotContext)方法,该方法主要用于进行checkpoint时用户需要进行的相应操作,每当checkpoint执行的时候,snapshotState会被调用,通常用于flush、commit、synchronize外部系统
  • 重写initializeState(FunctionInitializationContext)方法,该方法主要用于进行一些初始化操作,初始化的时候(第一次初始化或者从前一次checkpoint recover的时候)被调用,通常用来初始化state,以及处理state recovery的逻辑
4.使用示例
基于上面的介绍,我们实现一个简单的输出数据到日志中的sink函数,该函数名为DefinedSink,假设这个类在包com.huawei.flink下。
首先,我们需要在pom文件中添加相应的依赖。
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

其后,我们开始实现sink函数。根据上述的描述,如果我们需要将数据输出到日志中,则我们只需要在invoke方法中,将每条数据利用日志记录即可。每次到达的数据都是一个Tuple2类型的,它有两个字段,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,当为false时,可不进行任何操作。第二个值表示实际的数据值。基于此,我们做了进一步的改进,即当要插入或者更新该数据时,我们在其前面输出了insert or update字段。当要删除该数据时,我们在其前面加了delete字段已作区分,方便用于调试。因此,其具体实现如下述代码

package com.huawei.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefinedSink extends RichSinkFunction<Tuple2<Boolean, Row>> {
    Logger logger = LoggerFactory.getLogger(getClass());
	// 初始化
    @Override
    public void open(Configuration parameters) throws Exception {
    }

    @Override
    // in包括两个值,其中第一个值为布尔型,为true或false,当true时表示插入或更新操作,为false时表示删除操作,若对接的sink端不支持删除等操作,
    //当为false时,可不进行任何操作。第二个值表示实际的数据值
    public void invoke(Tuple2<Boolean, Row> in, Context context) throws Exception {
        boolean flag = in.f0;
        if (flag) {
            logger.info("insert or update:" + in.f1.toString());
        } else {
            logger.info("delete:" + in.f1.toString());
        }
    }

    @Override
    public void close() throws Exception {
    }
}

代码编写完成后,我们只需要使用mvn clean install命令进行打包即可。之后将该jar作为udf在Flink OpenSource SQL作业编辑页面中上传即可。之后即可通过用户自定义connector使用该sink。 这里,我们以kafka作为source,自定义sink作为sink,从kafa中读取数据,输出到日志中。当作业成功运行后,并且向kafka中插入多个字符串"defined-sinkTest"后,我们在taskManager的日志中可以看到类似于下图结果。

create table kafkaSource(
  attr string
) with (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'topicName',
  'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'connector.properties.group.id' = 'test_definedsink',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'csv'
);

create table userSink(
  attr string
) with (
  'connector.type' = 'user-defined',
  'connector.class-name' = 'com.huawei.flink.DefinedSink'
);

insert into userSink select *from kafkaSource;

1632917860728-21l.png

5.可能存在的问题
若出现Caused by: com.huawei.flink.userdefined.error.UserDefinedParameterException: The class of user defined construct fail.,请确认sink函数的构造函数是否正确。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。