HBase(八) HBase JAVA API - 计数器和协处理器
是否会有这样的场景:有需要测试数据的时候,不知如何生成一些已包含测试数据的文件;或者你是临时需要一个小的程序,可以让你生成不同大小文件(比如大于1Mb少于100Mb),不需要从网络上去搜寻查找如何生成,这里有一些简单的方法帮你偷懒。
计数器
在shell一节看到过incr命令,hbase会保证计数器读取和操作的原子性。 一次只能操作一行中的计数器,可以是一个也可以是多个,多行需要多个调用。 java客户端中,HTable有两种方式操作计数器
                                    
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
                                    
 incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
                                    
 increment(Increment increment)
                                    
writeToWAL参数,默认是true,即WAL生效。前一种提供坐标可以操作单计数器,后面用Increment 实例可以操作单行的多计数器。 Increment 实例需要传入行健,Increment(byte[] row),也有行锁版本的构造函数。
                                    
 增加一个计数器,实际上就是增加一列,使用addColumn(byte[] family, byte[] qualifier, long amount)方法,设置一个计数器操作。Increment 实例不同于Put,是不能设置版本的,但是可以使用setTimeRange(long minStamp, long maxStamp)方法来使老的计数器过期,即一次incr操作会认为这个计数器不存在,并设置为1。
代码示例:
                                    Configuration conf = HBaseConfiguration.create();
                                    
 HTable table = new HTable(conf, "t1");
                                    
 byte[] row = Bytes.toBytes("row0");
                                    
 byte[] family = Bytes.toBytes("f1");
                                    
 byte[] qualifier = Bytes.toBytes("counter");
                                    
 byte[] qualifier2 = Bytes.toBytes("counter2");
                                    
 // 会创建新列counter
                                    
 long cnt1 = table.incrementColumnValue(row, family, qualifier, 1);
                                    
 long cnt2 = table.incrementColumnValue(row, family, qualifier, 3);
                                    
 long cnt3 = table.incrementColumnValue(row, family, qualifier, -2);
                                    
 System.out.println("cnt1: " + cnt1 + " cnt2: " + cnt2 + " cnt3: " + cnt3);
                                    
 // out: cnt1: 1 cnt2: 4 cnt3: 2
                                    
 Increment increment = new Increment(row);
                                    
 // [minStamp, maxStamp) 当前timestamp=1441112747878,所以改为setTimeRange(1,3)就会每次都置为1
                                    
 increment.setTimeRange(3l, 1541112747878l);
                                    
 increment.addColumn(family, qualifier, 1);
                                    
 increment.addColumn(family, qualifier2, 1);
                                    
 Result result = table.increment(increment);
                                    
 for(KeyValue kv:result.raw()){
                                    
 System.out.println(kv.toString()+"||"+Bytes.toLong(kv.getValue()));
                                    
 }
                                    
 table.close();
                                
协处理器
协处理器的概念有点像Oracle的Trigger和PLSQL存储过程,用户可以在region server端进行region级的代码操作。用户不关心具体再哪里执行,hbase分布式框架会隐式处理。使用方式是监听一些隐式的事件,用钩子完成一些操作,或者自己扩展现有的RPC协议来引入自己的调用。使用场景包括维护辅助索引、维护数据间的引用完整性,权限控制等。
                                    
 协处理器最底层的接口是Coprocessor,所有的Coprocessor都可以串起来使用,用法是类似servlet的过滤器的,实际上就是个责任链模式。 基于Coprocessor这个接口,hbase提供两类协处理器接口,就是对应上面说的Trigger和PLSQL存储过程概念:
                                    
 observer:类似Trigger,提供一些hook,在服务端的特定事件发生时触发,这些事件包括用户调用产生的事件,也包含hbase服务端内部事件。 observer有几个基础接口:
                                    
 ü RegionObserver:监听和处理表的region数据修改事件
                                    
 ü MasterObserver:监听和处理集群级的事件,如管理或DDL类操作
                                    
 ü WALObserver:控制WAL的钩子
                                    
 endpoint:类似存储过程。动态扩展RPC协议,把用户自定义的操作添加到服务端。
                                
