Flink JDBC Connector sink源码简单阅读
对于connector的介绍之前已经讲解,此处就不再详细结果其运行流程和使用方式,而是简单学习下jdbc connector中sink的源码,而由于源码较多因此我们只挑选重点部分进行研究学习。
1 JDBCTableSourceSinkFactory:JDBC支持的配置属性参数
在flink sql中,所有的connector的使用都是从factory开始的,而factory的创建是通过java spi创建的,在factory主要有几个重要的方法起着重要的作用。
首先supportedProperties方法,这里记录着jdbc connector所支持的所有的with参数和相应的schema,从下面代码中可以看到jdbc所支持的各种情况下的各种参数,如作为souce、作为sink和作为维表。
@Override
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();
// common options
properties.add(CONNECTOR_DRIVER);
properties.add(CONNECTOR_URL);
properties.add(CONNECTOR_TABLE);
properties.add(CONNECTOR_USERNAME);
properties.add(CONNECTOR_PASSWORD);
// scan options
properties.add(CONNECTOR_READ_PARTITION_COLUMN);
properties.add(CONNECTOR_READ_PARTITION_NUM);
properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND);
properties.add(CONNECTOR_READ_FETCH_SIZE);
// lookup options
properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS);
properties.add(CONNECTOR_LOOKUP_CACHE_TTL);
properties.add(CONNECTOR_LOOKUP_MAX_RETRIES);
// sink options
properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS);
properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL);
properties.add(CONNECTOR_WRITE_MAX_RETRIES);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
// computed column
properties.add(SCHEMA + ".#." + EXPR);
// watermark
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
return properties;
}
对于sink来说,其次的重要方法就是createStreamTableSink,它主要是创建所需的TableSink,其具体使用参见代码注释:
@Override
public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
DescriptorProperties descriptorProperties = getValidatedProperties(properties);
//获取创建的sink表的schema
TableSchema schema = TableSchemaUtils.getPhysicalSchema(
descriptorProperties.getTableSchema(SCHEMA));
// 创建builder,并根据用户的with参数设置相应的表名,url,sql方言、驱动、账号和密码
final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder()
.setOptions(getJDBCOptions(descriptorProperties))
.setTableSchema(schema);
//若用户配置了最大刷新条数、刷新间隔时间以及写入失败时的最大尝试次数,则进行设置
descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize);
descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent(
s -> builder.setFlushIntervalMills(s.toMillis()));
descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
return builder.build();//最后,创建JDBCUpsertTableSink
}
2 JDBCUpsertTableSink
在factory中创建相应的TableSink后,flink会调用JDBCUpsertTableSink中的consumeDataStream方法来添加相应的sink函数,在这里就是JDBCUpsertSinkFunction函数,而这个函数也就是JDBC的核心,它主要负责将数据写入数据库中。
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream
.addSink(new JDBCUpsertSinkFunction(newFormat()))
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()))
.disableChaining();
}
在JDBCUpsertFunction中我们可以看到JDBCUpsertSinkFunction主要继承了RichSinkFunction类,而继承该类主要实现了一下几个重要方法,其中open主要是进行一些初始化操作,invoke是每条到达的数据都会调用,主要负责对到达的数据进行处理。而在这里我们可以看到在JDBCUpsertSinkFunction中只有一个outputFormat属性,而无论是open函数,还是invoke方法,其最终的实际操作者都是outputFormat,即所有到达的数据都是由outputFormat操作写入。因此我们的重点查看对象应该是outputFormat。而从上面代码可以看到,outputFormat又是在JDBCUpsertTableSink中创建JDBCUpsertSinkFunction时通过构造函数传入,而outputFormat又是通过newFormat方法创建生成,因此我们最终要重点关注的应该是在newFormat中创建的JDBCUpsertOutputFormat类型的对象。
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
outputFormat.writeRecord(value);
}
@Override
public void close() throws Exception {
outputFormat.close();
}
3 JDBCUpsertOutputFormat:数据的实际操作者
正如上述所说,在sink function实际处理数据前,首先需要调用open方法进行相应的初始化操作,而这些操作又是最终调用outputFormat的open方法,因此我们需要了解的是outputFormat在open方法中进行的初始化操作。其实际进行的操作如下注释:
/**
* Connects to the target database and initializes the prepared statement.
*
* @param taskNumber The number of the parallel instance.
* @throws IOException Thrown, if the output could not be opened due to an
* I/O problem.
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
//使用sql的驱动、账号和密码创建相应的数据库的connection
establishConnection();
objectReuse = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled();
//根据用户sql代码中是否配置了主键进行判断进入不同的处理逻辑,创建所需要的不同的writer
//这里主要是两种:1.只能够进行插入操作的 2.能够进行插入、更新、删除操作的
if (keyFields == null || keyFields.length == 0) {
//利用获取的表名和字段名,创建相应的插入语句,这里即为
//insert into tableName(column1, column2……)values(?,?……)
String insertSQL = dialect.getInsertIntoStatement(tableName, fieldNames);
//若没有主键,则我们这里只进行插入操作,而不会进行其他的更新以及删除操作,因此这里会创建一个append模式的writer
jdbcWriter = new AppendOnlyWriter(insertSQL, fieldTypes);
} else {
jdbcWriter = UpsertWriter.create(
dialect, tableName, fieldNames, fieldTypes, keyFields, excludeUpdateColumns);
}
//对writer进行初始化
//如果为appendOnlyWriter,则根据上述的insert into tableName(column1, column2……)values(?,?……)语句和传入的connection创建插入语句的preparestatement
//如果为UpsertWriter,则根据insert语句insert into tableName(column1,column2……) values(?,?......) on duplicate key update column1=values(),column2=values()……和delete语句delete from tableName whrer key1=? And key2 = ?......以及传入的connection创建相应的statement
jdbcWriter.open(connection);
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
//若用户配置了刷新时间,则启动一个定时线程,定时间隔统一刷新数据到数据库中
if (flushIntervalMills != 0 && flushMaxSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (JDBCUpsertOutputFormat.this) {
if (closed) {
return;
}
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);
}
}
进行初始化后,当数据到达时会调用JDBCUpsertSinkFunction的invoke方法对数据进行相应处理。从2的JDBCUpsertSinkFunction中的invoke方法,我们可以看到当数据到达时会调用outputFormat的writeRecord方法,因此这里我们需要继续看下JDBCUpsertOutputFormat的writeRecord方法。
@Override
public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
checkFlushException();
try {
Tuple2<Boolean, Row> record = objectReuse ? new Tuple2<>(tuple2.f0, Row.copy(tuple2.f1)) : tuple2;
jdbcWriter.addRecord(record);
batchCount++;
if (batchCount >= flushMaxSize) {
flush();
}
} catch (Exception e) {
throw new RuntimeException("Writing records to JDBC failed.", e);
}
}
从上述方法可以看出,当数据到达时,会调用在open初始化创建的writer的addRecord方法。这里我们在学习下两种write的addRecord方法。
首先是AppendOnlyWriter类型,从代码中可以看出会取出记录中的两个属性值,第一个是一个boolean类型,并进行验证,若为false则会抛出错误,这是因为appendOnlyWriter只支持append模式即插入操作,而当该值为false时,需要进行的是delete操作,因此需要抛出错误。
第二个值就是用户写入的实际数据,而这里做的操作是调用cachedRows的add方法,而通过上下文可以看出cacheRows是一个ArrayList,因此addRecord方法主要是将用户的数据放入list中。
@Override
public void addRecord(Tuple2<Boolean, Row> record) {
checkArgument(record.f0, "Append mode can not receive retract/delete message.");
cachedRows.add(record.f1);
}
其次,我们要看的是另一种类型的writer即UpsertWriter的addRecord方法。其代码如下,从代码中我们可以看出,该方法进行的也是一种存入数据的操作。从该类的上下文可以看出与appendOnlyWriter不同的是,keyToRows是一个map类型,即所有的记录都会放入map中。同时这里是以用户实际数据的主键值作为key,而将包含标识写入数据方式的数据作为value,即value里第一个参数是标识数据的操作即true或者false,而第二个参数是用户的实际数据。
public void addRecord(Tuple2<Boolean, Row> record) {
// add records to buffer
keyToRows.put(getPrimaryKey(record.f1), record);
}
了解完相应的addRecord方法后,我们继续回到上述JDBCOutputFormat的writeRecord方法中,从上述addRecord方法我们可以看出用户的实际数据都是暂时存储一批。而在writeRecord方法中,我们可以看到放入一条数据,都会将batchCount加一,即统计当前放入数据的数据个数。如果当前放置个数达到用户设置的最大刷新大小,将会调用flush方法进行刷新操作,将数据写入数据库中,此举是为了实现数据的批写入,避免频繁写入影响性能。
public synchronized void flush() throws Exception {
checkFlushException();
for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (e.getMessage().contains("retrieve transaction read-only status")
|| e.getMessage().contains("statement closed")) {
resetConnection();
jdbcWriter.resetConnection(connection);
}
if (i >= maxRetryTimes) {
throw e;
}
Thread.sleep(1000 * i);
}
}
}
当数据需要进行刷新时,会调用flush方法。从上述方法中我们可以看到,实际调用的是初始化时创建的writer的executeBatch方法。这里又是和上述的addRecord一样,不同类型的writer调用不同的executeBatch方法。
首先我们先看下较为简单的appendOnlyWriter的executeBatch方法。在该方法中,首先进行的是循环遍历放入list中的数据,在遍历时为之前根据sql语句创建的statement中的占位符赋值,最后再支持该statement,实现批量插入数据。
@Override
public void executeBatch() throws SQLException {
if (cachedRows.size() > 0) {
for (Row row : cachedRows) {
setRecordToStatement(statement, fieldTypes, row);
statement.addBatch();
}
statement.executeBatch();
cachedRows.clear();
}
}
其次,我们再看下UpsertWriter的executeBatch方法。它也是循环遍历放入map中的数据,获取每条数据的主键值,然后根据value中的第一个参数的不同进行不同的处理。如果第一个参数为true,则书名进行的是插入或者更新操作,则为已经创建的statement中的占位符赋值。如果为false,则进行的delete操作,为已经创建的delete语句的statement中的占位符赋值。最后执行相应的statement。如果没有对占位符进行赋值,则及时执行statement,也不会产生任何操作,任何影响。
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}
至此,jdbc connector的sink运行代码初步简单学习完毕。其中很多地方只是进行简单的讲述,并没有深入,例如在创建upsertWriter是实际创建的是UpsertWriterUsingUpsertStatement,同时执行UpsertWriter的executeBatch调用processOneRowInBatch时,会调用UpsertWriterUpsertStatement的processOneRowInBatch进行处理。
作者才疏学浅,难免有所错误和遗漏,若有错误,请谅解!
- 点赞
- 收藏
- 关注作者
评论(0)