HBase分库场景下的Spark统一访问示例

举报
Sailing27 发表于 2022/05/24 12:03:04 2022/05/24
【摘要】 本文介绍了在一个MRS集群内,通过多HBase实例进行数据分库,同时在Spark层通过一个APP将多个实例中的数据Scan为一个RDD进行业务处理的样例。

分库场景介绍

HBase作为一个分布式的存储引擎,其本身具备一定的扩展性,单集群规模可以做到数百几点,但在一些极端海量的场景,如搜索引擎中用于存储爬虫网页爬取结果场景,单HBase实例无法支撑,但如果将数据拆到多个集群中,但在管理和应用访问上,又会带来额外的复杂度。

本文展示了结合MRS单服务多实例的方式,在一个MRS集群内通过多HBase实例对数据进行分库,同时在Spark中单APP将多个HBase的数据Scan为一个RDD。

数据准备:

HBase1、HBase2实例建相同的表“TestTBL”:

create 'TestTBL',  {NAME=>'cf1'}

根据分库规则,HBase1中写入以“A"为RowKey开头的数据,HBase2中写入以”B“为RowKey开头的数据。

HBase1:

hbase:002:0> put'TestTBL', 'A0001','cf1:name','A-NAME001'
hbase:003:0> put'TestTBL', 'A0002','cf1:name','A-NAME002'
hbase:004:0> put'TestTBL', 'A0003','cf1:name','A-NAME003'
hbase:005:0> put'TestTBL', 'A0004','cf1:name','A-NAME004'

HBase2:

hbase:006:0> put'TestTBL', 'B0001','cf1:name','B-NAME001'
hbase:007:0> put'TestTBL', 'B0002','cf1:name','B-NAME002'
hbase:008:0> put'TestTBL', 'B0003','cf1:name','B-NAME003'
hbase:009:0> put'TestTBL', 'B0004','cf1:name','B-NAME004'

为了能同时访问多个HBase,需要将多个HBase的配置文件以及Keytab文件传给Spark应用,配置文件打包到一个压缩文件中,压缩文件内容如下:

# tar -tvf conf.tar.gz
drwxr-xr-x root/root         0 2022-05-23 17:22 A/
-rwxr-xr-x root/root     17100 2022-05-23 17:22 A/hbase-site.xml
-rwxr-xr-x root/root      4464 2022-05-23 17:22 A/hdfs-site.xml
-rwxr-xr-x root/root     13067 2022-05-23 17:22 A/core-site.xml
drwxr-xr-x root/root         0 2022-05-23 17:23 B/
-rwxr-xr-x root/root     16299 2022-05-23 17:23 B/hbase-site.xml
-rwxr-xr-x root/root      4328 2022-05-23 17:23 B/hdfs-site.xml
-rwxr-xr-x root/root     12609 2022-05-23 17:23 B/core-site.xml
-rw-r--r-- root/root      1288 2022-05-23 20:51 krb5.conf
-rw-r--r-- root/root       134 2022-05-23 20:51 user.keytab

应用访问

以上述准备的数据为例,在一个Spark应用中,同时访问两个HBase的数据,查询结果生成一个RDD。

object MultiHBaseScanExample {
  private val ROWKEY_PREFIXS = List("A", "B")

  private val TBL_NAME = "TestTBL"

  private val ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client"

  private val ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop.hadoop.com"

  private val ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"

  private val CONF_PACKAGE_FILE = "conf.tar.gz"

  private val USER_PRINCIPAL = "admintest"

  private val USER_KEYTAB_PATH = "./user.keytab"

  private val KRB5_CONF_PATH = "./krb5.conf"


  /**
   * get Hadoop Configuration from sub folder.
   *
   * @param subConfDir
   * @return
   */
  def getConf(subConfDir: String): Configuration = {
    val conf = HBaseConfiguration.create
    val userdir = System.getProperty("user.dir") + File.separator + CONF_PACKAGE_FILE + File.separator + subConfDir + File.separator
    conf.addResource(new File(userdir + "hbase-site.xml").toURI.toURL)
    conf.addResource(new File(userdir + "hdfs-site.xml").toURI.toURL)
    conf.addResource(new File(userdir + "core-site.xml").toURI.toURL)
    conf
  }

