MRS:SparkStreaming对接kafka写入hbase样例
MRS:SparkStreaming对接kafka写入hbase样例
关键词:官网样例 Kerberos认证 kafka SparkStreaming hbase
摘要:MRS spark官网样例的补充,实现Kerberos认证集群SparkStreaming对接安全kafka写入hbase
前期准备:
1. 创建MRS 1.9.0 混合集群,大数据组件至少包括Hadoop , Spark , Hive , HBase , Kafka,开启Kerberos认证
2. 集群创建好之后,参照官网https://support.huaweicloud.com/devg-mrs/mrs_06_0154.html准备开发用户,作者创建的用户名为sparkuser ,然后下载keytab与krb5.conf文件待用
3. 样例下载地址https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9
开发程序:
1. 将huaweicloud-mrs-example/src/spark-examples/SparkStreamingKafka010JavaExample样例导入idea
2. 删除掉com.huawei.bigdata.spark.examples包下面所有类,将 SparkOnStreamingToHbase.java和StreamingExampleProducer.java文件(见附件)复制到com.huawei.bigdata.spark.examples包下
3. 将pom.xml文件替换掉原工程的pom文件
场景说明:
1. 假定某个业务Kafka每3秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。现table1表有10条记录,表示有用户名分别为1-10的用户,他们的历史消费金额初始化都是0元。基于某些业务要求,开发的Spark应用程序实现如下功能:实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。
2. 创建HBase表,并插入数据
a.通过HBase创建名为table1的表,命令如下:
create 'table1', 'cf'
b.通过HBase执行如下命令,将数据插入table1表中
put 'table1', '1', 'cf:cid', '0'
put 'table1', '2', 'cf:cid', '0'
put 'table1', '3', 'cf:cid', '0'
put 'table1', '4', 'cf:cid', '0'
put 'table1', '5', 'cf:cid', '0'
put 'table1', '6', 'cf:cid', '0'
put 'table1', '7', 'cf:cid', '0'
put 'table1', '8', 'cf:cid', '0'
put 'table1', '9', 'cf:cid', '0'
put 'table1', '10', 'cf:cid', '0'
c. 通过HBase执行scan 'table1'命令,
3. 集群组件的配置
a. 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”
b. 如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true。
c. 需要修改程序SparkOnStreamingToHbase类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。
d. 用户需要对接安全Kafka,创建jaas.conf文件待用,文件内容内容如下. 注意:在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名。
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="./user.keytab"
principal="sparkuser"
useTicketCache=false
storeKey=true
debug=true;
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab = "./user.keytab"
principal="sparkuser"
useTicketCache=false
storeKey=true
debug=true;
};
调测程序:
1. 在后台master节点创建/root/jars和/root/jars/conf文件夹,然后将程序打包上传至/root/jars下,再将jaas.conf,keytab,krb5.conf文件上传至/root/jars/conf下.在idea中将程序打包,上传至/root/jars目录下
2. 创建Topic, 并且启动Kafka的Producer,向Kafka发送数据,{zkQuorum}表示ZooKeeper集群信息,格式为IP:port, JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092, {Topic}为kafka的topic名称,作者为apple。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}
作者命令:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.0.122:2181/kafka --replication-factor 2 --partitions 3 --topic apple
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}
作者命令:
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:/root/jars/SparkStreamingKafka010JavaExample-1.0.jar com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.106:9092 apple
3. 在运行样例主程序时需要指定<checkpointDir> <brokers> <topic> <batchTime>,其中<checkPointDir>指应用程序结果备份到HDFS的路径,<brokers>指获取元数据的Kafka地址,安全集群格式为brokerIp:21007,<topic>指读取Kafka上的topic名称,<batchTime>指Streaming分批的处理间隔.
切换目录到/root/jars下面
a. yarn-client模式下的运行命令:
spark-submit --master yarn --deploy-mode client --files ./conf/jaas.conf,./conf/user.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.10-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
作者命令:
spark-submit --master yarn --deploy-mode client --files ./conf/jaas.conf,./conf/user.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.11-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.2.2-mrs-1.9.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.106:21007 apple 6
b. yarn-cluster模式下,首先需要修改SparkOnStreamingToHbase.java文件中
将代码”String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;””修改为”String filePath = System.getProperty("user.dir") + File.separator ;”,运行命令如下:
spark-submit --master yarn --deploy-mode cluster --files ./conf/jaas.conf,./conf/user.keytab,./conf/krb5.conf --conf "spark.yarn.cluster.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.10-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
作者命令:
spark-submit --master yarn --deploy-mode cluster --files ./conf/jaas.conf,./conf/user.keytab,./conf/krb5.conf --conf "spark.yarn.cluster.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.11-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.2.2-mrs-1.9.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.106:21007 apple 6
4. 在hbase下面查看表scan ”table1”,验证成功
注: 本人首次发表在论坛https://bbs.huaweicloud.com/forum/thread-58924-1-1.html
- 点赞
- 收藏
- 关注作者
评论(0)