HBase(七) HBase JAVA API - Filter

举报
大数据小粉 发表于 2016/11/15 19:01:03 2016/11/15
【摘要】 HBase JAVA API - 过滤器Filter

过滤器Filter

过滤器在get和scan的基础上,进行进一步的过滤,如列名、具体值等。Hbase提供了很多自带的实现类,也可以自定义filter。
谓词下推(predicate push down),所有的过滤器都在服务端生效,所以过滤掉的数据不会传到客户端。使用者的自己的代码实现也尽量不要做客户端的过滤。
过滤器每region/scan一个实例


底层接口

通用接口为org.apache.hadoop.hbase.filter.Filter,已有的接口实现中:
大部分实体过滤器类继承自org.apache.hadoop.hbase.filter.FilterBase
还有一组继承自org.apache.hadoop.hbase.filter.CompareFilter,比FilterBase多一个compare()方法
其他的接口实现可以参考Filter接口的API说明

CompareFilter及实现类

CompareFilter需要两个参数,一个是CompareFilter.CompareOp,即比较运算符;一个是WritableByteArrayComparable,即比较器。
语义上,比较过滤器是返回成功匹配的值,和hbase过滤器原有的目的(筛掉无用信息)不同

比较运算符

枚举类型

EQUAL Equals,匹配相等的值,就是字面的意思,下同
GREATER greater than
GREATER_OR_EQUAL greater than or equal to
LESS less than
LESS_OR_EQUAL less than or equal to
NO_OP no operation,就是排除一切值
NOT_EQUAL not equal

比较器

WritableByteArrayComparable类,实现了org.apache.hadoop.io.Writable和Comparable接口。Hbase自带了几个已实现的子类:

比较器 说明
BinaryComparator 使用Bytes.compareTo()比较当前值和阈值
BinaryPrefixComparator 也是Bytes.compareTo()进行匹配,但是从左端开始前缀匹配
BitComparator BitComparator.BitwiseOp枚举,AND/OR/XOR(异或)位级比较
NullComparator 严格上不是比较,就是判断值是不是null
RegexStringComparator 正则表达式的匹配
SubstringComparator 阈值和表中的值看做String,通过contains()匹配字符串

行过滤器RowFilter

基于行健过滤数据,比较过程中,是按照字典顺序排序的,比如筛选小于“row2”的行,会返回row1、row11、row100等,比较常见的避免这种语义上的差别的方法,就是存的时候补位数据。

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");

Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row9"));
Filter f1 = new RowFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("row2")));
scan.setFilter(f1);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println(r);
}
rs.close();
System.out.println("===");
Filter f2 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("row[1,3]"));
scan.setFilter(f2);
ResultScanner rs2 = table.getScanner(scan);
for (Result r : rs2) {
System.out.println(r);
}
rs2.close();
System.out.println("===");
Filter f3 = new RowFilter(CompareOp.EQUAL, new SubstringComparator("ro"));
scan.setFilter(f3);
ResultScanner rs3 = table.getScanner(scan);
for (Result r : rs3) {
System.out.println(r);
}
rs3.close();

table.close();


列族过滤器FamilyFilter

与行过滤器使用方式类似,只不过用来比较列族
new FamilyFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("f2")));

列过滤器QualifierFilter

过滤特定列
new QualifierFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("c2")));

值过滤器ValueFilter

常用的方式是与RegexStringComparator或SubstringComparator配合使用
new ValueFilter(CompareOp.EQUAL,new RegexStringComparator("v*2"));

依赖列过滤器DependentColumnFilter

指定一个列作为基准,过滤其他列,过滤条件是基准列的时间戳。这个过滤器是基于列值进行筛选的,也就是说,可以理解成一个ValueFilter和时间戳过滤器的组合。这个过滤器与scan.setBatch不兼容,因为可能会导致取不到基准列的值。
dropDependentColumn参数可以控制是否丢弃过滤掉的数据,从实测结果来看,基准列本身不会被查出来,除非dropDependentColumn=false
new DependentColumnFilter(Bytes.toBytes("f1"), Bytes.toBytes("c1"), true);
还有有几种可选的构造函数,不同范围的过滤。

