客快物流大数据项目(四十二):Java代码操作Kudu

举报
Lansonli 发表于 2022/02/20 00:40:21 2022/02/20
【摘要】 目录 Java代码操作Kudu 一、构建maven工程 二、导入依赖 三、​​​​​​​创建包结构 四、​​​​​​​初始化方法 五、​​​​​​​创建表 六、​​​​​​​插入数据 七、​​​​​​​查询数据 八、修改数据 九、​​​​​​​删除数据 十、​​​​​​​修改表 十一、​​​​​​​删除表 ...

目录

Java代码操作Kudu

一、构建maven工程

二、导入依赖

三、​​​​​​​创建包结构

四、​​​​​​​初始化方法

五、​​​​​​​创建表

六、​​​​​​​插入数据

七、​​​​​​​查询数据

八、修改数据

九、​​​​​​​删除数据

十、​​​​​​​修改表

十一、​​​​​​​删除表


Java代码操作Kudu

一、​​​​​​​构建maven工程

二、导入依赖


      <repositories>
          <repository>
              <id>cloudera</id>
              <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
          </repository>
      </repositories>
      <dependencies>
          <dependency>
              <groupId>org.apache.kudu</groupId>
              <artifactId>kudu-client</artifactId>
              <version>1.9.0-cdh6.2.1</version>
          </dependency>
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>4.12</version>
          </dependency>
          <dependency>
              <groupId>org.apache.kudu</groupId>
              <artifactId>kudu-client-tools</artifactId>
              <version>1.9.0-cdh6.2.1</version>
          </dependency>
          <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
          <dependency>
              <groupId>org.apache.kudu</groupId>
              <artifactId>kudu-spark2_2.11</artifactId>
              <version>1.9.0-cdh6.2.1</version>
          </dependency>
          <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-sql_2.11</artifactId>
              <version>2.1.0</version>
          </dependency>
      </dependencies>
  
 

三、​​​​​​​创建包结构

包名

说明

cn.it

代码所在的包目录

 

四、​​​​​​​初始化方法


      package cn.it;
      import org.apache.kudu.ColumnSchema;
      import org.apache.kudu.Type;
      import org.apache.kudu.client.KuduClient;
      import org.junit.Before;
      public class TestKudu {
         //定义KuduClient客户端对象
         private static KuduClient kuduClient;
         //定义表名
         private static String tableName = "person";
         /**
       * 初始化方法
       */
         @Before
         public void init() {
             //指定master地址
             String masterAddress = "node2.cn";
             //创建kudu的数据库连接
              kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
          }
         //构建表schema的字段信息
         //字段名称 数据类型 是否为主键
         public ColumnSchema newColumn(String name, Type type, boolean isKey) {
              ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
              column.key(isKey);
             return column.build();
          }
      }
  
 

五、​​​​​​​创建表


      /** 使用junit进行测试
       *
       * 创建表
       * @throws KuduException
       */
      @Test
      public void createTable() throws KuduException {
         //设置表的schema
          List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
          columns.add(newColumn("CompanyId", Type.INT32, true));
          columns.add(newColumn("WorkId", Type.INT32, false));
          columns.add(newColumn("Name", Type.STRING, false));
          columns.add(newColumn("Gender", Type.STRING, false));
          columns.add(newColumn("Photo", Type.STRING, false));
         Schema schema = new Schema(columns);
         //创建表时提供的所有选项
         CreateTableOptions tableOptions = new CreateTableOptions();
         //设置表的副本和分区规则
          LinkedList<String> list = new LinkedList<String>();
          list.add("CompanyId");
         //设置表副本数
          tableOptions.setNumReplicas(1);
         //设置range分区
         //tableOptions.setRangePartitionColumns(list);
         //设置hash分区和分区的数量
          tableOptions.addHashPartitions(list, 3);
         try {
              kuduClient.createTable("person", schema, tableOptions);
          } catch (Exception e) {
              e.printStackTrace();
          }
          kuduClient.close();
      }
  
 

 

六、​​​​​​​插入数据


      /**
       * 向表中加载数据
       * @throws KuduException
       */
      @Test
      public void loadData() throws KuduException {
         //打开表
         KuduTable kuduTable = kuduClient.openTable(tableName);
         //创建KuduSession对象 kudu必须通过KuduSession写入数据
         KuduSession kuduSession = kuduClient.newSession();
         //采用flush方式 手动刷新
          kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
          kuduSession.setMutationBufferSpace(3000);
         //准备数据
         for(int i=1; i<=10; i++){
             Insert insert = kuduTable.newInsert();
             //设置字段的内容
              insert.getRow().addInt("CompanyId",i);
              insert.getRow().addInt("WorkId",i);
              insert.getRow().addString("Name","lisi"+i);
              insert.getRow().addString("Gender","male");
              insert.getRow().addString("Photo","person"+i);
              kuduSession.flush();
              kuduSession.apply(insert);
          }
          kuduSession.close();
          kuduClient.close();
      }
  
 

