HBase(六) HBase JAVA API - CRUD
CRUD
新增数据
HTable提供了put方法用于插入数据,提供两种方式,单行和多行。每行的插入以Put类实例化。
put(Put put)
put(List
第二个批量方法是一般会使用的,因为性能好的多。
批量方法中的Put可能涉及多行,所以可能部分失败。服务器遍历所有的操作并执行,失败会返回,客户端使用RetriesExhaustedWithDetailsException报告远程错误,用户可以查询有多少个操作失败,出错原因及重试次>数,失败的Put实例会保存在本地的写缓冲区,下次flush的时候会重试。
批量的Put方式是不能保证插入顺序的。
同一行的数据,尽量放在同一个Put实例中。
Put类
Put类有几种构造函数,行健是必选的,行锁、时间戳可选。
Put(byte[] row)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)
Put(byte[] row, RowLock rowLock)
Put(Put putToCopy)
添加数据调用add方法,每次add都可以特定的添加一列数据。
add(byte[] family, byte[] qualifier, byte[] value)
add(byte[] family, byte[] qualifier, long ts, byte[] value)
add(KeyValue kv)
add多次后,可以调用heapSize() 方法,计算当前Put实例所需的堆大小,既包括其中的数据,也包括内部数据结构所需的空间。
Put也提供多个has方法,检查是否存在特定的单元格。
KeyValue 类
KeyValue 类是Hbase的底层数据格式。最长的一个构造函数包含了行健、列族、列、时间戳、type和值,以byte[]的方式存储,包括数组的offset和length,这是为了能提交一个已经存在的byte[]数组,并进行高效率的字节层次的操作。这个类和它的比较器最初都设计为hbase内部使用。客户端API只出现在几个地方,目的是方便用户访问原始数据,避免额外的复制操作。还可以允许基于byte的比较,而不是依赖比较慢的基于类的比较。
KeyValue 实例的类型有Put、Delete、DeleteColumn、DeleteFamily四种。
KeyValue 有一系列实现了Comparator接口的内部类,使用它们做排序处理。KeyValue 将大部分比较器按照静态实例的方式提供:
static KeyValue.KVComparator | COMPARATOR Comparator for plain key/values; i.e. |
static KeyValue.KeyComparator | KEY_COMPARATOR Comparator for plain key; i.e. |
static KeyValue.KVComparator | META_COMPARATOR A KeyValue.KVComparator for .META. catalog table KeyValues. |
static KeyValue.KeyComparator | META_KEY_COMPARATOR A KeyValue.KVComparator for .META. catalog table KeyValue keys. |
static KeyValue.KVComparator | ROOT_COMPARATOR A KeyValue.KVComparator for -ROOT- catalog table KeyValues. |
static KeyValue.KeyComparator | ROOT_KEY_COMPARATOR A KeyValue.KVComparator for -ROOT- catalog table KeyValue keys. |
不在表中的,还有RowComparator、SamePrefixComparator
写缓冲
每次put其实就是一次RPC,为了减少RPC调用的次数,hbase提供了客户端的写缓冲区。
默认写缓冲区不开启,即提交一次put就执行一次。可以通过HTable的setAutoFlush(false)开启。开启后客户端机器崩溃可能造成数据丢失,也就是说,默认的性能差,但是安全性好。
开启缓冲后,虽然可以使用flushCommits()强制把缓存的数据提交到服务端,但是一般是不需要关注的,hbase会统计每个用户添加的实例堆大小,包括追踪数据大小和内部数据结构大小,超过缓存制定的大小限制,就会自动调用flushCommits。可以通过HTable的setWriteBufferSize(long writeBufferSize)方法设置这个大小。为每个HTable实例设置大小特别繁琐,可以在hbase-site.xml写入一个默认的值:
<property>
<name>hbase.client.write.buffer</name>
<value>20971520</value>
</property>
缓存中的数据是按region server归类的,自动提交到各个rs上,过程是透明的
代码示例
Configuration conf = HBaseConfiguration.create();
// 已建好表t1,和列族f1、f2
HTable table = new HTable(conf, "t1");
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f2"), Bytes.toBytes("c2"),
Bytes.toBytes("value2"));
put1.add(kv);
// 单次插入了一行中的两列数据
table.put(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
put2.add(Bytes.toBytes("f2"), Bytes.toBytes("c2"), Bytes.toBytes("value2"));
Put put3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
put3.add(Bytes.toBytes("f2"), Bytes.toBytes("c2"), Bytes.toBytes("value2"));
// 插入一个错误的列族,FF
Put put4 = new Put(Bytes.toBytes("row4"));
put4.add(Bytes.toBytes("FF"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
List
puts.add(put2);
puts.add(put4);
puts.add(put3);
// 批量插入
try{
table.put(puts);
}catch(RetriesExhaustedWithDetailsException e){
for(int i=0;i<e.getNumExceptions();i++){
System.out.println(e.getHostnamePort(i));
System.out.println(e.getAddress(i));
System.out.println(e.getCause(i));
// out: Column family FF does not exist in region t1 ...
System.out.println(e.getRow(i));
// out: {"totalColumns":1,"families":{"FF":[{"timestamp":9223372036854775807,"qualifier":"c1","vlen":6}]},"row":"row4"}
}
}
table.close();
更新数据
覆盖更新
hbase的单元格是rowkey,family:column,timestamp这三个维度来区分的。
即如果两条记录其rowkey,family:,timestamp一样的话,那么hbase就会认为其是相同的数据,后面插入的数据就会覆盖前面的(内部是墓碑标记的方式),所以这是一种更新方式。
Configuration conf = HBaseConfiguration.create();
// 已建好表t1,和列族f1、f2
HTable table = new HTable(conf, "t1");
long cur = System.currentTimeMillis();
byte[] row = Bytes.toBytes("row1");
byte[] family = Bytes.toBytes("f1");
byte[] col = Bytes.toBytes("c1");
Put put1 = new Put(row, cur);
put1.add(family, col, Bytes.toBytes("value1"));
table.put(put1);
Get get1 = new Get(row);
get1.addColumn(family, col);
System.out.println(Bytes.toString(table.get(get1).getValue(family, col)));
// OUT: value1
Put put2 = new Put(row, cur);
put2.add(family, col, Bytes.toBytes("value2"));
table.put(put2);
System.out.println(Bytes.toString(table.get(get1).getValue(family, col)));
// OUT: value2
table.close();
CAS更新
HTable提供cas方法checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
前面参数都匹配的时候,才去更新这个Put,而且保证过程是原子的。一种特殊的使用方式是,值不存在的时候才修改,指定参数设置为null即可。方法的原子性保证是同行的,只能检查和修改同一行的数据,如果是不同行,会抛出异常。方法返回bool,表示put操作成功还是失败。还有个类似的方法是checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
下面代码承接上面的更新代码:
// 前面的put有时间戳,这里如果是new Put(row),且客户端和服务端时间不一致,可能看不到最新的更新结果
Put put3 = new Put(row,System.currentTimeMillis());
put3.add(family, col, Bytes.toBytes("value3"));
boolean result1 = table.checkAndPut(row, family, col, Bytes.toBytes("value1"), put3);
boolean result2 = table.checkAndPut(row, Bytes.toBytes("f2"), col, Bytes.toBytes("value1"), put3);
boolean result3 = table.checkAndPut(row, family, col, Bytes.toBytes("value2"), put3);
System.out.println(result1+" "+result2+" "+result3);
// out : false false true
删除数据
类似put,hbase的删除也提供了单行删除和批量删除
delete(Delete delete)
delete(List
Delete类
Delete也是行级别的操作。可以删除整行,也可以限制删除。限制删除有两个维度,一个是时间戳,可以在构造函数中设置,或者是setTimestamp(long timestamp)方法设置;一个维度是scheme方面,deleteFamily(byte[] family)删除列族的所有列和时间版本、deleteColumns(byte[] family, byte[] qualifier)特定列的所有版本、deleteColumn(byte[] family, byte[] qualifier)特定列的最新版本(注意这两个命名);每个函数也有各自的时间戳限制版本。
Delete del1 = new Delete(row);
// del1.deleteFamily(family);
del1.deleteColumns(family, col);
// del1.deleteColumn(family, col);
table.delete(del1);
HTable的delete(List
Delete del2 = new Delete(Bytes.toBytes("row4"));
del2.deleteColumn(Bytes.toBytes("NOTEXISTS"), col);
List
dels.add(del1);
dels.add(del2);
try{
table.delete(dels);
}catch(RetriesExhaustedWithDetailsException e){
System.out.println(e.getNumExceptions());
// out: 1
}
System.out.println(dels.size());
// out: 1
查询数据
HTable提供单行get和多行get方法
get(Get get)
get(List
还有个exists(Get get)方法,和get方法类似,但是不会返回数据,只验证是否存在。
getRowOrBefore(byte[] row, byte[] family)方法,支持特殊的查询,要么返回特定的行,不匹配的时候返回特定行之前的那一行(字典顺序),比如表里有两行row1,row2,查询row99会返回row2的结果。列族是必须存在的,否则会抛空指针异常。如果之前的行不存在,如查询aaa,会返回null。
Get类
Get类的用法和Delete类的用法相似,也是两个维度限制查询,只不过一个是查询一个是删除。Get的时间维度又有两个细化,时间戳范围的setTimeRange(long minStamp, long maxStamp)、特定时间戳setTimeStamp(long timestamp)、和特定版本数setMaxVersions(int maxVersions)、所有版本setMaxVersions()(默认版本数是1)。scheme维度,有限制列族addFamily(byte[] family)、限制列addColumn(byte[] family, byte[] qualifier)。
Result类
get方法所有匹配的单元格,会封装在一个Result实例中返回。
本类是非线程安全的。
Result类提供多个维度的查询方法,可以多次查询而不造成额外的消耗,因为查询结果缓存到了客户端本地。
getValue(byte[] family, byte[] qualifier)方法返回特定的单元格,但是不能指定版本,只能得到最新版本的值。value()方法返回第一个列对应的最新单元格的值。
raw()方法返回底层KeyValue实例的数组,也提供size()方法直接查询这个数组的大小。list()方法就是把这个数组包装成list返回。
列维度还有两个方法getColumn(byte[] family, byte[] qualifier)和getColumnLatest(byte[] family, byte[] qualifier),后一个方法是取最新的版本。这两个方法和上面返回byte[]不同,是返回keyValue实例的。
NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> getMap()方法比较常用,返回的是Map&family,Map<qualifier,Map<timestamp,value>>>
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
byte[] row = Bytes.toBytes("row1");
byte[] family = Bytes.toBytes("f1");
byte[] col = Bytes.toBytes("c1");
Get get = new Get(row);
get.setMaxVersions();
Result r = table.get(get);
System.out.println(r.size());
System.out.println(Bytes.toString(r.getValue(family, col)));
// Result的toString是逐个调用KeyValue的toString方法
System.out.println(r.toString());
批处理
上面说的多行操作,put(List<Put> puts)、delete(List<Delete> deletes)等,都是基于batch方法实现的,batch有两个方法:
Object[] batch(List<? extends Row> actions)
void batch(List<? extends Row> actions, Object[] results)
Row类就是Put、Get、Delete的父类,批量可以处理这些操作。
batch中的put是没有写缓冲的,全部操作都是同步的,这点与单独的put不同。
batch可以支持跨行操作,不在局限于单行的批处理,但是要注意的是,针对同一行的数据操作要谨慎,如delete和put,因为批量是不保证顺序执行的,可能会产生问题,不要放在同一个批处理中。
batch返回的结果:
null :远程服务通信失败
EmptyResult: Put and Delete成功操作后的返回结果
Result:Get 成功操作后的返回结果,但是如果没有匹配,会返回空的Result
Throwable:服务器异常,返回给用户
样例
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
byte[] row = Bytes.toBytes("row1");
List
Get get = new Get(row);
batch.add(get);
Put put = new Put(row);
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes("value111"));
batch.add(put);
Object[] results = new Object[batch.size()];
try {
table.batch(batch, results);
} catch (Exception e) {
System.out.println(e);
}
for (int i = 0; i < results.length; i++) {
System.out.println("Result["+i+"]: "+results[i]);
}
table.close();
输出:
Result[0]: keyvalues={row1/f1:c1/1440418113056/Put/vlen=8/ts=0, row1/f2:c2/1439370051775/Put/vlen=6/ts=0}
Result[1]: keyvalues=NONE (这里就是EmptyResult实例的打印结果)
行锁
这个机制是使客户端API能够获得一行的锁,保证put、delete、checkAndPut的原子性,一般不会使用显式的行锁,也不建议显式的使用。Put构造函数就会在服务器调用期间创建一个锁,也可以使用显式锁的构造函数。
锁可以设置超时时间,默认是1分钟。超时并恢复锁后,如果继续使用之前申请的锁,会抛出异常。
可以在hbase-site.xml中显式配置,单位是ms
<property>
<name>hbase.regionserver.lease.period</name>
<value>120000</value>
</property>
扫描scan
这个技术是类似Oracle中的游标cursor的,利用了Hbase提供的底层顺序存储的数据结构。
工作方式是类似迭代器的,所以无需创建实例,使用HTable.getScanner()方法即可。
getScanner(byte[] family)
getScanner(byte[] family, byte[] qualifier)
getScanner(Scan scan)
前两个就是隐式的创建了一个scan,然后调用后一个方法。
Scan的构造函数有几种,可以指定起止行,范围是[startRow,stopRow):
Scan()
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)
Scan(byte[] startRow, Filter filter)
Scan(Get get)
Scan(Scan scan)
如果不指定start,默认从表的起始位置开始,如果不指定stop,会扫描到表的最后一行结束。参数不是必须精确的匹配,hbase会扫描相等或大于起始行的特定行健作为开始。scan创建后,也可以通过addFamily、addColumn、setMaxVersions、setTimeRange、setTimeStamp等方法增加限制条件。也可以使用filter。
如果不需要某个列族的数据,就不要加到scan中,这样列族对应的存储文件就不会加载,这是列式数据库的一个好处。
ResultScanner
scan不会通过一次RPC取回所有的数据,是以行为单位返回。ResultScanner是scan返回结果的包装,他把每一行数据封装成一个Result实例,并把这些Result封装在一个迭代器中。迭代器有两种工作方式:
Result next()
Result[] next(int nbRows) 如果没有足够的匹配,返回的数组长度会小于指定
ResultScanner有close方法,释放所有扫描控制的资源。一个打开的scan会占用服务器不少资源,尤其是堆内存,所以要注意尽早释放资源并做异常处理。也正是由于这种资源占用,ResultScanner和行锁一样,提供了租约超时保护机制,这个超时是和行锁一个配置。
Scanner caching(面向行)
scanner caching默认是关闭的,也就是说,每次调用next,都会发起一次RPC调用,而next(int nbRows) 也只是程序底层傻傻的循环调用next而已,并不会产生批量的效果。一次请求的数据量越小,这种性能的损耗相对更大。
开启缓存有几种方式,依次优先级越高(后面覆盖前面):
全局默认配置为1行,可以在配置中修改,全局Scan生效
<property>
<name>hbase.client.scanner.caching</name>
<value>10</value>
</property>
表的层次:打开后,这个表的所有Scan实例的缓存都会生效
HTable.setScannerCaching(int scannerCaching)
Scan层次:打开后,只会影响当前的扫描实例。
Scan. setCaching(int caching)
缓存大小的设置,应该在RPC请求次数和客户端/服务端内存消耗间平衡考虑,设置过高,每次next调用会使服务端可能查询更多文件并传输到客户端,会占用更多时间,如果数据超过了堆设置的大小,还会抛出OOM异常。
Batch(面向列)
batch可以控制一次next能取回多少列,解决了超大单行引起的OOM问题。
设置方式:Scan. setBatch(int batch)
如果 列数%batch!=0 , 最后一次返回的Result实例会包含剩下全部的比较少的列。
RPC请求次数计算
RPC请求数 = (行数*每行列数)/ Min(每行列数,batch)/ scanner cache
这个公式不包含打开、关闭scan的请求。
这两个参数是联动处理的,启动batch时,扫描是intra-row的模式。从下图可以看出,batch分片了列,所以一次RPC实际上取了6个Result,每个Result包含3列。
举例,假设一张有两个列族的表,10行数据,每行每列族有10列,考虑单版本。共200单元格。不同cache和batch设置的影响如下:
Cache | Batch | Result个数 | RPC次数 | 说明 |
1 | 1 | 200 | 201 | 每单元格一次,最后一个确认scan完成,下同 |
200 | 1 | 200 | 2 | 单列查询,每个Result包装了一列的值 |
2 | 10 | 20 | 11 | |
5 | 100 | 10 | 3 | |
5 | 20 | 10 | 3 | |
10 | 10 | 20 | 3 |
代码示例
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
// scan 范围 [startRow,stopRow)
Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row3"));
// cache 3行
scan.setCaching(3);
// batch 1列
scan.setBatch(1);
ResultScanner rs = table.getScanner(scan);
for(Result r:rs){
System.out.println(r);
}
// 生产代码注意处理异常
rs.close();
table.close();
- 点赞
- 收藏
- 关注作者
评论(0)