DependentColumnFilter()

Should only be used for writable
DependentColumnFilter(byte[] family,byte[] qualifier)

Constructor for DependentColumn filter. Keyvalues where a keyvalue from target column with the same timestamp do not exist will be dropped.
DependentColumnFilter(byte[] family, byte[] qualifier, boolean dropDependentColumn

Constructor for DependentColumn filter. Keyvalues where a keyvalue from target column with the same timestamp do not exist will be dropped.
DependentColumnFilter(byte[] family, byte[] qualifier, boolean dropDependentColumn, CompareFilter.CompareOp valueCompareOp, WritableByteArrayComparable valueComparator)

Build a dependent column filter with value checking dependent column varies will be compared using the supplied compareOp and comparator, for usage of which refer to CompareFilter

FilterBase及实现类

Hbase第二类过滤器是继承自FilterBase,部分过滤器只适用于scan,因为用在get上,会要么包含整行,要么都不包含

单列值过滤器SingleColumnValueFilter

用一列的值判断本行数据是否整体过滤掉。SingleColumnValueFilter使用了比较过滤器类似的参数风格,但是注意,并没有继承关系。
new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("c1"), CompareOp.EQUAL, new RegexStringComparator("v*1*"));
当参考列不存在时,默认这行是包含在结果中的,可以使用setFilterIfMissing方法排除。
默认检查参考列的最新版本,可以使用setLatestVersionOnly(false)方法检查所有版本。

单列排除过滤器SingleColumnValueExcludeFilter

继承SingleColumnValueFilter,略不同的语义是参考列不被包含到结果中。

前缀过滤器PrefixFilter

构造一个前缀,匹配前缀的行会返回客户端。也是按字典顺序查找。一般scan的时候使用。
new PrefixFilter(Bytes.toBytes("row1"));
会返回row1开头的行

分页过滤器PageFilter

指定pageSize参数后,可以对结果进行分页。其实就是过滤返回的行数,下一行的位置需要客户端来维护。一次扫描的结果可能大于分页大小,因为这个过滤器是分别作用于不同的regionserver的,并行执行不能共享他们现在的状态和边界,可能每个server上都获取分页大小的数据。所以客户端程序要处理这种情况,如果需要的话。

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");

final byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(2);
int totalRows = 0;
byte[] lastRow = null;
while (true) {
    System.out.println("=======");
    Scan scan = new Scan();
    scan.setFilter(filter);
    if (lastRow != null) {
        // 加一个最小的增量new byte[] { 0x00 };
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row: " + Bytes.toStringBinary(startRow));
        scan.setStartRow(startRow);
    }
    ResultScanner scanner = table.getScanner(scan);
    int localRows = 0;
    Result result;
    while ((result = scanner.next()) != null) {
        System.out.println(localRows++ + ": " + result);
        totalRows++;
        lastRow = result.getRow();
    }
    scanner.close();
    if (localRows == 0)
        break;
}
System.out.println("total rows: " + totalRows);
table.close();


行健过滤器KeyOnlyFilter

针对只需要key的场景,这个过滤器可以只返回KV中的key,而把value覆写成为空。
构造函数KeyOnlyFilter(boolean lenAsVal) 可以改变覆写策略。无参构造函数默认为false,即覆写为长度为0的字节数组,而设置为true时,value会被覆写为原值长度的字节数组,这个长度可以用来做二次排序或其他场景。

第一个键值过滤器FirstKeyOnlyFilter

这个过滤器只返回每行的第一个KV,排序是Hbase的隐式排序。
一般用在行数统计的场景,因为列式数据库中,某行存在,则这一行必定有列。因为检查完第一列的时候,过滤器框架就会通知region server结束本行的扫描,并跳到下一行,所以比全表扫描有很大的性能提升。

终止行包含过滤器InclusiveStopFilter

scan的范围是[startrow, stoprow),使用这个过滤器可以包含最后一行,同时也定义了scan的stoprow,如下面的代码是从表开始,扫描到row2,且包含row2
Filter f = new InclusiveStopFilter(Bytes.toBytes("row2"));
scan.setFilter(f);
ResultScanner rs = table.getScanner(scan);

