大数据物流项目:Kudu 操作命令(六)
theme: smartblue
Logistics_Day06:Kudu 的操作命令
03-[掌握]-Java 操作 Kudu之创建Maven Project
首先使用Java Client API操作Kudu数据库,DDL操作(创建表、删除表及修改表)和DML操作(CRUD)。
创建Maven Project设置GAV如下图所示:
创建Maven Module模块,用于编写Java API 操作Kudu,模块GAV设置如下所示:
构建Maven Project工程或Maven Module模块,POM文件添加依赖如下:
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!-- 版本属性 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
配置IDEA远程连接虚拟机,方便文件传输和远程命令行基本操作。
04-[掌握]-Java 操作 Kudu之创建KuduClient实例
在使用Java Client API之前,首先包package创建完成,此外,使用Java Client API操作Kudu数据库,需要创建客户端实例对象:
KuduClient
对象。
首先创建KuduClient对象,并且在应用运行结束的时候,需要关闭Client,所以采用JUnit方式构建和关闭。
package cn.itcast.kudu.table;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* 基于Java API对Kudu进行CRUD操作,包含创建表及删除表的操作
*/
public class KuduTableDemo {
// 定声明KuduClient实例对象
private KuduClient kuduClient = null ;
@Before
public void init() {
// KuduMaster地址信息
String masterAddresses = "node2.itcast.cn:7051" ;
// 初始化KuduClient实例对象
kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)
// 设置对此Kudu进行操作时超时时间,默认值为30s
.defaultOperationTimeoutMs(10000)
.build();
}
@Test
public void testKuduClient(){
System.out.println(kuduClient);
}
@After
public void close() throws KuduException {
// 测试完成以后,关闭连接
if(null != kuduClient) {
kuduClient.close();
}
}
}
在Kudu提供API中,尤其是Java Client API,构建对象时,比如KuduClient,往往使用
建造者设计模式
,首先创建Builder对象,设置相关属性,最后获取实例对象。
05-[掌握]-Java 操作 Kudu之创建表(Hash分区)
任务:==使用Java Client API在Kudu中创建表==。
create table itcast_users(
id int,
name string,
age byte,
primary key(id)
)
paritition by hash(id) partitions 3
stored as kudu ;
Kudu提供面向对象API,将创建表DDL语句,封装到类中,具体如下图所示:
- 1)、
Schema
,里面存储表的所有列信息(列名称和列类型)- 2)、
CreateTableOptions
,封装表的分区策略,分区数目和副本数
创建测试方法,编写创建表的代码:
/**
* 用于构建Kudu表中每列的字段信息Schema
*
* @param name 字段名称
* @param type 字段类型
* @param isKey 是否为Key
* @return ColumnSchema对象
*/
private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
// 创建ColumnSchemaBuilder实例对象
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
// 设置是否为主键
column.key(isKey) ;
// 构建 ColumnSchema
return column.build() ;
}
/**
* 创建Kudu中的表,表的结构如下所示:
create table itcast_users(
id int,
name string,
age byte,
primary key(id)
)
paritition by hash(id) partitions 3
stored as kudu ;
*/
@Test
public void createKuduTable() throws KuduException {
// a. 定义Schema信息,列名称和列类型
List<ColumnSchema> columns = new ArrayList<>();
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columns.add(newColumnSchema("name", Type.STRING, false));
columns.add(newColumnSchema("age", Type.INT8, false));
Schema schema = new Schema(columns) ;
// b. 设置表的属性
CreateTableOptions options = new CreateTableOptions() ;
// 设置分区策略
options.addHashPartitions(Arrays.asList("id"), 3);
// 设置副本数目
options.setNumReplicas(1) ;
// c. 传递参数,创建表
/*
public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
*/
KuduTable kuduTable = kuduClient.createTable("itcast_users", schema, options);
System.out.println("Kudu Table ID = " + kuduTable.getTableId());
}
06-[掌握]-Java 操作 Kudu之删除表
任务:==删除Kudu中表,先判断表是否存在。==
/**
* 判断表是否存在,如果存在,将表删除
*/
@Test
public void dropKuduTable() throws KuduException {
// 判断表是否存在
if(kuduClient.tableExists("itcast_users")){
// 传递表的名称,进行删除
kuduClient.deleteTable("itcast_users") ;
}
}
07-[掌握]-Java 操作 Kudu之插入数据
任务Task:==向Kudu表中插入数据,先插入单条数据,再批量插入。==
- 1)、获取表的句柄:
KuduTable
,通过KuduClient
获取- 2)、插入数据时,创建
Insert
对象,设置每行Row
的值- 3)、当向Kudu表插入数据时,创建会话实例对象
KuduSession
,类似PreparedStatement对象
编写代码,向Kudu表插入数据,步骤如下所示:
/**
* 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26)
*/
@Test
public void insertKuduData() throws KuduException {
// a. 获取操作表句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// b. 获取KuduSession实例对象
KuduSession kuduSession = kuduClient.newSession();
// c. 插入数据,获取Insert对象
Insert insert = kuduTable.newInsert();
// d. 获取Row对象
PartialRow insertRow = insert.getRow();
// 设置值
insertRow.addInt("id", 1001);
insertRow.addString("name", "itcast");
insertRow.addByte("age", (byte)25);
// e. 插入数据
kuduSession.apply(insert);
// f. 关闭连接
kuduSession.close();
}
上面编写代码,完成单条数据插入,接下来,批量插入数据,代码如下所示:
/**
* 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26)
*/
@Test
public void insertKuduData() throws KuduException {
// a. 获取操作表句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// b. 获取KuduSession实例对象
KuduSession kuduSession = kuduClient.newSession();
// 设置手动提交,刷新数据
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
// 设置缓存数据量
kuduSession.setMutationBufferSpace(1000);
Random random = new Random();
for(int index = 0; index < 100; index ++){
// c. 插入数据,获取Insert对象
Insert insert = kuduTable.newInsert();
// d. 获取Row对象
PartialRow insertRow = insert.getRow();
// 设置值
insertRow.addInt("id", 100 + index);
insertRow.addString("name", "zhangsan-" + index);
insertRow.addByte("age", (byte)(random.nextInt(10) + 21));
// e. 插入数据
kuduSession.apply(insert);
}
// 手动提交
kuduSession.flush();
// f. 关闭连接
kuduSession.close();
}
08-[掌握]-Java 操作 Kudu之全量查询数据
任务:==从Kudu表中查询数据,属于全量查询数据==。
注意:从Kudu表加载数据时,思路与HBase不一样,从表的每个Tablet中扫描查询数据,放到迭代器中,最后将所有Tablet查询结果的迭代器放入迭代器中。
编写代码,从Kudu表全量加载数据,注意,遍历查询数据时,进行双层循环获取数据。
/**
* 从Kudu表中全量加载数据
*/
@Test
public void queryKuduData() throws KuduException {
// 1. 获取表的句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// 2. 获取扫描器对象
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
KuduScanner kuduScanner = scannerBuilder.build();
// 3. 遍历获取的数据
int index = 0 ;
while (kuduScanner.hasMoreRows()){ // 判断是否还有表的Tablet数据为获取
index += 1;
System.out.println("tablet index = " + index);
// 获取每个tablet中扫描的数据
RowResultIterator rowResults = kuduScanner.nextRows();
// 遍历每个Tablet中数据
while (rowResults.hasNext()){
RowResult rowResult = rowResults.next();
System.out.println(
"id = " + rowResult.getInt("id")
+ ", name = " + rowResult.getString("name")
+ ", age = " + rowResult.getByte("age")
);
}
}
}
09-[掌握]-Java 操作 Kudu之过滤查询数据
任务:在实际项目中,从Kudu加载数据,肯定有过滤条件,接下来实现,==如何进行过滤查询数据==。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n2ycpc4Q-1625969711968)(/img/1615880958773.png)]
使用KuduPlus实现上述过滤条件
分析思路:
/**
* 从Kudu表中全量加载数据
*/
@Test
public void queryKuduData() throws KuduException {
// 1. 获取表的句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// 2. 获取扫描器对象
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
// TODO: 设置过滤条件
/*
查询id和age两个字段的值,年龄age小于25,id大于150
*/
// TODO: 查询id和age两个字段
scannerBuilder.setProjectedColumnNames(Arrays.asList("id", "age"));
// TODO: 年龄age小于25,id大于150
scannerBuilder.addPredicate(
KuduPredicate.newComparisonPredicate(
newColumnSchema("id", Type.INT32, true),
KuduPredicate.ComparisonOp.GREATER,
150
)
);
scannerBuilder.addPredicate(
KuduPredicate.newComparisonPredicate(
newColumnSchema("age", Type.INT8, false),
KuduPredicate.ComparisonOp.LESS,
(byte)25
)
);
KuduScanner kuduScanner = scannerBuilder.build();
// 3. 遍历获取的数据
int index = 0 ;
while (kuduScanner.hasMoreRows()){ // 判断是否还有表的Tablet数据为获取
index += 1;
System.out.println("tablet index = " + index);
// 获取每个tablet中扫描的数据
RowResultIterator rowResults = kuduScanner.nextRows();
// 遍历每个Tablet中数据
while (rowResults.hasNext()){
RowResult rowResult = rowResults.next();
System.out.println(
"id = " + rowResult.getInt("id")
+ ", age = " + rowResult.getByte("age")
);
}
}
}
10-[掌握]-Java 操作 Kudu之更新及删除数据
任务:==向Kudu表中数据进行更新和删除操作==,类似Insert插入数据时操作。
- 1)、更新数据,只能根据
主键
更新数据
/**
* 更新Kudu表中数据
*/
@Test
public void updateKuduData() throws KuduException {
// a. 获取操作表句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// b. 获取KuduSession实例对象
KuduSession kuduSession = kuduClient.newSession();
// c. 获取更新数据update对象
Update newUpdate = kuduTable.newUpdate();
// 获取Row对象
PartialRow updateRow = newUpdate.getRow();
// 设置更新的数据
updateRow.addInt("id", 153);
updateRow.addString("name", "张三疯");
// e. 更新数据
kuduSession.apply(newUpdate);
// f. 关闭连接
kuduSession.close();
}
在Kudu中,除了提供insert和update插入与更新方法外,开提供:
upsert
,表示当表中主键存在时,更新数据;不存在时,插入数据。实际项目中,建议使用upsert操作。
/**
* 更新Kudu表中数据
*/
@Test
public void upsertKuduData() throws KuduException {
// a. 获取操作表句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// b. 获取KuduSession实例对象
KuduSession kuduSession = kuduClient.newSession();
// c. 获取更新数据update对象
Upsert newUpsert = kuduTable.newUpsert();
// 获取Row对象
PartialRow upsertRow = newUpsert.getRow();
// 设置更新的数据
upsertRow.addInt("id", 253);
upsertRow.addString("name", "张疯");
upsertRow.addByte("age", (byte)50);
// e. 更新数据
kuduSession.apply(newUpsert);
kuduSession.flush();
// f. 关闭连接
kuduSession.close();
}
对Kudu表数据进行删除时,需要按照主键id删除。
/**
* 删除Kudu表中数据
*/
@Test
public void deleteKuduData() throws KuduException {
// a. 获取操作表句柄
KuduTable kuduTable = kuduClient.openTable("itcast_users");
// b. 获取KuduSession实例对象
KuduSession kuduSession = kuduClient.newSession();
// c. 获取删除数据对象
Delete newDelete = kuduTable.newDelete();
// 获取Row对象
PartialRow deleteRow = newDelete.getRow();
// 设置主键
deleteRow.addInt("id", 253);
// e. 更新数据
kuduSession.apply(newDelete);
kuduSession.flush();
// f. 关闭连接
kuduSession.close();
}
- 点赞
- 收藏
- 关注作者
评论(0)