Guide to Using Spark on CloudTable with MRS 3.X Clusters

Posted by 讲道理不讲感情 on 2021/02/03 17:29:43
[Abstract] Following the Spark on HBase sample in the development guide for MRS 2.x and earlier (https://support.huaweicloud.com/devg-mrs/mrs_06_0187.html), set hbase.version in the sample pom file to 1.3.1-mrs-1.9.0 and adapt the CreateTable, TableInputData, and TableOutputData classes so that hbase.zookeeper.quorum points at the CloudTable cluster's ZooKeeper nodes.

1. Follow the official documentation, the development guide for 2.x and earlier (https://support.huaweicloud.com/devg-mrs/mrs_06_0187.html), under Development Guide (for 2.x and earlier) -> Spark Application Development -> Spark on HBase. Set hbase.version in the sample code's pom file to <hbase.version>1.3.1-mrs-1.9.0</hbase.version>, and adapt the sample's CreateTable, TableInputData, and TableOutputData classes so that they connect to the CloudTable cluster; the adapted TableInputData and TableOutputData classes are listed below. (The hbase.zookeeper.quorum value differs from one CloudTable cluster to another; contact the CloudTable SRE team to obtain it for your cluster.)
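The official sample's CreateTable class needs the same hbase.zookeeper.quorum change as the two classes below. A minimal sketch of such a class is shown here (the shb1 table name and the info column family follow the classes below, the Kerberos login block from those classes is omitted, and this is an illustration rather than the official sample code):

package com.huawei.bigdata.spark.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Create the shb1 table used by the samples (sketch only; in security mode add the same LoginUtil block as in the classes below).
 */
public class CreateTable {

  public static void main(String[] args) throws IOException {
    Configuration hbConf = HBaseConfiguration.create();
    // Point the client at the CloudTable ZooKeeper quorum (values differ per CloudTable cluster).
    hbConf.set("hbase.zookeeper.quorum",
        "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com,"
            + "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com,"
            + "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com");

    try (Connection connection = ConnectionFactory.createConnection(hbConf);
         Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf("shb1");
      if (!admin.tableExists(tableName)) {
        // One column family, "info", matching TableInputData and TableOutputData.
        HTableDescriptor htd = new HTableDescriptor(tableName);
        htd.addFamily(new HColumnDescriptor("info"));
        admin.createTable(htd);
      }
    }
  }
}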


The TableInputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.List;

import scala.Tuple4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Input data to hbase table.
 */
public class TableInputData {

  public static void main(String[] args) throws IOException {
    Configuration hadoopConf = new Configuration();
    if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
      // Security mode: log in with the keytab before touching HBase/ZooKeeper.
      final String userPrincipal = "sparkuser";
      final String USER_KEYTAB_FILE = "user.keytab";
      String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
      String krbFile = filePath + "krb5.conf";
      String userKeyTableFile = filePath + USER_KEYTAB_FILE;

      String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
      String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
      String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
      LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
    }

    // Create the configuration parameter to connect the HBase.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // hbase.zookeeper.quorum takes a single comma-separated list of the CloudTable ZooKeeper hosts.
    String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
    String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
    String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
    hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

    // Declare the information of the table.
    Table table = null;
    String tableName = "shb1";
    byte[] familyName = Bytes.toBytes("info");
    Connection connection = null;

    try {
      // Connect to the HBase.
      connection = ConnectionFactory.createConnection(hbConf);
      // Obtain the table object.
      table = connection.getTable(TableName.valueOf(tableName));

      // Read the input file and split every line into four comma-separated fields.
      List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
          new Function<String, Tuple4<String, String, String, String>>() {
            public Tuple4<String, String, String, String> call(String s) throws Exception {
              String[] tokens = s.split(",");
              return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
            }
          }).collect();

      // Write one row per input line into column family "info".
      Integer i = 0;
      for (Tuple4<String, String, String, String> line : data) {
        Put put = new Put(Bytes.toBytes("row" + i));
        put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
        put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
        put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
        put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
        i += 1;
        table.put(put);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          // Close the HTable.
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      jsc.stop();
    }
  }
}

 

 

The TableOutputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Get data from table.
 */
public class TableOutputData {

  public static void main(String[] args) throws IOException {
    Configuration hadoopConf = new Configuration();
    if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
      // Security mode: log in with the keytab before touching HBase/ZooKeeper.
      final String userPrincipal = "sparkuser";
      final String USER_KEYTAB_FILE = "user.keytab";
      String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
      String krbFile = filePath + "krb5.conf";
      String userKeyTableFile = filePath + USER_KEYTAB_FILE;

      String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
      String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
      String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
      LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
    }

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

    // Create the configuration parameter to connect the HBase. The hbase-site.xml must be included in the classpath.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // hbase.zookeeper.quorum takes a single comma-separated list of the CloudTable ZooKeeper hosts.
    String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
    String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
    String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
    hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

    // Declare the information of the table to be queried.
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("info"));
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // Obtain the data in the table through the Spark interface.
    JavaPairRDD<ImmutableBytesWritable, Result> rdd =
        jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    // Traverse every row in the HBase table and print the results.
    List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
    for (int i = 0; i < rddList.size(); i++) {
      Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
      ImmutableBytesWritable key = t2._1();
      Iterator<Cell> it = t2._2().listCells().iterator();
      while (it.hasNext()) {
        Cell c = it.next();
        String family = Bytes.toString(CellUtil.cloneFamily(c));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
        String value = Bytes.toString(CellUtil.cloneValue(c));
        Long tm = c.getTimestamp();
        System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
      }
    }
    jsc.stop();
  }
}

 

2. Add the following entries to the /etc/hosts file on every node of the cluster (the entries differ between CloudTable clusters; contact the CloudTable service SRE team to obtain them for your cluster):
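As a format illustration only (the addresses below are placeholders; use the real IP/hostname pairs provided by the CloudTable SRE), each CloudTable ZooKeeper hostname is mapped to its address, one entry per line:

<zk-node-1-IP>    cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com
<zk-node-2-IP>    cloudtable-yan-zk2-xOogaSa3.mycloudtable.com
<zk-node-3-IP>    cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com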


3. From an MRS 1.9.0 cluster, take the following HBase 1.3.1 jars under /opt/client/Spark/spark/jars/:

hbase-client-1.3.1-mrs-1.9.0.jar, hbase-common-1.3.1-mrs-1.9.0.jar, hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar, hbase-protocol-1.3.1-mrs-1.9.0.jar, hbase-server-1.3.1-mrs-1.9.0.jar, htrace-core-3.1.0-incubating.jar

Copy them to /opt/client/Spark2x/spark/jars/ on the MRS 3.0.1 cluster, and move the HBase 2.3.2 jars out of /opt/client/Spark2x/spark/jars/. A sketch of these commands follows.
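This is a sketch of the jar swap, assuming direct scp access between the two client nodes (the target host name is a placeholder, and the 2.3.2 jar name pattern may need adjusting to the actual file names on your cluster):

# On the MRS 1.9.0 cluster client node
cd /opt/client/Spark/spark/jars
scp hbase-client-1.3.1-mrs-1.9.0.jar hbase-common-1.3.1-mrs-1.9.0.jar \
    hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar hbase-protocol-1.3.1-mrs-1.9.0.jar \
    hbase-server-1.3.1-mrs-1.9.0.jar htrace-core-3.1.0-incubating.jar \
    root@<mrs-3.0.1-client-node>:/opt/client/Spark2x/spark/jars/

# On the MRS 3.0.1 cluster client node: move the bundled HBase 2.3.2 jars out of the Spark jars directory
mkdir -p /opt/client/hbase-2.3.2-backup
mv /opt/client/Spark2x/spark/jars/hbase-*2.3.2*.jar /opt/client/hbase-2.3.2-backup/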

Then run the official sample code following the run steps in the official sample documentation; an illustrative submit command is sketched below.
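Purely as an illustration (the jar name and the input path are assumptions, not values from this guide), submitting the data-loading class from the MRS 3.0.1 client might look like this:

# Standard MRS client environment setup, then submit the sample in yarn-client mode
cd /opt/client
source bigdata_env
# TableInputData expects the path of a file with four comma-separated fields per line as its first argument
spark-submit --master yarn --deploy-mode client \
  --class com.huawei.bigdata.spark.examples.TableInputData \
  SparkOnHbaseJavaExample.jar /tmp/input_data1.txt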



 

 