版本过滤器TimestampsFilter

命名是时间戳,实际上是版本的控制,如下面代码返回两个特定版本的值
FilterList ts = Arrays.asList(new long [] {1,3});
FilterFilter f = new TimestampsFilter(ts);
Filter也支持和scan的setTimeRange方法联合缩小范围。

列数过滤器ColumnCountGetFilter

限制每行最多取回多少列,列数达到设定的值时,过滤器会停止整个扫描,所以一般不和scan配合使用,更适合get。列数可以直接在构造函数中设置
new ColumnCountGetFilter(2);

列分页过滤器ColumnPaginationFilter

与PageFilter类似的功能,不过是在列上实现数目的限制返回。
ColumnPaginationFilter(int limit, int offset)
构造函数有两个参数,就是返回偏移量在[offset, limit]的列。

列前缀过滤器ColumnPrefixFilter

与PrefixFilter类似,只不过作用在列上,返回所有前缀匹配的列

随机行过滤器RandomRowFilter

结果包含的行是随机的。构造函数RandomRowFilter(float chance) 会传入一个chance,取值范围在0~1,内部是用了Java的Random.nextFloat()方法和chance的比较结果,来决定一行是否过滤掉,所以,如果chance<0则查询结果全部过滤掉,而chance>1则会包含所有结果。
所以这个过滤器一般可以用于采样,参数chance其实就是采样比,数值越大,留下的数据越多。

装饰型过滤器Decorating Filter

这类过滤器采用装饰者模式,可以装饰在其他过滤器上使用。

跳过过滤器SkipFilter

包装一个过滤器F,如果过滤器F检查任何一个KV不满足条件的时候,包装成SkipFilter就会把这个KV的整行过滤掉。被包装的过滤器必须实现filterKeyValue()方法,因为SkipFilter是判断这个方法的结果来决定如何处理这一行的,所以和部分Filter不兼容。后面会有总结。
如下面代码只会返回所有列值都大于value1的行:
Filter f1 = new ValueFilter(CompareOp.GREATER,new BinaryComparator(Bytes.toBytes("value1")));
Filter f = new SkipFilter(f1);

循环匹配过滤器WhileMatchFilter

这个包装后,一旦发现不符合包装过滤器F的条件,就终止scan,这之前的结果回返回客户端。下面的代码,如果不加这个过滤器,会返回row2之外的所有行,加上之后,扫描到row2就停止了,所以只会扫描row2之前的行。
Filter f1 = new RowFilter(CompareOp.NOT_EQUAL,new BinaryComparator(Bytes.toBytes("row2")));
Filter f = new WhileMatchFilter(f1);

自定义过滤器

自定义Filter一般继承FilterBase类,也可以继承Filter接口,前者把后者所有的方法提供了默认实现,按需覆写即可。
Filter接口中有个枚举Filter.ReturnCode,被filterKeyValue()方法用于通知执行框架,决定如何执行下一步。

INCLUDE 结果中包含 KeyValue
INCLUDE_AND_NEXT_COL 包含当前KV,并跳过旧版本跳到下一列
NEXT_COL 跳过这个column,继续处理后面的列。如TimestampsFilter使用了这个返回值
NEXT_ROW 跳过当前行,继续处理后面的行。如RowFilter使用了这个返回值。
SEEK_NEXT_USING_HINT 跳过一系列的值,seek到指定的位置。框架执行getNextKeyHint()决定跳到什么位置,ColumnPrefixFilter使用了这个返回值。
SKIP 跳过这个 KeyValue,并继续处理下面的工作

