Flink JDBC Connector sink源码简单阅读

举报
潇湘暮雨 发表于 2021/08/09 20:58:55 2021/08/09
【摘要】 对于connector的介绍之前已经讲解,此处就不再详细结果其运行流程和使用方式,而是简单学习下jdbc connector中sink的源码,而由于源码较多因此我们只挑选重点部分进行研究学习。1      JDBCTableSourceSinkFactory:JDBC支持的配置属性参数在flink sql中,所有的connector的使用都是从factory开始的,而factory的创建是通...

对于connector的介绍之前已经讲解,此处就不再详细结果其运行流程和使用方式,而是简单学习下jdbc connectorsink的源码,而由于源码较多因此我们只挑选重点部分进行研究学习。

1      JDBCTableSourceSinkFactoryJDBC支持的配置属性参数

flink sql中,所有的connector的使用都是从factory开始的,而factory的创建是通过java spi创建的,在factory主要有几个重要的方法起着重要的作用。

首先supportedProperties方法,这里记录着jdbc connector所支持的所有的with参数和相应的schema,从下面代码中可以看到jdbc所支持的各种情况下的各种参数,如作为souce、作为sink和作为维表。

@Override

public List<String> supportedProperties() {

   List<String> properties = new ArrayList<>();



   // common options

   properties.add(CONNECTOR_DRIVER);

   properties.add(CONNECTOR_URL);

   properties.add(CONNECTOR_TABLE);

   properties.add(CONNECTOR_USERNAME);

   properties.add(CONNECTOR_PASSWORD);



   // scan options

   properties.add(CONNECTOR_READ_PARTITION_COLUMN);

   properties.add(CONNECTOR_READ_PARTITION_NUM);

   properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);

   properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND);

   properties.add(CONNECTOR_READ_FETCH_SIZE);



   // lookup options

   properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS);

   properties.add(CONNECTOR_LOOKUP_CACHE_TTL);

   properties.add(CONNECTOR_LOOKUP_MAX_RETRIES);



   // sink options

   properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS);

   properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL);

   properties.add(CONNECTOR_WRITE_MAX_RETRIES);



   // schema

   properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);

   properties.add(SCHEMA + ".#." + SCHEMA_TYPE);

   properties.add(SCHEMA + ".#." + SCHEMA_NAME);

   // computed column

   properties.add(SCHEMA + ".#." + EXPR);



   // watermark

   properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);

   properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);

   properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);



   return properties;

}

对于sink来说,其次的重要方法就是createStreamTableSink,它主要是创建所需的TableSink,其具体使用参见代码注释:

@Override

public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {

   DescriptorProperties descriptorProperties = getValidatedProperties(properties);
//获取创建的sink表的schema

   TableSchema schema = TableSchemaUtils.getPhysicalSchema(

      descriptorProperties.getTableSchema(SCHEMA));

   // 创建builder,并根据用户的with参数设置相应的表名,url,sql方言、驱动、账号和密码

   final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder()

      .setOptions(getJDBCOptions(descriptorProperties))

      .setTableSchema(schema);



//若用户配置了最大刷新条数、刷新间隔时间以及写入失败时的最大尝试次数,则进行设置

   descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize);

   descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent(

      s -> builder.setFlushIntervalMills(s.toMillis()));

   descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);



   return builder.build();//最后,创建JDBCUpsertTableSink

}

2      JDBCUpsertTableSink

factory中创建相应的TableSink后,flink会调用JDBCUpsertTableSink中的consumeDataStream方法来添加相应的sink函数,在这里就是JDBCUpsertSinkFunction函数,而这个函数也就是JDBC的核心,它主要负责将数据写入数据库中。

@Override

public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {

   return dataStream

         .addSink(new JDBCUpsertSinkFunction(newFormat()))

         .setParallelism(dataStream.getParallelism())

         .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()))

         .disableChaining();

}

JDBCUpsertFunction中我们可以看到JDBCUpsertSinkFunction主要继承了RichSinkFunction类,而继承该类主要实现了一下几个重要方法,其中open主要是进行一些初始化操作,invoke是每条到达的数据都会调用,主要负责对到达的数据进行处理。而在这里我们可以看到在JDBCUpsertSinkFunction中只有一个outputFormat属性,而无论是open函数,还是invoke方法,其最终的实际操作者都是outputFormat,即所有到达的数据都是由outputFormat操作写入。因此我们的重点查看对象应该是outputFormat。而从上面代码可以看到,outputFormat又是在JDBCUpsertTableSink中创建JDBCUpsertSinkFunction时通过构造函数传入,而outputFormat又是通过newFormat方法创建生成,因此我们最终要重点关注的应该是在newFormat中创建的JDBCUpsertOutputFormat类型的对象。