Coprocessor接口
Coprocessor接口内部定义了一个枚举Coprocessor.State,定义了Coprocessor的生命周期的各个状态。
| UNINSTALLED | 最初状态,没有环境,也没有初始化 | 
| INSTALLED | 实例装载了环境参数 | 
| STARTING | 即将开始操作,start()方法调用前 | 
| ACTIVE | start()方法调用后 | 
| STOPPING | Stop()调用前 | 
| STOPPED | Stop()方法将控制权交给框架 | 
Coprocessor有优先级的概念,重要的级别是PRIORITY_SYSTEM和PRIORITY_USER,前者会在后者前执行。 Coprocessor只定义了两个方法start(CoprocessorEnvironment env) 和stop(CoprocessorEnvironment env) ,框架会控制Coprocessor的生命周期。
                                    
 CoprocessorEnvironment 是Coprocessor生命周期中的环境参数,Coprocessor实例也保存在这个环境参数中,调用CoprocessorEnvironment. getInstance()方法得到这个实例。Coprocessor也只和他的环境参数做交互。协处理器不应该使用HTable实例,而应该使用CoprocessorEnvironment.getTable()方法访问表数据,这是因为Coprocessor不应该对行数据枷锁等原因,这个方法在HTable基础上做了限制。0.94版本还没有什么强制措施这样去限制,目前还是靠开发人员自己去约束。
下图是协处理器在每个region中的执行顺序描述
 
                                RegionObserver
 
                                pending open:
