数据湖(六):Hudi与Flink整合

举报
Lansonli 发表于 2022/06/03 23:21:35 2022/06/03
【摘要】 文章目录 Hudi与Flink整合 一、maven pom.xml导入如下包 二、Flink 写入数据到Hudi代码 Hudi与Flink整合 Hudi0.8.0版本与Flink1.12.x之上版本兼容,目前经过测试,Hudi0.8.0版本开始支持Flink,通过Flink写数据到Hudi时,必须开启checkpoi...

Hudi与Flink整合

Hudi0.8.0版本与Flink1.12.x之上版本兼容,目前经过测试,Hudi0.8.0版本开始支持Flink,通过Flink写数据到Hudi时,必须开启checkpoint,至少有5次checkpoint后才能看到对应hudi中的数据。

但是应该是有一些问题,目前问题如下:

  • 在本地执行Flink代码向Flink写数据时,存在“java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract”错误信息,预计是hudi版本支持问题。
  • 写入到Flink中的数据,如果使用Flink读取出来,会有对应的错误:“Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error”,这个错误主要是由于上一个错误导致Hudi中没有commit信息,在内部读取时,读取不到Commit信息导致。


一、maven pom.xml导入如下包


  
   
    
     
    
    
     
      <properties>
     
    
   
    
     
    
    
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     
    
   
    
     
    
    
         <maven.compiler.source>1.8</maven.compiler.source>
     
    
   
    
     
    
    
         <maven.compiler.target>1.8</maven.compiler.target>
     
    
   
    
     
    
    
         <flink.version>1.12.1</flink.version>
     
    
   
    
     
    
    
     
      </properties>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      <dependencies>
     
    
   
    
     
    
    
         <!-- Flink操作Hudi需要的包-->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.hudi</groupId>
     
    
   
    
     
    
    
             <artifactId>hudi-flink-bundle_2.11</artifactId>
     
    
   
    
     
    
    
             <version>0.8.0</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-clients_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- java 开发Flink所需依赖 -->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-java</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-streaming-java_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- Flink 开发Scala需要导入以下依赖 -->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-scala_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-streaming-scala_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- 读取hdfs文件需要jar包-->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
         <groupId>org.apache.hadoop</groupId>
     
    
   
    
     
    
    
         <artifactId>hadoop-client</artifactId>
     
    
   
    
     
    
    
         <version>2.9.2</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
         <!-- Flink 状态管理 RocksDB 依赖 -->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- Flink Kafka连接器的依赖 -->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-connector-kafka_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-csv</artifactId>
     
    
   
    
     
    
    
             <version>1.12.1</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- Flink SQL & Table-->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-table-planner_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         <!-- Flink SQL中使用Blink 需要导入的包-->
     
    
   
    
     
    
    
         <dependency>
     
    
   
    
     
    
    
             <groupId>org.apache.flink</groupId>
     
    
   
    
     
    
    
             <artifactId>flink-table-planner-blink_2.11</artifactId>
     
    
   
    
     
    
    
             <version>${flink.version}</version>
     
    
   
    
     
    
    
         </dependency>
     
    
   
    
     
    
    
     
      </dependencies>
     
    
  


二、Flink 写入数据到Hudi代码


  
   
    
     
    
    
     
      //1.创建对象
     
    
   
    
     
    
    
         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     
    
   
    
     
    
    
         val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance()
     
    
   
    
     
    
    
     
          .useBlinkPlanner().inStreamingMode().build())
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         import org.apache.flink.streaming.api.scala._
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据,不然只有一个.hoodie目录。
     
    
   
    
     
    
    
     
          env.enableCheckpointing(2000)
     
    
   
    
     
    
    
     
      // env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //3.设置并行度
     
    
   
    
     
    
    
     
          env.setParallelism(1)
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //4.读取Kakfa 中的数据
     
    
   
    
     
    
    
     
          tableEnv.executeSql(
     
    
   
    
     
    
    
           """
     
    
   
    
     
    
    
     
       | create table kafkaInputTable(
     
    
   
    
     
    
    
     
       | id varchar,
     
    
   
    
     
    
    
     
       | name varchar,
     
    
   
    
     
    
    
     
       | age int,
     
    
   
    
     
    
    
     
       | ts varchar,
     
    
   
    
     
    
    
     
       | loc varchar
     
    
   
    
     
    
    
     
       | ) with (
     
    
   
    
     
    
    
     
       | 'connector' = 'kafka',
     
    
   
    
     
    
    
     
       | 'topic' = 'test_tp',
     
    
   
    
     
    
    
     
       | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
     
    
   
    
     
    
    
     
       | 'scan.startup.mode'='latest-offset',
     
    
   
    
     
    
    
     
       | 'properties.group.id' = 'testgroup',
     
    
   
    
     
    
    
     
       | 'format' = 'csv'
     
    
   
    
     
    
    
     
       | )
     
    
   
    
     
    
    
     
       """.stripMargin)
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         val table: Table = tableEnv.from("kafkaInputTable")
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //5.创建Flink 对应的hudi表
     
    
   
    
     
    
    
     
          tableEnv.executeSql(
     
    
   
    
     
    
    
           """
     
    
   
    
     
    
    
     
       |CREATE TABLE t1(
     
    
   
    
     
    
    
     
       | id VARCHAR(20) PRIMARY KEY NOT ENFORCED,--默认主键列为uuid,这里可以后面跟上“PRIMARY KEY NOT ENFORCED”指定为主键列
     
    
   
    
     
    
    
     
       | name VARCHAR(10),
     
    
   
    
     
    
    
     
       | age INT,
     
    
   
    
     
    
    
     
       | ts VARCHAR(20),
     
    
   
    
     
    
    
     
       | loc VARCHAR(20)
     
    
   
    
     
    
    
     
       |)
     
    
   
    
     
    
    
     
       |PARTITIONED BY (loc)
     
    
   
    
     
    
    
     
       |WITH (
     
    
   
    
     
    
    
     
       | 'connector' = 'hudi',
     
    
   
    
     
    
    
     
       | 'path' = '/flink_hudi_data',
     
    
   
    
     
    
    
     
       | 'write.tasks' = '1', -- default is 4 ,required more resource
     
    
   
    
     
    
    
     
       | 'compaction.tasks' = '1', -- default is 10 ,required more resource
     
    
   
    
     
    
    
     
       | 'table.type' = 'COPY_ON_WRITE' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
     
    
   
    
     
    
    
     
       |)
     
    
   
    
     
    
    
     
       """.stripMargin)
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //6.向表中插入数据
     
    
   
    
     
    
    
     
          tableEnv.executeSql(
     
    
   
    
     
    
    
           s"""
     
    
   
    
     
    
    
     
       | insert into t1 select id,name,age,ts,loc from ${table}
     
    
   
    
     
    
    
     
       """.stripMargin)
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
          env.execute()
     
    
  

以上代码需要注意“PRIMARY KEY NOT ENFORCED”可以不指定,如果不指定hudi对应的主键列默认是“uuid”,指定后可以使用自定义的列名当做主键。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。