How the Thread Pool Model Works in HBase Reads and Writes
[Introduction]
With the growth of big data, HBase has become an indispensable component for high-performance read/write scenarios. HBase provides a rich API and application development is relatively simple, so using that API correctly matters: careless usage often runs counter to the designers' intent and leads to problems such as request latency jitter and resource leaks.
How the HBase Socket Connection Pool Works
Let's start with how the HBase API documentation defines Connection: "A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper." In other words, a Connection holds the connections to the regionservers (including the HMaster) as well as the connection to ZooKeeper.
So for a single Connection instance kept as a singleton, there are at most five socket connections per regionserver, and these sockets are not re-created on every request; the client only closes a socket automatically after it has been idle (no interaction with the regionserver) for a long time, two minutes by default. Applications usually create the connection as follows:
connection = ConnectionFactory.createConnection(conf);
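A minimal sketch of this singleton pattern (the class and helper names are illustrative, not part of the HBase API): one Connection is shared by the whole application, while Table instances are obtained per operation and closed afterwards.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientSingleton {
    // One shared Connection for the whole application: it is thread-safe and
    // internally pools the sockets to each regionserver.
    private static volatile Connection connection;

    static Connection getConnection() throws IOException {
        if (connection == null) {
            synchronized (HBaseClientSingleton.class) {
                if (connection == null) {
                    Configuration conf = HBaseConfiguration.create();
                    connection = ConnectionFactory.createConnection(conf);
                }
            }
        }
        return connection;
    }

    // Table instances are lightweight and not thread-safe: take one per
    // operation (or per thread) and close it, but keep the Connection open.
    static Result getRow(String table, String rowKey) throws IOException {
        try (Table t = getConnection().getTable(TableName.valueOf(table))) {
            return t.get(new Get(Bytes.toBytes(rowKey)));
        }
    }
}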
Besides this approach, an HBase connection can also be obtained through ConnectionManager. With this approach, be careful to keep the conf object a singleton; otherwise new connections keep being created:
conn = HConnectionManager.getConnection(conf);
static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
  // Build the lookup key from the connection-related properties in conf
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      // No cached connection for this key: create one and cache it
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      // Cached connection has already been closed: drop it and create a fresh one
      ConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    // Track how many callers hold this shared connection
    connection.incCount();
    return connection;
  }
}
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    // Copy only the connection-related properties out of conf into the key
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    ConnectionManager.LOG.warn(
        "Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
The logic of these two snippets is as follows: an HConnectionKey is built from the supplied conf, and that key is used to look up a connection in the pool map CONNECTION_INSTANCES. If one is found it is returned; if none exists a new one is created; if one is found but already closed, it is removed and re-created. When the HConnectionKey is constructed from conf, the connection-related properties of the configuration are copied into the key's own fields, so when using this approach you should keep the conf object unchanged, so that every HConnectionKey built from it carries the same properties.
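To make the lookup behavior concrete, here is a hedged sketch against the (deprecated) HBase 1.x API: with the same conf instance, both calls produce equal HConnectionKey properties, so the second call returns the connection already cached in CONNECTION_INSTANCES.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

public class PooledLookupSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();

        // Both lookups map to the same HConnectionKey, so the pool hands back
        // the same HConnectionImplementation and bumps its reference count
        // (incCount in the code above).
        HConnection c1 = HConnectionManager.getConnection(conf);
        HConnection c2 = HConnectionManager.getConnection(conf);
        System.out.println("same pooled connection: " + (c1 == c2)); // expected: true

        // Balance each getConnection with a close so the reference count can
        // drop back and the pooled connection can eventually be released.
        c1.close();
        c2.close();
    }
}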
How the HBase Table Operation Thread Pool Works
When an application reads or writes HBase, each operation is wrapped as a task and submitted to the client-side table operation thread pool. By default, with a singleton connection, the application has 256 worker threads plus 5 socket connections per regionserver (controlled by hbase.client.ipc.pool.size). One might worry that this becomes a performance bottleneck, but the 256 threads serve all operations against the whole cluster, while the 5 sockets are per regionserver and the socket round trips to a regionserver are very short, so in most cases the default configuration is more than enough.
The relevant implementation is as follows:
public HTableInterface getTable(TableName tableName) throws IOException {
  return getTable(tableName, getBatchPool());
}

private ExecutorService getBatchPool() {
  if (batchPool == null) {
    synchronized (this) {
      if (batchPool == null) {
        // Lazily create the shared pool: 256 max / 256 core threads by default
        this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
            conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
        this.cleanupPool = true;
      }
    }
  }
  return this.batchPool;
}
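If an application needs to isolate a hot table from this shared batch pool, the Connection interface also accepts a caller-supplied ExecutorService. A hedged sketch follows; the table name, column family and pool size are made up for illustration.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomTablePool {
    // Illustrative size only; the shared 256-thread pool already covers most workloads.
    private static final ExecutorService MY_POOL = Executors.newFixedThreadPool(16);

    static void writeRow(Connection connection) throws IOException {
        // getTable(TableName, ExecutorService) runs this table's operations on the
        // caller-supplied executor instead of the connection's shared batch pool.
        try (Table table = connection.getTable(TableName.valueOf("t1"), MY_POOL)) {
            table.put(new Put(Bytes.toBytes("row1"))
                    .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
        }
    }
}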
Note that operations on the meta table use a separate pool with 10 core threads by default and a different thread name:
private ExecutorService getMetaLookupPool() {
  if (this.metaLookupPool == null) {
    synchronized (this) {
      if (this.metaLookupPool == null) {
        // Some of the threads would be used for meta replicas
        // To start with, threads.max.core threads can hit the meta (including replicas).
        // After that, requests will get queued up in the passed queue, and only after
        // the queue is full, a new thread will be started
        this.metaLookupPool = getThreadPool(
            conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
            conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
            "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
      }
    }
  }
  return this.metaLookupPool;
}
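All of the pool sizes above are read from the client Configuration, so they can be tuned before the connection is created. Below is a hedged sketch using the property names that appear in the code above; the values themselves are illustrative, not recommendations.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class PoolTuning {
    static Connection createTunedConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Table operation (batch) pool
        conf.setInt("hbase.hconnection.threads.max", 256);
        conf.setInt("hbase.hconnection.threads.core", 64);
        // Meta lookup pool
        conf.setInt("hbase.hconnection.meta.lookup.threads.max", 128);
        conf.setInt("hbase.hconnection.meta.lookup.threads.core", 10);
        // Socket connections per regionserver
        conf.setInt("hbase.client.ipc.pool.size", 5);
        return ConnectionFactory.createConnection(conf);
    }
}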
Direct Memory Leak Caused by the HBase Thread Pool
For NIO interaction, the JDK keeps a per-thread cache of temporary direct buffers (off-heap). This cache is unbounded by default and is only released when the thread dies; each time a thread requests more direct memory, new space is added on top of what its cache already holds, so the cached space can grow without limit. Because the threads in HBase's table operation pool never die, this direct memory keeps growing and may only be released when a full GC happens; otherwise the application eventually fails with a direct memory out-of-memory error. The relevant JDK source is analyzed below:
// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache =
    new ThreadLocal<BufferCache>() {
      @Override
      protected BufferCache initialValue() {
        return new BufferCache();
      }
    };

/**
 * Returns the max size allowed for a cached temp buffers, in
 * bytes. It defaults to Long.MAX_VALUE. It can be set with the
 * jdk.nio.maxCachedBufferSize property. Even though
 * ByteBuffer.capacity() returns an int, we're using a long here
 * for potential future-proofing.
 */
private static long getMaxCachedBufferSize() {
  String s = java.security.AccessController.doPrivileged(
      new PrivilegedAction<String>() {
        @Override
        public String run() {
          return System.getProperty("jdk.nio.maxCachedBufferSize");
        }
      });
  if (s != null) {
    try {
      long m = Long.parseLong(s);
      if (m >= 0) {
        return m;
      } else {
        // if it's negative, ignore the system property
      }
    } catch (NumberFormatException e) {
      // if the string is not well formed, ignore the system property
    }
  }
  return Long.MAX_VALUE;
}

/**
 * Returns a temporary buffer of at least the given size
 */
public static ByteBuffer getTemporaryDirectBuffer(int size) {
  // If a buffer of this size is too large for the cache, there
  // should not be a buffer in the cache that is at least as
  // large. So we'll just create a new one. Also, we don't have
  // to remove the buffer from the cache (as this method does
  // below) given that we won't put the new buffer in the cache.
  if (isBufferTooLarge(size)) {
    return ByteBuffer.allocateDirect(size);
  }

  BufferCache cache = bufferCache.get();
  ByteBuffer buf = cache.get(size);
  if (buf != null) {
    return buf;
  } else {
    // No suitable buffer in the cache so we need to allocate a new
    // one. To avoid the cache growing then we remove the first
    // buffer from the cache and free it.
    if (!cache.isEmpty()) {
      buf = cache.removeFirst();
      free(buf);
    }
    return ByteBuffer.allocateDirect(size);
  }
}
The HBase community's fix for this problem is to set -Djdk.nio.maxCachedBufferSize, which keeps the per-thread buffer cache from growing indefinitely; the corresponding JIRA is https://issues.apache.org/jira/browse/HBASE-19320
Note that HBase 2.x and later use the Netty framework for the RPC layer, so this problem no longer exists there. Also, on the JDK side the parameter above is only honored from jdk8u102 and jdk9 onward.
The JDK bug is described here: https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
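To verify the effect of that flag in a running client, the direct buffer pool that the JDK exposes as a BufferPoolMXBean named "direct" can be polled from the application. A minimal sketch; the cap value mentioned in the comment is illustrative.

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferMonitor {
    // Launch the client JVM with e.g. -Djdk.nio.maxCachedBufferSize=262144
    // (available from jdk8u102 / jdk9) to cap each thread's cached temporary
    // direct buffers; the value here is only an example.
    public static void logDirectBufferUsage() {
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                System.out.printf("direct buffers: count=%d, used=%d bytes, capacity=%d bytes%n",
                        pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
            }
        }
    }
}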