region打开前会调用preOpen(ObserverContext
                                    
 preOpen可以通知框架,控制本次打开操作是否放弃,postOpen可以执行预热或其他操作。
                                            
 pending
                                            
 open和open之间,region server会检查是否从WAL恢复数据到region中,这前后会触发preWALRestore()/ postWALRestore()两个方法。可以控制WAL重做时哪些修改要提交,也可以监督哪些记录被修改。
                                            
 open:
                                            
 region
                                            
 打开后被加载到region
                                            
 server并正常工作时,这个region就是open状态。这里面可以监控客户端API调用事件,如 preGet/postGet,preScannerOpen/postScannerOpen等等,具体可以查看RegionObserver的 API。
                                            
 还有一些服务端事件,如preCompact/postCompact。
                                            
 pending close:
                                            
 preClose(ObserverContext
                                            
 boolean abortRequested)和postClose是对应的hook,其中abortRequested表示了本次是否是正 常关闭,如果是true,表明是abort操作。
                                                
 RegionCoprocessorEnvironment:
                                                
 所有hook方法的第一个参数都是这个接口泛型的ObserverContext,它继承了CoprocessorEnvironment接口的。这个接 口新定义了三个方法,getRegion() 返回当前正在管理的HRegion
                                                
实 例,getRegionServerServices() 返回共享的RegionServerServices
                                                
实 例,getSharedData()返回当前所有coprocessor共享的数据。
                                                
 RegionServerServices
                                                
 实际上就是HRegionServer提供的一些方法,如getWAL() 方法返回HLog实例,可以访问 WAL;getFlushRequester() 方法返回FlushRequester实例,可以用来执行memstore的flush操 作。其他方法可以参考RegionServerServices接口的API。
                                                
 ObserverContext,
                                                
 所有Coprocessor执行时是,这个上下文实例都是一样的,只是包装了不同的 RegionCoprocessorEnvironment,Environment可以通过getEnvironment() 方法返回。
                                                
 ObserverContext还有两个重要的上下文方法是bypass()和complete(),bypass()调用后,框架将使用用户提供的值, 而不使用框架通常使用的值;complete()表示执行链上后面的处理可以跳过,剩下没执行的协处理器也可以跳过,即当前协处理器的响应是最后一个协处理器。
                                                
bypass的一种用法是可以阻止当前事件的处理,如停止region的自动拆分:
												
public void preSplit(ObserverContext
{ 
												
      e.bypass();
                                                
    }; 
												
BaseRegionObserver是采用了swing的类似思路,实现了所有RegionObserver接口的空方法,默认情况下,这个类是没有任何功能的。可以继承后,选择感兴趣的方法进行重载。
                                                
public class RegionObserverDemo extends BaseRegionObserver {
                                                
public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");
                                                
        @Override
                                                
public void preGet(final ObserverContext
                   final List
              if (Bytes.equals(get.getRow(), FIXED_ROW)) {                                               
                                                
                   KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));                                                
                                                
                   results.add(kv);                                               
                                                
                   e.bypass();                                                
                                                
              }                                                
                                                
          }                                                
                                                
     }
                                                
     MasterObserver
												
     
                                                
     MasterObserver处理master服务器的所有hook,主要是DDL操作和整体的一些操作。           
                                                
     方法命名也是preXX和postXX的风格,但是有两个特殊的preShutdown(...)和preStopMaster(...),前一个是集群关闭前的hook,后一个是master进程关闭前的hook,这两个是没有对应的post函数的。
                                                
     
                                                
     DDL类的hook有:
                                                
     
                                                
     void preCreateTable(...) / void postCreateTable(...)
                                                
     void preDeleteTable(...) / void postDeleteTable(...)  
                                                
     void preModifyTable(...) / void postModifyTable(...) 
                                                
     void preAddColumn(...) / void postAddColumn(...)
                                                
     void preModifyColumn(...) / void postModifyColumn(...)
                                                
     void preDeleteColumn(...) / void postDeleteColumn(...)
                                                
     
                                                
     集群类操作的有:
                                                
     
                                                
     void preEnableTable(...) / void postEnableTable(...)
                                                
     void preDisableTable(...) / void postDisableTable(...) 
                                                
     void preMove(...) / void postMove(...)
                                                
     void preAssign(...) / void postAssign(...)
                                                
     void preUnassign(...) / void postUnassign(...)
                                                
     void preBalance(...) / void postBalance(...)
                                                
     boolean preBalanceSwitch(...) / void postBalanceSwitch(...)
                                                
     void preShutdown(...)
                                                
     void preStopMaster(...)
                                                
     
                                                
     方法参数结构和RegionObserver是类似的,第一个参数也是ObserverContext,是如preModifyTable(ObserverContext
     
                                                
     MasterServices 实例提供了一些master端的服务实例,这些功能要慎用: 
                                                
     AssignmentManager getAssignmentManager():AssignmentManager实例负责所有的region分配操作,如分配、卸载、负载均衡 
                                                
     MasterFileSystem getMasterFileSystem():MasterFileSystem提供一个与master操作相关的文件系统抽象,如创建表或日志文件的目录
                                                
     ServerManager getServerManager():ServerManager可以访问所有服务,包括激活和去激活状态。
                                                
     ExecutorService getExecutorService():ExecutorService用于master系统调度
                                                
     void checkTableModifiable(byte[] tableName)检查表是否已存在、已离线
                                                
     
                                                
     类似的,也有个BaseMasterObserver辅助类
                                                
     
                                                
     public class MasterObserverDemo extends BaseMasterObserver {
                                                
     
                                                
         @Override
                                                
         public void preEnableTable(ObserverContext
             System.out.println("preEnableTable: "+Bytes.toString(tableName));
                                                
             super.preEnableTable(ctx, tableName);
                                                
         }
                                                
     
                                                
         @Override
                                                
         public void postEnableTable(ObserverContext
             System.out.println("postEnableTable: "+Bytes.toString(tableName));
                                                
             super.postEnableTable(ctx, tableName);
                                                
         }
                                                
     
                                                
         @Override
                                                
         public void preDisableTable(ObserverContext
             System.out.println("preDisableTable: "+Bytes.toString(tableName));
                                                
             super.preDisableTable(ctx, tableName);
                                                
         }
                                                
     }
                                                
     Endpoint
                                                
     
                                                
     实现一个endpoint需要两步:
                                                
     1、  扩展CoprocessorProtocol接口;定义与endpoint的通信协议,其实就是客户端和服务端间的RPC协议。   
                                                
     2、  扩展BaseEndpointCoprocessor类;实现上面自定义协议的接口方法和抽象类BaseEndpointCoprocessor的方法。
                                                
         
                                                
     HTable有几个方法,可以得到CoprocessorProtocol实例,进而与定义的endpoint通信
                                                
     
     
     
     这几个方法要么定义了起止键,要么直接给了行健,这是因为CoprocessorProtocol要和表中的region关联。org.apache.hadoop.hbase.client.coprocessor. Batch.Call是一个回调的接口,因为coprocessorExec方法可能涉及多个region,所以每个匹配的region都会调用一次这个接口的R call(T instance)方法,方法参数就是CoprocessorProtocol实例,可以调用定义好的协议方法。Batch.Callback 
                                                
     定义协议:
                                                
     public interface RowCountProtocol extends CoprocessorProtocol {
                                                
     
                                                
          long getRowCount() throws IOException;
                                                
          long getRowCount(Filter filter) throws IOException;    
                                                
          long getKeyValueCount() throws IOException;   
                                                
     }
     
                                                
     服务端的endpoint实现:
  
                                                
     public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol {  
                                                
          private long getCount(Filter filter, boolean countKeyValues) throws IOException {    
                                                
              Scan scan = new Scan();    
                                                
              scan.setMaxVersions(1);    
                                                
              if (filter != null) {    
                                                
                   scan.setFilter(filter);  
                                                
              }   
                                                
              RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();    
                                                
              // use an internal scanner to perform scanning.     
                                                
              InternalScanner scanner = environment.getRegion().getScanner(scan);
                                                
												
    int result = 0;
                                                
         
                                                
                  try {
                                                
                       List
                       boolean done = false;
                                                
                       do {
                                                
                            curVals.clear();
                                                
                            done = scanner.next(curVals);
                                                
                            result += countKeyValues ? curVals.size() : 1;
                                                
                       } while (done);
                                                
                  } finally {
                                                
                       scanner.close();
                                                
                  }
                                                
                  return result;
                                                
              }
                                                
         
                                                
              @Override
                                                
         
                                                
              public long getRowCount() throws IOException {      
                                                
                  return getRowCount(new FirstKeyOnlyFilter());
                                                
              }
                                                
         
                                                
              @Override
                                                
         
                                                
              public long getRowCount(Filter filter) throws IOException {       
                                                
                  return getCount(filter, false);       
                                                
              }        
                                                
    
                                                
         @Override
                                                
    
                                                
         public long getKeyValueCount() throws IOException {   
                                                
             return getCount(null, true);  
                                                
         }  
                                                
    }
   
                                                
    客户端调用:
                                                
    
                                                
    public static void main(String[] args) throws IOException {   
                                                
          Configuration conf = HBaseConfiguration.create();   
                                                
          HTable table = new HTable(conf, "t1");
    
                                                
          try {  
                                                
               // 这里起止键为空,统计表的所有行   
                                                
               Map
                          new Batch.Call
    // 也可以用Batch.forMethod通过反射方法预先定义Batch.Call,但不够灵活,不展开
                                                
    
                                                
                                @Override
												
                                                
                                public Long call(RowCountProtocol counter) throws IOException {   
                                                
                                      // 这里实际执行endpoint的方法  
                                                
                                      return counter.getRowCount();   
                                                
                                }  
                                                
                          });  
                                                
               long total = 0;    
                                                
               for (Map.Entry
                     total += entry.getValue().longValue(); 
                                                
                     System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue());
                                                
               } 
                                                
               System.out.println("Total Count: " + total);  
                                                
          } catch (Throwable throwable) {   
                                                
               throwable.printStackTrace(); 
                                                
          }   
                                                
          table.close();   
                                                
    }
                                                
    协处理器的加载
  
                                                
    首先要把自定义的代码打成jar上传到各个服务器,并在hbase-env.sh配置classpath   
                                                
    export HBASE_CLASSPATH=/opt/netwatcher/pm4h2/app/opt/hbase_derek.jar     
                                                
    协处理器要在hbase-site.xml中分类配置,多个类用逗号分隔,类的顺序就是执行顺序:  
                                                
    
    
    
    
    
    
    
    
    重启hbase使配置生效。   
                                                
    查看之前的协处理器是否生效:  
                                                
    regionObserver:  
                                                
    hbase(main):001:0> get 't1','@@@GETTIME@@@'  
                                                
    COLUMN                                                       CELL                                                                                                                                                                          
                                                
     @@@GETTIME@@@:@@@GETTIME@@@                                 timestamp=9223372036854775807, value=\x00\x00\x01O\xB4\xFD\x03x                                                                                                                
                                                
    1 row(s) in 0.5840 seconds   
                                                
    masterObserver:
                                                
    hbase(main):001:0> disable 't1'  
                                                
    0 row(s) in 2.6950 seconds  
                                                
    在hbase-acrosspm-master-pmapp1.log日志中可以看到插入的日志:
                                                
    2015-09-10 10:11:46,968 WARN coprocessor.MasterObserverDemo: derek-test======preDisableTable: t1  
                                                
    2015-09-10 10:11:46,986 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Attemping to disable table t1 
                                                
    2015-09-10 10:11:47,000 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Offlining 1 regions.  
                                                
    2015-09-10 10:11:48,011 INFO org.apache.hadoop.hbase.master.handler.DisableTableHandler: Disabled table is done=true   
                                                
    endpoint客户端main函数输出: 
                                                
    Region: t1,,1439347624316.5f2df32e5af7eaedb49133aa6cf65b7e., Count: 5  
                                                
    Total Count: 5
                                
												
												
												
                                
- 点赞
- 收藏
- 关注作者
 
             
           
评论(0)