Flink SQL中connector的定义和实现

举报
潇湘暮雨 发表于 2020/09/28 19:52:23 2020/09/28
【摘要】 在FLink SQL中一般是以create Table和connector结合的形式读取外部数据,从而创建table,如下是以JDBC作为connector的创建格式:CREATE TABLE MyUserTable ( ...) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:33...

在FLink SQL中一般是以create Table和connector结合的形式读取外部数据,从而创建table,如下是以JDBC作为connector的创建格式:

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', 
  'connector.table' = 'jdbc_table_name',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'name',
  'connector.password' = 'password'
  ...
)

其中第一个小括号中的定义的table的各个列及其各列的类型;with后的小括号中的内容用于定义connector的类型,以及连接connector和读取或写入数据所需要的各种设置

对于connector的实现需要遵循Flink中的接口定义以及实现流程

  1. Flink SQL中TableSource和TableSink的创建方式

  • 在connector中,TableSource和TableSink都是TableFactory通过调用crateStreamTableSource和createStreamTableSink来创建相对应的TableSource和TableSink。而因为在Flink SQL中,对于TableFactory的查找使用的是SPI(Service Provider Interface) 的方法,通过src/main/resources/META-INF.services中的org.apache.flink.table.factories.TableFactory所记录的TableFactory的路径来查找相应的TableFactory,如下图:

image.png
  • 当查询到多个Factory后,会通过一定的条件进行筛选,如通过fliterByFactoryClass来过滤掉不是TableFactory的实现类,从而保留符合TableFactory实现类的TableFacotry。其次,通过filterByContext来判断TableFactory中requiredContext()要求的必要属性和用户提供的properties中的属性相匹配的TableFactory,这些属性一般是"connector_type","connector_version",从而选择出需要的TableFactory。然后,通过filterBySupportedProperties方法来判断该TableFactory是否支持用户所提供的所有属性。最终得到和用户属性相匹配的TableFactory。其筛选过滤方法如下图所示:

image.png

2.connector中所需要的相应TableFactory的实现

  • 在connector中实现自己的TableFactory时,如果是SourceFactory则根据需要implements StreamTableSourceFactory或者BatchTableSourceFactory,如果是SinkFactory则需要implements StreamTableSinkFactory或BatchTableSinkFactory,如果是SourceSinkFactory则二者都需要实现;后续将以流Factory为例进行讲解。

  • 在实现相应的接口后,需要重写相应的方法:

    1. 首先是重写requiredContext()方法,该方法是指当前TableFactory要求用户必须提供的属性名称及其值,它会被上述介绍的filrerByContext()方法调用,从而完成对不同TableFactory的筛选

    2. 其次是重写supportedProperties()方法,该方法主要是描述当前TableFactory支持用户提供的属性名称和Schema等,它会被上述介绍的filterBySupportedProperties()方法调用,从而判断用户输入的属性名称是否是该TableFactory所支持的属性

    3. 然后需要重写createStreamTableSource(Map<String, String> properties)方法,在该方法中需要实现对所需要的自定义的TableSource的创建,并返回

    4. 最后,需要重写createStreamTableSink(Map<String, String> properties)方法,在该方法中需要实现对所需要的自定义的TableSink的创建,并返回

3.connector中TableSource的实现

  • 实现TableFactory后,需要实现所需要的TableSource类

  • 对于TableSource类,需要implements StreamTableSource接口,需要重写以下方法:

    1. 重写getDataStream(StreamExecutionEnvironment env)方法,该方法是TableSource的主要方法,是Flink SQL添加获取数据源的方法,用户在其中添加自定义的读取数据源数据的对象

    2. 重写getReturnType()方法,该方法是返回所读取数据的名称和类型

    3. 重写getTableSchema()方法,该方法是返回table的物理schema

    4. 重写equlals(Object o)和hashCode()方法

  • 当然为实现其他功能,TableSource也可实现其他多个接口,但同时需要根据实际实现该接口的方法,例如:DefinedProctimeAttribute、DefinedRowTimeAttribute和ProjectableTableSource等接口

4.connector中TableSink的实现

  • 实现TableFactory后,需要实现所需要的TableSink类

  • 对于TableSink类,需要根据需要实现不同的接口,如AppendStreamTableSink、UpsertStreamTableSink,此处以AppendStreamTableSink为例进行介绍

    1. 重写consumeDataStream(DataStream<Row> dataStream)方法,该方法是TableSink中写入数据的重要方法,用户通过在该方法中使用addSink()方法,从而实现将数据写入

    2. 重写emitDatatream(DataStream<T> dataStream)方法,该方法后续将被移除,一般直接调用consumeDataStream方法

    3. 重写getOutputType()方法,该方法主要是返回输出的类型,可以从schema中获得

    4. 重写getFieldNames()和getFieldTypes()方法,这两个方法分别是返回输出的属性名即每列名称和输出的类型。

    了解需要实现的方法和接口,即可按照自己的需求来实现一个简单的connector,不过最重要的是getDataStream方法中的addSource()方法和consumeDataStream()方法中的addSink()中需要添加的SourceFunction和SinkFunction对象的实现。


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。