HBase 读写过程中的线程池模型原理实现

举报
阿帕车 发表于 2020/12/30 16:34:57 2020/12/30
【摘要】 hbase socket连接池,hbase 表操作线程池,hbase直接内存泄露


【简介】

随着大数据的发展,HBase已经成为大数据高性能读写场景不可或缺的组件,HBase也也提供了丰富的API,应用开发也相对简单,如何在开发时准确使用API就显得比较重要了,如果使用不准确,往往会与设计者背到而驰,并且造成应用访问抖动、资源泄露等问题,等等……

HBase socket连接池原理实现

首先我们来看下HBase的API文档当中对Connection的定义是:A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.,这句话的理解就是和regionserver(包括HMaster)以及zookeeper直间的连接,总结起来就是如下图:

所以对一个connection实列来讲,如果保持单例模式,和regionserver之间的socket连接最大就是五个,并且每次socket连接就不会再次创建,除非这个链接长时间(默认两分钟)和regionserver之间不进行交互,那么client端会自动关闭这个链接,应用开发时一般使用如下方法创建connection

connection = ConnectionFactory.createConnection(conf);

除了此种方式创建连接外,hbase还可通过ConnectionManager获取连接,不过此种方式就要注意,一定要保持conf对象单例,否则也就是不断创建新的连接

conn = HConnectionManager.getConnection(conf);
static ClusterConnection getConnectionInternal(final Configuration conf)
  throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      connection = (HConnectionImplementation)createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      ConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation)createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    connection.incCount();
    return connection;
  }
}

HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);

  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    ConnectionManager.LOG.warn(
        "Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}

此处这两段代码逻辑是根据传入的conf构建HConnectionKey,然后以HConnectionKey实例为key到连接池Map对象CONNECTION_INSTANCES中去查找connection,如果找到就返回connection,如果找不到就新建,如果找到但已被关闭,就删除再新建;接收conf构造HConnectionKey实例时,其实是将conf配置文件中的属性赋值给HConnectionKey自身的属性,所以使用此种方式时,要保证conf对象不变,new出来的HConnectionKey实例的属性才相同。

HBase 表操作线程接池原理实现
应用端读写hbase时,目前实现都是通过将每次读写操作封装成为一个个task,然后提交到应用端的表操作池中去执行,默认情况下,如果保证connection单例,那么应用将有256个线程、对应每个regionserver5个(由参数hbase.client.ipc.pool.size决定)socket链接,有的人可能会认为,这样会不会造成性能瓶颈,这里256线程是针对所有操作,也就是针对的是整个集群,而5个socket链接是针对单个regionserver,并且和regionserver的socket交互耗时是非常短的,所以一般情况下这个默认配置已经完全够用

相关代码实现原理如下:

public HTableInterface getTable(TableName tableName) throws IOException {
  return getTable(tableName, getBatchPool());
}
private ExecutorService getBatchPool() {
  if (batchPool == null) {
    synchronized (this) {
      if (batchPool == null) {
        this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
            conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
        this.cleanupPool = true;
      }
    }
  }
  return this.batchPool;
}

这里要注意,对于meta表操作来讲 ,默认是10个核心线程,并且线程名也不一样

private ExecutorService getMetaLookupPool() {
  if (this.metaLookupPool == null) {
    synchronized (this) {
      if (this.metaLookupPool == null) {
        //Some of the threads would be used for meta replicas
        //To start with, threads.max.core threads can hit the meta (including replicas).
        //After that, requests will get queued up in the passed queue, and only after
        //the queue is full, a new thread will be started
        this.metaLookupPool = getThreadPool(
           conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
           conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
         "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
      }
    }
  }
  return this.metaLookupPool;
}

HBase 线程池引发的直接内存泄露
JDK对每个线程进行NIO交互时,都会持有一个缓存对象(属于堆外),此对象不限制大小空间,并且随着线程消亡才会释放,线程每次申请直接内存空间时,都是在已有的对象上新增加空间,意味着,此空间可能会无限增大,而hbase的表操作线程池,线程不会消亡,所以此直接内存会不断增大,直到触发了FULL GC,才有可能释放,否则应用就会报直接内存溢出错误。相关源码分析如下:

// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache =
    new ThreadLocal<BufferCache>()
{
    @Override
    protected BufferCache initialValue() {
        return new BufferCache();
    }
};

/**
 * Returns the max size allowed for a cached temp buffers, in
 * bytes. It defaults to Long.MAX_VALUE. It can be set with the
 * jdk.nio.maxCachedBufferSize property. Even though
 * ByteBuffer.capacity() returns an int, we're using a long here
 * for potential future-proofing.
 */
private static long getMaxCachedBufferSize() {
    String s = java.security.AccessController.doPrivileged(
        new PrivilegedAction<String>() {
            @Override
            public String run() {
                return System.getProperty("jdk.nio.maxCachedBufferSize");
            }
        });
    if (s != null) {
        try {
            long m = Long.parseLong(s);
            if (m >= 0) {
                return m;
            } else {
                // if it's negative, ignore the system property
            }
        } catch (NumberFormatException e) {
            // if the string is not well formed, ignore the system property
        }
    }
    return Long.MAX_VALUE;
}

/**
 * Returns a temporary buffer of at least the given size
 */
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }

    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}

HBase开源社区针对此问题的解决方式是设置-Djdk.nio.maxCachedBufferSize,保证线程级别的buffercache不会持续增大,开源单号如下:https://issues.apache.org/jira/browse/HBASE-19320

值得注意的是HBase2.x以后使用netty框架实现rpc交互,所以就不存在这个问题了,另外JDK上述修复是 jdk8u102 and jdk9版本及以上才能配置上述参数解决
JDK bug介绍如下:https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。