MRS:SparkStreaming对接kafka写入hbase样例

举报
剑指南天 发表于 2020/08/02 15:07:16 2020/08/02
【摘要】 MRS spark官网样例的补充,实现Kerberos认证集群SparkStreaming对接kafka写入hbase

MRS:SparkStreaming对接kafka写入hbase样例

关键词:官网样例 Kerberos认证 kafka SparkStreaming hbase

摘要:MRS spark官网样例的补充,实现Kerberos认证集群SparkStreaming对接安全kafka写入hbase

 

前期准备:

1.      创建MRS 1.9.0 混合集群,大数据组件至少包括Hadoop , Spark , Hive , HBase , Kafka,开启Kerberos认证

2.      集群创建好之后,参照官网https://support.huaweicloud.com/devg-mrs/mrs_06_0154.html准备开发用户,作者创建的用户名为sparkuser ,然后下载keytabkrb5.conf文件待用

3.      样例下载地址https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9

 

开发程序:

1.      huaweicloud-mrs-example/src/spark-examples/SparkStreamingKafka010JavaExample样例导入idea

                 

2.      删除掉com.huawei.bigdata.spark.examples包下面所有类, SparkOnStreamingToHbase.javaStreamingExampleProducer.java文件(见附件)复制到com.huawei.bigdata.spark.examples包下

3.      pom.xml文件替换掉原工程的pom文件

 

场景说明:

1.      假定某个业务Kafka每3秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。现table1表有10条记录,表示有用户名分别为1-10的用户,他们的历史消费金额初始化都是0元。基于某些业务要求,开发的Spark应用程序实现如下功能:实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。

2.    创建HBase表,并插入数据

a.通过HBase创建名为table1的表,命令如下:

create 'table1', 'cf'

b.通过HBase执行如下命令,将数据插入table1表中

                 put 'table1', '1', 'cf:cid', '0'

                 put 'table1', '2', 'cf:cid', '0'

                 put 'table1', '3', 'cf:cid', '0'

                 put 'table1', '4', 'cf:cid', '0'

                 put 'table1', '5', 'cf:cid', '0'

                 put 'table1', '6', 'cf:cid', '0'

                 put 'table1', '7', 'cf:cid', '0'

                 put 'table1', '8', 'cf:cid', '0'

                 put 'table1', '9', 'cf:cid', '0'

                 put 'table1', '10', 'cf:cid', '0'

c.     通过HBase执行scan 'table1'命令,

                 

3.      集群组件的配置

a.      kafkaBroker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”

b.      如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true

c.       需要修改程序SparkOnStreamingToHbase类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。

d.      用户需要对接安全Kafka,创建jaas.conf文件待用,文件内容内容如下. 注意:Spark on YARN模式下,jaas.confuser.keytab通过YARN分发到Spark on YARNcontainer目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名。

Client {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab="./user.keytab"

principal="sparkuser"

useTicketCache=false

storeKey=true

debug=true;

};

KafkaClient {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab = "./user.keytab"

principal="sparkuser"

useTicketCache=false

storeKey=true

debug=true;

};

 

调测程序:

1.      在后台master节点创建/root/jars/root/jars/conf文件夹,然后将程序打包上传至/root/jars,再将jaas.conf,keytab,krb5.conf文件上传至/root/jars/conf.idea中将程序打包,上传至/root/jars目录下

2.      创建Topic, 并且启动KafkaProducer,向Kafka发送数据,{zkQuorum}表示ZooKeeper集群信息,格式为IP:port, JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092, {Topic}kafkatopic名称,作者为apple

 

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

作者命令:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.0.122:2181/kafka --replication-factor 2 --partitions 3 --topic apple

 

java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}

作者命令:

java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:/root/jars/SparkStreamingKafka010JavaExample-1.0.jar com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.106:9092 apple

3.      在运行样例主程序时需要指定<checkpointDir> <brokers> <topic> <batchTime>,其中<checkPointDir>指应用程序结果备份到HDFS的路径,<brokers>指获取元数据的Kafka地址,安全集群格式为brokerIp:21007,<topic>指读取Kafka上的topic名称,<batchTime>Streaming分批的处理间隔.

切换目录到/root/jars下面

a.     yarn-client模式下的运行命令:

spark-submit --master yarn --deploy-mode client --files ./conf/jaas.conf,./conf/user.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.10-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

作者命令:

spark-submit --master yarn --deploy-mode client --files ./conf/jaas.conf,./conf/user.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.11-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.2.2-mrs-1.9.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.106:21007 apple 6

b.     yarn-cluster模式下,首先需要修改SparkOnStreamingToHbase.java文件中

将代码”String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;””修改为”String filePath = System.getProperty("user.dir") + File.separator ;”,运行命令如下:

spark-submit --master yarn --deploy-mode cluster --files ./conf/jaas.conf,./conf/user.keytab,./conf/krb5.conf --conf "spark.yarn.cluster.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.10-0.10.0.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.1.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

 

作者命令:

spark-submit --master yarn --deploy-mode cluster --files ./conf/jaas.conf,./conf/user.keytab,./conf/krb5.conf --conf "spark.yarn.cluster.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --jars $SPARK_HOME/jars/streamingClient010/kafka-clients-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/kafka_2.11-1.1.0-mrs-1.9.0.jar,$SPARK_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.2.2-mrs-1.9.0.jar --class com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase /root/jars/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.106:21007 apple 6

4.     hbase下面查看表scan ”table1”,验证成功

            



注: 本人首次发表在论坛https://bbs.huaweicloud.com/forum/thread-58924-1-1.html

 


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。