  /**
   * Login
   *
   * @param conf Hadoop Configuration
   * @return
   */
  def login(conf: Configuration, subPath: String): UserGroupInformation = {
    val userdir = System.getProperty("user.dir") + File.separator + subPath
    val userKeytabFile = userdir + USER_KEYTAB_PATH
    val krb5File = userdir + KRB5_CONF_PATH

    try {
      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, USER_PRINCIPAL, userKeytabFile)
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL)
      val ugi = LoginUtil.login(USER_PRINCIPAL, userKeytabFile, krb5File, conf)
      return ugi
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
    null
  }


  /**
   * Main
   *
   * @param args parramters
   */
  def main(args: Array[String]): Unit = {
    println("--------------MultiHBaseScanner---------------")
    val conf = new SparkConf().setAppName("MultiHBaseScanner")
    conf.set("spark.yarn.security.credentials.hbase.enabled", "true")

    val hadoopConf = new Configuration
    login(hadoopConf, "")
    println("Login success!")
    val sc: SparkContext = new SparkContext(conf)

    sc.parallelize(ROWKEY_PREFIXS).map(prefix => {
      println(s"Task for scan $prefix")
      val scan = new Scan

      // Set Scan range, read each instance data: [0002~0004)
      scan.addFamily(Bytes.toBytes("cf1"))
      val startBytes = Bytes.toBytes(String.valueOf(prefix + "0002"))
      val stopBytes = Bytes.toBytes(String.valueOf(prefix + "0004"))
      scan.withStartRow(startBytes).withStopRow(stopBytes)

      var table: Table = null
      var rScanner: ResultScanner = null
      var result: Iterator[String] = null
      var connection: Connection = null

      try {
        val ugi: UserGroupInformation = login(getConf(prefix), CONF_PACKAGE_FILE + File.separator)
        ugi.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            val configuration = getConf(prefix)
            connection = ConnectionFactory.createConnection(configuration)
            table = connection.getTable(TableName.valueOf(TBL_NAME))
            rScanner = table.getScanner(scan)
            result = rScanner.map(r => {
              val key = Bytes.toString(r.getRow)
              val name = Bytes.toString(r.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
              s"Data from $prefix: $key=>$name"
            }).iterator
          }
        })
      }
      finally {
        if (rScanner != null) {
          // Close the scanner object.
          rScanner.close()
        }
        if (table != null)
          try {
            // Close the HTable object.
            table.close()
          }
          catch {
            case e: IOException =>
              println("Close table failed ", e)
          }
        if (null != connection) {
          connection.close()
        }
      }
      result
    }).flatMap(results => {
      results
    }).collect().foreach(result => {
      // print data
      println(s"--Row--> : $result")
    })
    println("---------------------Done---------------------")
    sc.stop()
  }
}

注意,上面一个HBase实例只有一个Task进行访问,如果查询数据量比较大,需要根据RowKey范围再进行一次FlatMap。

提交应用,注意需要将多个HBase的配置文件压缩包传入:

spark-submit --master yarn --archives conf.tar.gz  --keytab user.keytab --principal admintest@HADOOP.COM --class com.huawei.bigdata.spark.examples.hbasecontext.MultiHBaseScanExample SparkOnHbaseExample-1.0.jar

运行结果输出,根据代码查询条件,从两个Base实例分别查询了两条数据:

--Row--> : Data from A: A0002=>B-NAME002
--Row--> : Data from A: A0003=>B-NAME003
--Row--> : Data from B: B0002=>B-NAME002
--Row--> : Data from B: B0003=>B-NAME003

总结

此示例展示了在海量数据场景下,数据规模突破单HBase实例规模的情况下,通过HBase分库的方式,来管理海量的数据。结合华为云MRS的单服务多实例,可以在一个MRS集群部署多个HBase实例的方式,再通过上述示例,实现对应用层的透明,达到数据分库、管控统一、访问统一的效果。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。