Flink SQL中connector的定义和实现
在FLink SQL中一般是以create Table和connector结合的形式读取外部数据,从而创建table,如下是以JDBC作为connector的创建格式:
CREATE TABLE MyUserTable ( ... ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'name', 'connector.password' = 'password' ... )
其中第一个小括号中的定义的table的各个列及其各列的类型;with后的小括号中的内容用于定义connector的类型,以及连接connector和读取或写入数据所需要的各种设置
对于connector的实现需要遵循Flink中的接口定义以及实现流程
Flink SQL中TableSource和TableSink的创建方式
Flink SQL中TableSource和TableSink的创建方式
在connector中,TableSource和TableSink都是TableFactory通过调用crateStreamTableSource和createStreamTableSink来创建相对应的TableSource和TableSink。而因为在Flink SQL中,对于TableFactory的查找使用的是SPI(Service Provider Interface) 的方法,通过src/main/resources/META-INF.services中的org.apache.flink.table.factories.TableFactory所记录的TableFactory的路径来查找相应的TableFactory,如下图:
当查询到多个Factory后,会通过一定的条件进行筛选,如通过fliterByFactoryClass来过滤掉不是TableFactory的实现类,从而保留符合TableFactory实现类的TableFacotry。其次,通过filterByContext来判断TableFactory中requiredContext()要求的必要属性和用户提供的properties中的属性相匹配的TableFactory,这些属性一般是"connector_type","connector_version",从而选择出需要的TableFactory。然后,通过filterBySupportedProperties方法来判断该TableFactory是否支持用户所提供的所有属性。最终得到和用户属性相匹配的TableFactory。其筛选过滤方法如下图所示:
2.connector中所需要的相应TableFactory的实现
在connector中实现自己的TableFactory时,如果是SourceFactory则根据需要implements StreamTableSourceFactory或者BatchTableSourceFactory,如果是SinkFactory则需要implements StreamTableSinkFactory或BatchTableSinkFactory,如果是SourceSinkFactory则二者都需要实现;后续将以流Factory为例进行讲解。
在实现相应的接口后,需要重写相应的方法:
首先是重写requiredContext()方法,该方法是指当前TableFactory要求用户必须提供的属性名称及其值,它会被上述介绍的filrerByContext()方法调用,从而完成对不同TableFactory的筛选
其次是重写supportedProperties()方法,该方法主要是描述当前TableFactory支持用户提供的属性名称和Schema等,它会被上述介绍的filterBySupportedProperties()方法调用,从而判断用户输入的属性名称是否是该TableFactory所支持的属性
然后需要重写createStreamTableSource(Map<String, String> properties)方法,在该方法中需要实现对所需要的自定义的TableSource的创建,并返回
最后,需要重写createStreamTableSink(Map<String, String> properties)方法,在该方法中需要实现对所需要的自定义的TableSink的创建,并返回
3.connector中TableSource的实现
实现TableFactory后,需要实现所需要的TableSource类
对于TableSource类,需要implements StreamTableSource接口,需要重写以下方法:
重写getDataStream(StreamExecutionEnvironment env)方法,该方法是TableSource的主要方法,是Flink SQL添加获取数据源的方法,用户在其中添加自定义的读取数据源数据的对象
重写getReturnType()方法,该方法是返回所读取数据的名称和类型
重写getTableSchema()方法,该方法是返回table的物理schema
重写equlals(Object o)和hashCode()方法
当然为实现其他功能,TableSource也可实现其他多个接口,但同时需要根据实际实现该接口的方法,例如:DefinedProctimeAttribute、DefinedRowTimeAttribute和ProjectableTableSource等接口
4.connector中TableSink的实现
实现TableFactory后,需要实现所需要的TableSink类
对于TableSink类,需要根据需要实现不同的接口,如AppendStreamTableSink、UpsertStreamTableSink,此处以AppendStreamTableSink为例进行介绍
重写consumeDataStream(DataStream<Row> dataStream)方法,该方法是TableSink中写入数据的重要方法,用户通过在该方法中使用addSink()方法,从而实现将数据写入
重写emitDatatream(DataStream<T> dataStream)方法,该方法后续将被移除,一般直接调用consumeDataStream方法
重写getOutputType()方法,该方法主要是返回输出的类型,可以从schema中获得
重写getFieldNames()和getFieldTypes()方法,这两个方法分别是返回输出的属性名即每列名称和输出的类型。
了解需要实现的方法和接口,即可按照自己的需求来实现一个简单的connector,不过最重要的是getDataStream方法中的addSource()方法和consumeDataStream()方法中的addSink()中需要添加的SourceFunction和SinkFunction对象的实现。
- 点赞
- 收藏
- 关注作者
评论(0)