@Override

public void open(Configuration parameters) throws Exception {

   super.open(parameters);

   RuntimeContext ctx = getRuntimeContext();

   outputFormat.setRuntimeContext(ctx);

   outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());

}



@Override

public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

   outputFormat.writeRecord(value);

}
@Override

public void close() throws Exception {

   outputFormat.close();

}

 

3      JDBCUpsertOutputFormat:数据的实际操作者

正如上述所说,在sink function实际处理数据前,首先需要调用open方法进行相应的初始化操作,而这些操作又是最终调用outputFormatopen方法,因此我们需要了解的是outputFormatopen方法中进行的初始化操作。其实际进行的操作如下注释:

/**

 * Connects to the target database and initializes the prepared statement.

 *

 * @param taskNumber The number of the parallel instance.

 * @throws IOException Thrown, if the output could not be opened due to an

 * I/O problem.

 */

@Override

public void open(int taskNumber, int numTasks) throws IOException {

   try {
//使用sql的驱动、账号和密码创建相应的数据库的connection

      establishConnection();

      objectReuse = getRuntimeContext().getExecutionConfig().isObjectReuseEnabled();
//根据用户sql代码中是否配置了主键进行判断进入不同的处理逻辑,创建所需要的不同的writer
//这里主要是两种:1.只能够进行插入操作的 2.能够进行插入、更新、删除操作的

      if (keyFields == null || keyFields.length == 0) {
//利用获取的表名和字段名,创建相应的插入语句,这里即为
//insert into tableName(column1, column2……)values(?,?……)

         String insertSQL = dialect.getInsertIntoStatement(tableName, fieldNames);
//若没有主键,则我们这里只进行插入操作,而不会进行其他的更新以及删除操作,因此这里会创建一个append模式的writer

         jdbcWriter = new AppendOnlyWriter(insertSQL, fieldTypes);

      } else {

         jdbcWriter = UpsertWriter.create(

            dialect, tableName, fieldNames, fieldTypes, keyFields, excludeUpdateColumns);

      }
//对writer进行初始化
//如果为appendOnlyWriter,则根据上述的insert into tableName(column1, column2……)values(?,?……)语句和传入的connection创建插入语句的preparestatement
//如果为UpsertWriter,则根据insert语句insert into tableName(column1,column2……) values(?,?......) on duplicate key update column1=values(),column2=values()……和delete语句delete from tableName whrer key1=? And key2 = ?......以及传入的connection创建相应的statement

      jdbcWriter.open(connection);

   } catch (SQLException sqe) {

      throw new IllegalArgumentException("open() failed.", sqe);

   } catch (ClassNotFoundException cnfe) {

      throw new IllegalArgumentException("JDBC driver class not found.", cnfe);

   }



//若用户配置了刷新时间,则启动一个定时线程,定时间隔统一刷新数据到数据库中

   if (flushIntervalMills != 0 && flushMaxSize != 1) {

      this.scheduler = Executors.newScheduledThreadPool(

            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));

      this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {

         synchronized (JDBCUpsertOutputFormat.this) {

            if (closed) {

               return;

            }

            try {

               flush();

            } catch (Exception e) {

               flushException = e;

            }

         }

      }, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);

   }

}

进行初始化后,当数据到达时会调用JDBCUpsertSinkFunctioninvoke方法对数据进行相应处理。从2JDBCUpsertSinkFunction中的invoke方法,我们可以看到当数据到达时会调用outputFormatwriteRecord方法,因此这里我们需要继续看下JDBCUpsertOutputFormatwriteRecord方法。

@Override

public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {

   checkFlushException();



   try {

      Tuple2<Boolean, Row> record = objectReuse ? new Tuple2<>(tuple2.f0, Row.copy(tuple2.f1)) : tuple2;

      jdbcWriter.addRecord(record);

      batchCount++;

      if (batchCount >= flushMaxSize) {

         flush();

      }

   } catch (Exception e) {

      throw new RuntimeException("Writing records to JDBC failed.", e);

   }

}

从上述方法可以看出,当数据到达时,会调用在open初始化创建的writeraddRecord方法。这里我们在学习下两种writeaddRecord方法。

首先是AppendOnlyWriter类型,从代码中可以看出会取出记录中的两个属性值,第一个是一个boolean类型,并进行验证,若为false则会抛出错误,这是因为appendOnlyWriter只支持append模式即插入操作,而当该值为false时,需要进行的是delete操作,因此需要抛出错误。

