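How the Thread Pool Model Works in the HBase Read/Write Path
[Abstract] The HBase socket connection pool, the HBase table-operation thread pool, and direct-memory leaks in HBase
How the HBase socket connection pool works
Let's start with how the HBase API documentation defines Connection: "A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper." In other words, a Connection wraps the connections to the RegionServers (including the HMaster) and the connection to ZooKeeper, as summarized in the figure below: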
connection = ConnectionFactory.createConnection(conf);
conn = HConnectionManager.getConnection(conf);
static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
  if (connection == null) {
    connection = (HConnectionImplementation)createConnection(conf, true);
    CONNECTION_INSTANCES.put(connectionKey, connection);
  } else if (connection.isClosed()) {
    ConnectionManager.deleteConnection(connectionKey, true);
    connection = (HConnectionImplementation)createConnection(conf, true);
    CONNECTION_INSTANCES.put(connectionKey, connection);
  }
  return connection;
}
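The lookup above amounts to a get-or-create cache keyed by configuration. The sketch below illustrates that idea with plain JDK types only. It is not HBase code: `KeyedConnectionCache`, `FakeConnection`, and the two entries in `KEY_PROPERTIES` are hypothetical names chosen for illustration, and the real key may include more fields.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a connection cache keyed by selected configuration properties. */
final class KeyedConnectionCache {
    // Assumption: only these properties decide whether two configurations share a connection.
    static final List<String> KEY_PROPERTIES =
        Arrays.asList("hbase.zookeeper.quorum", "zookeeper.znode.parent");

    /** Stand-in for a heavyweight connection; it only tracks whether it was closed. */
    static final class FakeConnection {
        private volatile boolean closed;
        boolean isClosed() { return closed; }
        void close() { closed = true; }
    }

    private final Map<Map<String, String>, FakeConnection> instances = new HashMap<>();

    /** Builds the cache key from the identifying properties and ignores everything else. */
    static Map<String, String> keyOf(Map<String, String> conf) {
        Map<String, String> key = new HashMap<>();
        for (String property : KEY_PROPERTIES) {
            String value = conf.get(property);
            if (value != null) {
                key.put(property, value);
            }
        }
        return Collections.unmodifiableMap(key);
    }

    /** Returns the cached connection for this configuration, replacing it if it was closed. */
    synchronized FakeConnection getConnection(Map<String, String> conf) {
        Map<String, String> key = keyOf(conf);
        FakeConnection connection = instances.get(key);
        if (connection == null || connection.isClosed()) {
            connection = new FakeConnection();
            instances.put(key, connection);
        }
        return connection;
    }

    public static void main(String[] args) {
        KeyedConnectionCache cache = new KeyedConnectionCache();
        Map<String, String> conf = new HashMap<>();
        conf.put("hbase.zookeeper.quorum", "zk1.example.com");
        FakeConnection first = cache.getConnection(conf);
        conf.put("some.unrelated.setting", "42"); // not part of the key
        System.out.println(first == cache.getConnection(conf));
    }
}
```

Two configurations that differ only in properties outside the key share one connection in this sketch, which is why callers of a cached connection should not assume they own it exclusively.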
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    ConnectionManager.LOG.warn(
        "Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
How the HBase table-operation thread pool works
public HTableInterface getTable(TableName tableName) throws IOException {
  return getTable(tableName, getBatchPool());
}

private ExecutorService getBatchPool() {
  if (batchPool == null) {
    synchronized (this) {
      if (batchPool == null) {
        this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
            conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
        this.cleanupPool = true;
      }
    }
  }
  return this.batchPool;
}
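The pattern in getBatchPool() is lazy initialization with double-checked locking: the pool is created on first use and then shared by every table obtained from the same connection. Below is a minimal standalone sketch of that pattern, not HBase's implementation. `LazySharedPool` is a hypothetical class and the pool sizes are illustrative values, not HBase defaults.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder that lazily creates one pool and shares it with all callers. */
final class LazySharedPool {
    // volatile is what makes double-checked locking safe under the Java memory model.
    private volatile ExecutorService batchPool;

    /** Returns the shared pool, creating it on first use. */
    ExecutorService getBatchPool() {
        ExecutorService pool = batchPool;      // first check, without locking
        if (pool == null) {
            synchronized (this) {
                pool = batchPool;              // second check, under the lock
                if (pool == null) {
                    // Illustrative sizes only.
                    pool = new ThreadPoolExecutor(
                        4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
                    batchPool = pool;
                }
            }
        }
        return pool;
    }

    /** Shuts the shared pool down if it was ever created. */
    void close() {
        ExecutorService pool = batchPool;
        if (pool != null) {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        LazySharedPool holder = new LazySharedPool();
        ExecutorService first = holder.getBatchPool();
        ExecutorService second = holder.getBatchPool();
        System.out.println("same pool: " + (first == second));
        System.out.println(first.submit(() -> "task ran on the shared pool").get());
        holder.close();
    }
}
```

Because every caller receives the same executor, its threads live as long as the owning object does, which matters for the per-thread state those threads accumulate.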
Note that for meta table operations the default is 10 core threads, and the thread name is different as well:
private ExecutorService getMetaLookupPool() {
  if (this.metaLookupPool == null) {
    synchronized (this) {
      if (this.metaLookupPool == null) {
        //Some of the threads would be used for meta replicas
        //To start with, threads.max.core threads can hit the meta (including replicas).
        //After that, requests will get queued up in the passed queue, and only after
        //the queue is full, a new thread will be started
        this.metaLookupPool = getThreadPool(
            conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
            conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
            "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
      }
    }
  }
  return this.metaLookupPool;
}
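The comment in the code above describes the standard java.util.concurrent.ThreadPoolExecutor growth order: core threads first, then the queue, and extra threads only once the queue is full. The sketch below makes that order observable with a small bounded queue. `PoolGrowthDemo` and its parameters are hypothetical, and the sizes are chosen only to keep the example short.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of how a ThreadPoolExecutor grows as blocking tasks are submitted. */
final class PoolGrowthDemo {
    /**
     * Submits the given number of tasks, each blocking until released, and returns
     * {poolSize, queuedTasks} as observed before the tasks are released.
     */
    static int[] observe(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the snapshot is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int tasks = 1; tasks <= 6; tasks++) {
            int[] r = observe(2, 4, 2, tasks);
            System.out.println(tasks + " tasks: poolSize=" + r[0] + ", queued=" + r[1]);
        }
    }
}
```

With an unbounded LinkedBlockingQueue, as passed for the meta lookup pool, the queue never fills, so a ThreadPoolExecutor configured this way does not grow beyond its core size.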
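Direct-memory leaks caused by the HBase thread pool
Whenever a thread performs NIO I/O, the JDK keeps a per-thread cache of temporary direct buffers (off-heap memory). By default the size of a cached buffer is not limited, and the cache is released only when the thread exits. When a thread needs a temporary buffer larger than anything in its cache, a new, larger direct buffer is allocated and then cached, so the memory held on behalf of each thread can keep growing. The threads in HBase's table-operation thread pool are long-lived, so this direct memory keeps growing. A full GC can reclaim direct buffers that are no longer referenced; if that does not free enough, the application fails with a direct-memory OutOfMemoryError. The relevant JDK source is as follows: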
// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache =
    new ThreadLocal<BufferCache>()
{
    protected BufferCache initialValue() {
        return new BufferCache();
    }
};
/**
 * Returns the max size allowed for a cached temp buffers, in
 * bytes. It defaults to Long.MAX_VALUE. It can be set with the
 * jdk.nio.maxCachedBufferSize property. Even though
 * ByteBuffer.capacity() returns an int, we're using a long here
 * for potential future-proofing.
 */
private static long getMaxCachedBufferSize() {
    String s = java.security.AccessController.doPrivileged(
        new PrivilegedAction<String>() {
            public String run() {
                return System.getProperty("jdk.nio.maxCachedBufferSize");
            }
        });
    if (s != null) {
        try {
            long m = Long.parseLong(s);
            if (m >= 0) {
                return m;
            } else {
                // if it's negative, ignore the system property
            }
        } catch (NumberFormatException e) {
            // if the string is not well formed, ignore the system property
        }
    }
    return Long.MAX_VALUE;
}
/**
 * Returns a temporary buffer of at least the given size
 */
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }
    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}
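Note that HBase 2.x and later implement RPC on top of the Netty framework, so this problem does not arise there. Also, the JDK fix shown above, that is, the jdk.nio.maxCachedBufferSize property, can only be configured on jdk8u102 and later and on JDK 9 and later.
The JDK bug is described here: https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo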
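To make the effect of such a size cap concrete, here is a deliberately simplified model of a per-thread temporary buffer cache. It is a sketch under stated assumptions, not the JDK's code: `TempBufferCacheModel` is a hypothetical class, it keeps a single cached buffer per thread, and the cap is passed to the constructor instead of being read from a system property.

```java
import java.nio.ByteBuffer;

/** Simplified, hypothetical model of a per-thread temporary direct-buffer cache. */
final class TempBufferCacheModel {
    private final long maxCachedBufferSize;
    // One cached buffer per thread; it stays reachable for as long as the thread lives.
    private final ThreadLocal<ByteBuffer> cached = new ThreadLocal<>();

    TempBufferCacheModel(long maxCachedBufferSize) {
        this.maxCachedBufferSize = maxCachedBufferSize;
    }

    /** Returns a buffer with at least the requested capacity, reusing the cached one if it fits. */
    ByteBuffer acquire(int size) {
        ByteBuffer buf = cached.get();
        cached.remove();
        if (buf != null && buf.capacity() >= size) {
            buf.clear();
            buf.limit(size);
            return buf;
        }
        // No cached buffer, or it is too small: allocate a new one.
        return ByteBuffer.allocateDirect(size);
    }

    /** Hands a buffer back; it is cached only if it does not exceed the cap. */
    void release(ByteBuffer buf) {
        if (buf.capacity() > maxCachedBufferSize) {
            return; // too large to cache, left for the garbage collector
        }
        cached.set(buf);
    }

    /** Capacity currently retained for the calling thread, in bytes. */
    int cachedCapacity() {
        ByteBuffer buf = cached.get();
        return buf == null ? 0 : buf.capacity();
    }

    public static void main(String[] args) {
        TempBufferCacheModel unlimited = new TempBufferCacheModel(Long.MAX_VALUE);
        unlimited.release(unlimited.acquire(1 << 20)); // one large request
        unlimited.release(unlimited.acquire(1024));    // later requests are small
        System.out.println("unlimited cache retains " + unlimited.cachedCapacity() + " bytes");

        TempBufferCacheModel capped = new TempBufferCacheModel(64 * 1024);
        capped.release(capped.acquire(1 << 20));
        System.out.println("capped cache retains " + capped.cachedCapacity() + " bytes");
    }
}
```

In this model, a single large request leaves a long-lived thread holding a large buffer indefinitely when there is no cap, while a capped cache never retains more than the cap per thread. On a JDK that supports it, the real cap is set at startup with -Djdk.nio.maxCachedBufferSize=<bytes>.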