MRS二次开发(12/27): Kafka接口调用样例
MRS二次开发(12/27): Kafka接口调用样例
一、Kafka简介
Kafka是一个分布式的消息发布-订阅系统。它采用独特的设计提供了类似JMS的特性,主要用于处理活跃的流式数据。Kafka有很多适用的场景:消息队列、行为跟踪、运维数据监控、日志收集、流处理、事件溯源、持久化日志等。
二、样例背景
Kafka是一个分布式消息系统,在此系统上我们可以做一些消息的发布和订阅操作,假定用户要开发一个Producer,让其每秒向Kafka集群某Topic发送一条消息,另外,我们还需要实现一个Consumer,订阅该Topic,实时消费该类消息。
三、Windows环境样例调用步骤
环境准备,参考第一课
比对时间,与集群时间误差不能超过5分钟
检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息
在IDEA打开样例代码的kafka-examples目录,检查SDK配置
默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕
从Manager界面下载用户认证凭据后,解压缩获取秘钥文件user.keytab和krb5.conf,将这两个文件放到样例代码的src\main\resources目录
将客户端解压目录例如/tmp/FusionInsight-Client/FusionInsightCluster1ServicesClientConfig/Kafka/config下consumer.properties,producer.properties,kafkaSecurityMode配置文件放到样例代码的src\main\resources目录下
修改src\main\java\com\huawei\bigdata\kafka\example\Producer.java类和src\main\java\com\huawei\bigdata\kafka\example\Consumer.java中USERKEYTABFILE参数,USER_PRINCIPAL参数。修改src\main\java\com\huawei\bigdata\kafka\example\security\LoginUtil.java类中securityPrepare方法里LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.<系统域名>");设置为用户当前集群的系统域名,用户可登录FusionInsight Manager,单击“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名
右键resources目录,选择“Mark Directory as”-"Resources Root",若已经是,则忽略
在src\main\java\com\huawei\bigdata\kafka\example\Producer.java右键执行Run 启动样例代码
在src\main\java\com\huawei\bigdata\kafka\example\Consumer.java右键执行Run 启动样例代码,待样例启动后,在同步执行步骤10
四、Linux环境调试步骤
前提:Linux环境有安装集群客户端
完成Windows环境样例调用步骤
在windows环境中执行打包
通过如下maven指令构建样例工程依赖jar包到lib目录
- mvn dependency:copy-dependencies -DoutputDirectory=lib --settings D:\tools\apache-maven-3.6.3\conf\settings.xml
检查linux环境时间与集群误差不超过5分钟
检查linux环境的JDK版本为1.8
配置linux环境的/etc/hosts文件
在Linux环境新建目录,例如“/opt/kafkatest”,并创建子目录“lib”和IntelliJ IDEA样例工程中的同级目录“src/main/resources”
将步骤3打的jar包和步骤4lib目录的jar包上传到Linux环境lib目录下,例如“/opt/kafkatest/lib”
将IntelliJ IDEA工程“src/main/resources”目录下的所有文件拷贝到与依赖库文件夹同级的目录“src/main/resources”下,例如“/opt/kafkatest/src/main/resources”
检查“/opt/kafkatest/src/main/resources”目录下和“/opt/kafkatest/lib”文件目录下的所有文件,对当前用户均具有可读权限。
切换到“/opt/kafkatest”,执行以下命令,运行样例程序。
- java -cp .:/opt/kafkatest/lib/*:/opt/kafkatest/src/main/resources com.huawei.bigdata.kafka.example.Producer
五、问题互动渠道
FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html
- 点赞
- 收藏
- 关注作者
评论(0)