七、​​​​​​​查询数据


       /**
       * 查询表数据
       * @throws KuduException
       */
      @Test
      public void queryData() throws KuduException {
         //打开表
         KuduTable kuduTable = kuduClient.openTable(tableName);
         //获取scanner扫描器
          KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
         KuduScanner scanner = scannerBuilder.build();
         //遍历
         while(scanner.hasMoreRows()){
             RowResultIterator rowResults = scanner.nextRows();
             while (rowResults.hasNext()){
                 RowResult result = rowResults.next();
                 int companyId = result.getInt("CompanyId");
                 int workId = result.getInt("WorkId");
                 String name = result.getString("Name");
                 String gender = result.getString("Gender");
                 String photo = result.getString("Photo");
                  System.out.print("companyId:"+companyId+" ");
                  System.out.print("workId:"+workId+" ");
                  System.out.print("name:"+name+" ");
                  System.out.print("gender:"+gender+" ");
                  System.out.println("photo:"+photo);
              }
          }
         //关闭
          scanner.close();
          kuduClient.close();
      }
  
 

 

八、修改数据


      /**
       * 修改数据
       * @throws KuduException
       */
      @Test
      public void upDATEData() throws KuduException {
         //打开表
         KuduTable kuduTable = kuduClient.openTable(tableName);
         //构建kuduSession对象
         KuduSession kuduSession = kuduClient.newSession();
         //设置刷新数据模式,自动提交
          kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
         //更新数据需要获取UpDATE对象
         UpDATE upDATE = kuduTable.newUpDATE();
         //获取row对象
         PartialRow row = upDATE.getRow();
         //设置要更新的数据信息
          row.addInt("CompanyId",1);
          row.addString("Name","kobe");
         //操作这个upDATE对象
          kuduSession.apply(upDATE);
          kuduSession.close();
      }
  
 

 

九、​​​​​​​删除数据


      /**
       * 删除表中的数据
       */
      @Test
      public void deleteData() throws KuduException {
         //打开表
         KuduTable kuduTable = kuduClient.openTable(tableName);
         KuduSession kuduSession = kuduClient.newSession();
         //获取Delete对象
         Delete delete = kuduTable.newDelete();
         //构建要删除的行对象
         PartialRow row = delete.getRow();
         //设置删除数据的条件
          row.addInt("CompanyId",2);
          kuduSession.flush();
          kuduSession.apply(delete);
          kuduSession.close();
          kuduClient.close();
      }
  
 

 

十、​​​​​​​修改表


      package cn.it.kudu;
      import org.apache.kudu.ColumnSchema;
      import org.apache.kudu.Type;
      import org.apache.kudu.client.*;
      import org.junit.Before;
      import org.junit.Test;
      import java.util.List;
      /**
       * 修改表操作
       */
      public class AlterTable {
         //定义kudu的客户端对象
         private static KuduClient kuduClient;
         //定义一张表名称
         private static String tableName = "person";
         /**
       * 初始化操作
       */
         @Before
         public void init() {
             //指定kudu的master地址
             String masterAddress = "node2.cn";
             //创建kudu的数据库连接
              kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
          }
         /**
       * 添加列
       */
         @Test
         public void alterTableAddColumn() {
             AlterTableOptions alterTableOptions = new AlterTableOptions();
              alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
             try {
                  kuduClient.alterTable(tableName, alterTableOptions);
              } catch (KuduException e) {
                  e.printStackTrace();
             }
          }
         /**
       * 删除列
       */
         @Test
         public void alterTableDeleteColumn(){
             AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
             try {
                  kuduClient.alterTable(tableName, alterTableOptions);
              } catch (KuduException e) {
                  e.printStackTrace();
             }
          }
         /**
       * 添加分区列
       */
         @Test
         public void alterTableAddRangePartition(){
             int lowerValue = 110;
             int upperValue = 120;
             try {
                 KuduTable kuduTable = kuduClient.openTable(tableName);
                  List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
                 boolean flag = true;
                 for (Partition rangePartition : rangePartitions) {
                     int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
                     if(startKey == lowerValue){
                          flag = false;
                      }
                  }
                 if(flag) {
                     PartialRow lower = kuduTable.getSchema().newPartialRow();
                      lower.addInt("Id", lowerValue);
                     PartialRow upper = kuduTable.getSchema().newPartialRow();
                      upper.addInt("Id", upperValue);
                      kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
                  }else{
                      System.out.println("分区已经存在,不能重复创建!");
                  }
              } catch (KuduException e) {
                  e.printStackTrace();
              } catch (Exception exception) {
                  exception.printStackTrace();
              }
          }
         /**
       * 删除表
       * @throws KuduException
       */
         @Test
         public void dropTable() throws KuduException {
              kuduClient.deleteTable(tableName);
          }
      }
  
 

十一、​​​​​​​删除表


      /**
       * 删除表
       */
      @Test
      public void dropTable() throws KuduException {
         //删除表
         DeleteTableResponse response = kuduClient.deleteTable(tableName);
         //关闭客户端连接
          kuduClient.close();
      }
  
 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

 

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123012993

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。