Filter接口定义了若干方法,在客户端的检索操作的不同阶段调用,按下面顺序执行:
1. filterRowKey(byte[],int,int):返回true,则丢弃此行。
2. filterKeyValue(KeyValue):上面没有被过滤掉,检查KeyValue按照 Filter.ReturnCode处理当前值
3. filterRow(List kvs): 让用户可以访问上两个方法筛选后的KV实例。DependentColumnFilter过滤器用这个方法来过滤与基准列不匹配的数据。
4. filterRow():最后一道判断是否过滤掉行。PageFilter使用当前方法检查一次迭代分页中返回的行数是否达到预期分页大小,如果达到返回true。默认返回false,即结果包含当前行。
5. reset() :迭代扫描中,为每个新行重置过滤器。服务端读一行数据后,此方法被隐式调用。
6. filterAllRemaining():返回true,则整个scan结束。返回false继续执行,主要用户提前结束的优化场景
注意,使用filterRow(List kvs)或filterRow(),必须重载hasRowFilter()方法,并返回true。框架用这个标志保证过滤器和scan操作的各个参数的兼容。当扫描使用batch时,之前方法不会在每次batch操作时调用,而是在当前行数据结束时被调用。

自定义Filter编译成jar包后,上传到region server上,并在hbase-env.sh的HBASE_CLASSPATH配置上jar包的路径。重启hbase生效。
代码样例:

public class CustomFilter extends FilterBase {
private byte[] value = null;
private boolean filterRow = true;

public CustomFilter(byte[] value) {
// 设置要比较的值
this.value = value;
}

@Override
public void reset() {
// 每个新行都重置
this.filterRow = true;
}

@Override
public ReturnCode filterKeyValue(KeyValue kv) {
if (Bytes.compareTo(value, kv.getValue()) == 0) {
// 策略是先包含进来,在filterRow判断是否过滤掉行
filterRow = false;
}
return ReturnCode.INCLUDE;
}

@Override
public boolean filterRow() {
return filterRow;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
// 设定值写入DataOutput中,服务端实例化Filter时可以读到要比较的这个value
Bytes.writeByteArray(dataOutput,this.value);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
// 服务端用这个方法初始化Filter实例,比较值设定进来
this.value = Bytes.readByteArray(dataInput);
}

public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row9"));
Filter filter = new CustomFilter(Bytes.toBytes("value1"));
scan.setFilter(filter);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println(r);
}
table.close();
}
}
运行时报错//TODO
2015-09-10 11:30:08,588 WARN org.apache.hadoop.ipc.HBaseServer: Unable to read call parameters for client 11.13.1.30
java.io.IOException: Error in readFields


FilterList

FilterList也实现了Filter接口,所以使用方式相同。但是FilterList提供的是一种多个过滤器组合的方式使用。有几种构造函数
FilterList(Filter... rowFilters)
FilterList(FilterList.Operator operator)
FilterList(FilterList.Operator operator, Filter... rowFilters)
FilterList(FilterList.Operator operator, List rowFilters)
FilterList(List rowFilters)
核心的参数其实就是两个,一个是组合逻辑FilterList.Operator,一个是需要组合的filter集合。FilterList.Operator是个枚举类型,默认是FilterList.Operator.MUST_PASS_ALL,即所有过滤器都要通过才保留结果。可以改为FilterList.Operator.MUST_PASS_ONE。
可以控制List中的Filter添加顺序去保证过滤器的执行顺序,如使用ArrayList就可以精准的控制过滤器执行顺序是添加顺序。

过滤器总结
Filter Batch[a] Skip[b] While-Match[c] List[d] EarlyOut[e] Gets[f] Scans[g]
RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter
SingleColumnValueFilter
SingleColumnValueExcludeFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
InclusiveStopFilter
TimestampsFilter
ColumnCountGetFilter
ColumnPaginationFilter
ColumnPrefixFilter
RandomRowFilter
SkipFilter ✓/✗[h] ✓/✗[h]
WhileMatchFilter ✓/✗[h] ✓/✗[h]
FilterList ✓/✗[h] ✓/✗[h] ✓/✗[h] ✓/✗

[a] Filter supports Scan.setBatch(), i.e., the scanner batch mode.
[b] Filter can be used with the decorating SkipFilter class.
[c] Filter can be used with the decorating WhileMatchFilter class.
[d] Filter can be used with the combining FilterList class.
[e] Filter has optimizations to stop a scan early, once there are no more matchingrows ahead.
[f] Filter can be usefully applied to Get instances.
[g] Filter can be usefully applied to Scan instances.
[h] Depends on the included filters.

作者 | 林钰鑫

转载请注明出处:华为云博客 https://portal.hwclouds.com/blogs

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。