大数据ClickHouse(十九):Flink 写入 ClickHouse API

举报
Lansonli 发表于 2022/09/04 00:29:15 2022/09/04
【摘要】 ​Flink 写入 ClickHouse API可以通过Flink原生JDBC Connector包将Flink结果写入ClickHouse中,Flink在1.11.0版本对其JDBC Connnector进行了重构:重构之前(1.10.x 及之前版本),包名为 flink-jdbc 。重构之后(1.11.x 及之后版本),包名为 flink-connector-jdbc 。二者对 Flin...

​Flink 写入 ClickHouse API

可以通过Flink原生JDBC Connector包将Flink结果写入ClickHouse中,Flink在1.11.0版本对其JDBC Connnector进行了重构:

  • 重构之前(1.10.x 及之前版本),包名为 flink-jdbc 。
  • 重构之后(1.11.x 及之后版本),包名为 flink-connector-jdbc 。

二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:

API名称

flink-jdbc

flink-connector-jdbc

DataStream

不支持

支持

Table API

支持

不支持

一、Flink 1.10.x之前版本使用flink-jdbc,只支持Table API

  • 示例

1、maven中需要导入以下包

<!--添加 Flink Table API 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

2、代码

/**
  *  通过 flink-jdbc API 将 Flink 数据结果写入到ClickHouse中,只支持Table API
  *
  *  注意:
  *   1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  *   2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
  */

case class PersonInfo(id:Int,name:String,age:Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1,后期每个并行度满批次需要的条数时,会插入click中
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //读取Socket中的数据
    val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    //将 ds 转换成 table 对象
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)

    //将table 对象写入ClickHouse中
    //需要在ClickHouse中创建表:create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //准备ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) //设置批次量,默认5000条
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    //注册ClickHouse table Sink,设置sink 数据的字段及Schema信息
    tableEnv.registerTableSink("ck-sink",
      sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))

    //将数据插入到 ClickHouse Sink 中
    tableEnv.insertInto(table,"ck-sink")

    //触发以上执行
    env.execute("Flink Table API to ClickHouse Example")

  }
}


二、Flink 1.11.x之后版本使用flink-connector-jdbc,只支持DataStream API

  • 示例

1、在Maven中导入以下依赖包

<!-- Flink1.11 后需要 Flink-client包-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>


2、代码

/**
  *  Flink 通过 flink-connector-jdbc 将数据写入ClickHouse ,目前只支持DataStream API
  */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    val ds: DataStream[String] = env.socketTextStream("node5",9999)

    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    //准备向ClickHouse中插入数据的sql
    val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //设置ClickHouse Sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      //插入数据SQL
      insetIntoCkSql,

      //设置插入ClickHouse数据的参数
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      //设置批次插入数据
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),

      //设置连接ClickHouse的配置
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withUsername("")
        .build()
    )

    //针对数据加入sink
    result.addSink(ckSink)

    env.execute("Flink DataStream to ClickHouse Example")

  }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。