第二个值就是用户写入的实际数据,而这里做的操作是调用cachedRowsadd方法,而通过上下文可以看出cacheRows是一个ArrayList,因此addRecord方法主要是将用户的数据放入list中。

@Override

public void addRecord(Tuple2<Boolean, Row> record) {

   checkArgument(record.f0, "Append mode can not receive retract/delete message.");

   cachedRows.add(record.f1);

}

其次,我们要看的是另一种类型的writerUpsertWriteraddRecord方法。其代码如下,从代码中我们可以看出,该方法进行的也是一种存入数据的操作。从该类的上下文可以看出与appendOnlyWriter不同的是,keyToRows是一个map类型,即所有的记录都会放入map中。同时这里是以用户实际数据的主键值作为key,而将包含标识写入数据方式的数据作为value,即value里第一个参数是标识数据的操作即true或者false,而第二个参数是用户的实际数据。

public void addRecord(Tuple2<Boolean, Row> record) {

   // add records to buffer

   keyToRows.put(getPrimaryKey(record.f1), record);

}

了解完相应的addRecord方法后,我们继续回到上述JDBCOutputFormatwriteRecord方法中,从上述addRecord方法我们可以看出用户的实际数据都是暂时存储一批。而在writeRecord方法中,我们可以看到放入一条数据,都会将batchCount加一,即统计当前放入数据的数据个数。如果当前放置个数达到用户设置的最大刷新大小,将会调用flush方法进行刷新操作,将数据写入数据库中,此举是为了实现数据的批写入,避免频繁写入影响性能。

public synchronized void flush() throws Exception {

   checkFlushException();



   for (int i = 1; i <= maxRetryTimes; i++) {

      try {

         jdbcWriter.executeBatch();

         batchCount = 0;

         break;

      } catch (SQLException e) {

         LOG.error("JDBC executeBatch error, retry times = {}", i, e);

         if (e.getMessage().contains("retrieve transaction read-only status")

            || e.getMessage().contains("statement closed")) {

            resetConnection();

            jdbcWriter.resetConnection(connection);

         }

         if (i >= maxRetryTimes) {

            throw e;

         }

         Thread.sleep(1000 * i);

      }

   }

}

当数据需要进行刷新时,会调用flush方法。从上述方法中我们可以看到,实际调用的是初始化时创建的writerexecuteBatch方法。这里又是和上述的addRecord一样,不同类型的writer调用不同的executeBatch方法。

首先我们先看下较为简单的appendOnlyWriterexecuteBatch方法。在该方法中,首先进行的是循环遍历放入list中的数据,在遍历时为之前根据sql语句创建的statement中的占位符赋值,最后再支持该statement,实现批量插入数据。

@Override

public void executeBatch() throws SQLException {

   if (cachedRows.size() > 0) {

      for (Row row : cachedRows) {

         setRecordToStatement(statement, fieldTypes, row);

         statement.addBatch();

      }

      statement.executeBatch();

      cachedRows.clear();

   }

}

其次,我们再看下UpsertWriterexecuteBatch方法。它也是循环遍历放入map中的数据,获取每条数据的主键值,然后根据value中的第一个参数的不同进行不同的处理。如果第一个参数为true,则书名进行的是插入或者更新操作,则为已经创建的statement中的占位符赋值。如果为false,则进行的delete操作,为已经创建的delete语句的statement中的占位符赋值。最后执行相应的statement。如果没有对占位符进行赋值,则及时执行statement,也不会产生任何操作,任何影响。

@Override

public void executeBatch() throws SQLException {

   if (keyToRows.size() > 0) {

      for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {

         Row pk = entry.getKey();

         Tuple2<Boolean, Row> tuple = entry.getValue();

         if (tuple.f0) {

            processOneRowInBatch(pk, tuple.f1);

         } else {

            setRecordToStatement(deleteStatement, pkTypes, pk);

            deleteStatement.addBatch();

         }

      }

      internalExecuteBatch();

      deleteStatement.executeBatch();

      keyToRows.clear();

   }

}

至此,jdbc connectorsink运行代码初步简单学习完毕。其中很多地方只是进行简单的讲述,并没有深入,例如在创建upsertWriter是实际创建的是UpsertWriterUsingUpsertStatement,同时执行UpsertWriterexecuteBatch调用processOneRowInBatch时,会调用UpsertWriterUpsertStatementprocessOneRowInBatch进行处理。

作者才疏学浅,难免有所错误和遗漏,若有错误,请谅解!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。