Unified Spark Access in an HBase Sharding Scenario
Introduction to the Sharding Scenario
As a distributed storage engine, HBase scales reasonably well on its own: a single cluster can grow to several hundred nodes. In some extreme high-volume scenarios, however, such as a search engine storing the pages fetched by its web crawler, a single HBase instance cannot keep up. Splitting the data across multiple clusters solves the capacity problem, but it adds extra complexity to management and application access.
This article shows how to use the single-service multi-instance capability of MRS to shard data across multiple HBase instances within one MRS cluster, and how a single Spark application can scan the data of those HBase instances into one RDD.
Data preparation:
Create the same table "TestTBL" in the HBase1 and HBase2 instances:
create 'TestTBL', {NAME=>'cf1'}
According to the sharding rule, rows whose RowKey starts with "A" are written to HBase1, and rows whose RowKey starts with "B" are written to HBase2 (a minimal routing sketch follows the sample data below).
HBase1:
hbase:002:0> put 'TestTBL', 'A0001', 'cf1:name', 'A-NAME001'
hbase:003:0> put 'TestTBL', 'A0002', 'cf1:name', 'A-NAME002'
hbase:004:0> put 'TestTBL', 'A0003', 'cf1:name', 'A-NAME003'
hbase:005:0> put 'TestTBL', 'A0004', 'cf1:name', 'A-NAME004'
HBase2:
hbase:006:0> put 'TestTBL', 'B0001', 'cf1:name', 'B-NAME001'
hbase:007:0> put 'TestTBL', 'B0002', 'cf1:name', 'B-NAME002'
hbase:008:0> put 'TestTBL', 'B0003', 'cf1:name', 'B-NAME003'
hbase:009:0> put 'TestTBL', 'B0004', 'cf1:name', 'B-NAME004'
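The routing rule itself is not part of HBase; the writer decides which instance a row belongs to. A minimal sketch of such a rule in Scala, assuming the instances are identified by the same names "A" and "B" used for the configuration sub-directories in this example (the helper name instanceFor is hypothetical):

// Hypothetical helper, for illustration only: map a RowKey to the HBase instance
// that stores it. Here the first character of the RowKey is the shard key,
// matching the "A*" / "B*" rule used in this article.
def instanceFor(rowKey: String): String = rowKey.take(1) match {
  case "A" => "A" // written to HBase1
  case "B" => "B" // written to HBase2
  case other => throw new IllegalArgumentException(s"No HBase instance for RowKey prefix '$other'")
}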
To access multiple HBase instances at the same time, the configuration files of each HBase instance and the Keytab file must be shipped with the Spark application. The configuration files are packed into a single archive, whose contents look like this:
# tar -tvf conf.tar.gz
drwxr-xr-x root/root 0 2022-05-23 17:22 A/
-rwxr-xr-x root/root 17100 2022-05-23 17:22 A/hbase-site.xml
-rwxr-xr-x root/root 4464 2022-05-23 17:22 A/hdfs-site.xml
-rwxr-xr-x root/root 13067 2022-05-23 17:22 A/core-site.xml
drwxr-xr-x root/root 0 2022-05-23 17:23 B/
-rwxr-xr-x root/root 16299 2022-05-23 17:23 B/hbase-site.xml
-rwxr-xr-x root/root 4328 2022-05-23 17:23 B/hdfs-site.xml
-rwxr-xr-x root/root 12609 2022-05-23 17:23 B/core-site.xml
-rw-r--r-- root/root 1288 2022-05-23 20:51 krb5.conf
-rw-r--r-- root/root 134 2022-05-23 20:51 user.keytab
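For reference, an archive with the layout above can be created with a command along these lines, assuming the per-instance configuration files have already been collected into local directories named A and B alongside krb5.conf and user.keytab:
# tar -czvf conf.tar.gz A B krb5.conf user.keytab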
Application Access
Using the data prepared above, a single Spark application accesses the data of both HBase instances at the same time and turns the query results into one RDD.
package com.huawei.bigdata.spark.examples.hbasecontext

import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction

import scala.collection.JavaConverters._

// LoginUtil is the Kerberos login helper shipped with the MRS sample project;
// adjust the package path if your sample project differs.
import com.huawei.hadoop.security.LoginUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkContext}

object MultiHBaseScanExample {
  private val ROWKEY_PREFIXS = List("A", "B")
  private val TBL_NAME = "TestTBL"
  private val ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client"
  private val ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop.hadoop.com"
  private val ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"
  private val CONF_PACKAGE_FILE = "conf.tar.gz"
  private val USER_PRINCIPAL = "admintest"
  private val USER_KEYTAB_PATH = "./user.keytab"
  private val KRB5_CONF_PATH = "./krb5.conf"

  /**
   * Get the Hadoop Configuration of one HBase instance from its sub folder
   * inside the extracted configuration archive.
   *
   * @param subConfDir sub directory name inside conf.tar.gz ("A" or "B")
   * @return the merged Configuration
   */
  def getConf(subConfDir: String): Configuration = {
    val conf = HBaseConfiguration.create
    val userdir = System.getProperty("user.dir") + File.separator + CONF_PACKAGE_FILE +
      File.separator + subConfDir + File.separator
    conf.addResource(new File(userdir + "hbase-site.xml").toURI.toURL)
    conf.addResource(new File(userdir + "hdfs-site.xml").toURI.toURL)
    conf.addResource(new File(userdir + "core-site.xml").toURI.toURL)
    conf
  }

  /**
   * Kerberos login with the keytab and krb5.conf shipped with the application.
   *
   * @param conf    Hadoop Configuration
   * @param subPath relative path that contains user.keytab and krb5.conf
   * @return the logged-in UserGroupInformation, or null if login failed
   */
  def login(conf: Configuration, subPath: String): UserGroupInformation = {
    val userdir = System.getProperty("user.dir") + File.separator + subPath
    val userKeytabFile = userdir + USER_KEYTAB_PATH
    val krb5File = userdir + KRB5_CONF_PATH
    try {
      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, USER_PRINCIPAL, userKeytabFile)
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL)
      return LoginUtil.login(USER_PRINCIPAL, userKeytabFile, krb5File, conf)
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
    null
  }

  /**
   * Main
   *
   * @param args parameters
   */
  def main(args: Array[String]): Unit = {
    println("--------------MultiHBaseScanner---------------")
    val conf = new SparkConf().setAppName("MultiHBaseScanner")
    conf.set("spark.yarn.security.credentials.hbase.enabled", "true")
    val hadoopConf = new Configuration
    login(hadoopConf, "")
    println("Login success!")
    val sc: SparkContext = new SparkContext(conf)
    sc.parallelize(ROWKEY_PREFIXS).map(prefix => {
      println(s"Task for scan $prefix")
      val scan = new Scan
      // Set the scan range: read [<prefix>0002, <prefix>0004) from each instance.
      scan.addFamily(Bytes.toBytes("cf1"))
      val startBytes = Bytes.toBytes(prefix + "0002")
      val stopBytes = Bytes.toBytes(prefix + "0004")
      scan.withStartRow(startBytes).withStopRow(stopBytes)
      var table: Table = null
      var rScanner: ResultScanner = null
      var result: Iterator[String] = null
      var connection: Connection = null
      try {
        // Log in again on the executor, using the keytab at the root of the archive.
        val ugi: UserGroupInformation = login(getConf(prefix), CONF_PACKAGE_FILE + File.separator)
        ugi.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // Connect to the HBase instance described by this prefix's sub folder.
            val configuration = getConf(prefix)
            connection = ConnectionFactory.createConnection(configuration)
            table = connection.getTable(TableName.valueOf(TBL_NAME))
            rScanner = table.getScanner(scan)
            // Materialize the rows before the scanner is closed in the finally block.
            result = rScanner.asScala.map(r => {
              val key = Bytes.toString(r.getRow)
              val name = Bytes.toString(r.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
              s"Data from $prefix: $key=>$name"
            }).toList.iterator
          }
        })
      } finally {
        if (rScanner != null) {
          // Close the scanner object.
          rScanner.close()
        }
        if (table != null) {
          try {
            // Close the Table object.
            table.close()
          } catch {
            case e: IOException =>
              println(s"Close table failed: $e")
          }
        }
        if (null != connection) {
          connection.close()
        }
      }
      result
    }).flatMap(results => results)
      .collect()
      .foreach(result => {
        // Print data.
        println(s"--Row--> : $result")
      })
    println("---------------------Done---------------------")
    sc.stop()
  }
}
Note that in the example above each HBase instance is scanned by only one task. If the amount of data to query is large, do another flatMap over RowKey ranges so that each instance is scanned by multiple tasks, as sketched below.
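As a rough illustration of that idea, and assuming RowKeys follow the <prefix><4-digit-number> pattern of the sample data (the sub-range boundaries below are purely illustrative), each prefix can first be expanded into several (startRow, stopRow) sub-ranges so that one instance is covered by several tasks:

// Illustrative sketch only: expand each instance prefix into several
// (prefix, startRow, stopRow) sub-ranges, so that a single HBase instance
// is scanned by several tasks instead of one.
val subRanges = sc.parallelize(ROWKEY_PREFIXS).flatMap { prefix =>
  Seq(
    (prefix, prefix + "0000", prefix + "0500"),
    (prefix, prefix + "0500", prefix + "1000")
  )
}
// Each (prefix, startRow, stopRow) element is then scanned exactly as in the
// example above, passing the boundaries to Scan.withStartRow / withStopRow.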
Submit the application, making sure to pass in the archive that contains the configuration files of the HBase instances. An archive passed via --archives is extracted into a directory named after the archive in the container's working directory, which is why getConf builds its paths under conf.tar.gz/:
spark-submit --master yarn --archives conf.tar.gz --keytab user.keytab --principal admintest@HADOOP.COM --class com.huawei.bigdata.spark.examples.hbasecontext.MultiHBaseScanExample SparkOnHbaseExample-1.0.jar
Output of the run. According to the query conditions in the code, two rows are read from each of the two HBase instances:
--Row--> : Data from A: A0002=>A-NAME002
--Row--> : Data from A: A0003=>A-NAME003
--Row--> : Data from B: B0002=>B-NAME002
--Row--> : Data from B: B0003=>B-NAME003
Summary
This example shows how, when the data volume grows beyond what a single HBase instance can handle, sharding across HBase instances can be used to manage massive data sets. With the single-service multi-instance capability of Huawei Cloud MRS, multiple HBase instances can be deployed in one MRS cluster; combined with the approach shown above, the sharding stays transparent to the application layer, achieving sharded data with unified management and unified access.