MRS二次开发(17/27): Flink实现配置表与实时流join的样例
MRS二次开发(17/27): Flink实现配置表与实时流join的样例
一、Flink简介
Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。
二、Flink客户端准备,可参考MRS二次开发(15/27)Flink构造DataStream样例,有详细的视频介绍
Kerberos认证
从服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。
在“flink-conf.yaml”上配置:
keytab路径。例如:security.kerberos.login.keytab: /opt/testclient/flinkuser.keytab
principal名。例如:security.kerberos.login.principal: flinkuser
对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下:
zookeeper.sasl.disable: false
security.kerberos.login.contexts: Client
如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下:
security.kerberos.login.contexts: Client,KafkaClient
Security Cookie认证
用户需要获取SSL证书,放置到Flink客户端中。
- 参考https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_0621.html
- 拷贝其中代码生成sh脚本,放置在Flink客户端的bin目录下。
- 在客户端目录下执行source bigdata_env。
- 执行命令“sh generatesh ”即可,例如“sh generatekeystore.sh 123456",会在Flink客户端的conf目录下生成flink.keystore,flink.truststore文件。
- 在Flink客户端同级目录下新建ssl目录,例如“/opt/testclient/Flink/flink/ssl”,将生成的flink.keystore,flink.truststore文件拷贝到ssl目录中。
获取证书后,在Flink客户端的conf目录下配置文件“flink-conf.yaml”中将以下配置项进行赋值。
- 将配置项“security.ssl.keystore”设置为keystore文件的相对路径,例如“ssl/flink.keystore”
- 将配置项“security.ssl.truststore”设置为truststore文件的相对路径,例如“ssl/flink.truststore”
- 将配置项“security.cookie”设置为一串密码,该密码可以是一串随机规则密码,可以取默认
- 使用Manager明文加密API进行获取密文:curl-k -i -u 用户名:密码 -X POST -HContent-type:application/json -d '{"plainText":""}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt';其中要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。
- 将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”设置为如上指令获取到的密文。
打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功。
加密传输
配置SSL传输,用户主要在客户端的“flink-conf.yaml”文件中做如下配置:
打开SSL开关和设置SSL加密算法,配置如下:
- ssl.enabled:true
- ssl.enabled: true
- service.ssl.enabled: true
- data.ssl.enabled: true
- ssl.algorithms: TLSDHERSAWITHAES128CBCSHA256,TLSDHEDSSWITHAES128CBCSHA256
配置keystore或truststore文件路径为相对路径时,Flink Client执行命令的目录需要可以直接访问该相对路径。
- 在Flink的CLI yarn-session.sh命令中增加“-t”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/yarn-session.sh -tssl/ ”
- 在Flink run命令中增加“-yt”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/flink run -ytssl/ -ys 3 -yn 3 -m yarn-cluster -c com.huawei.SocketWindowWordCount ../lib/flink-eg-1.0.jar --hostname r3-d3 --port 9000”
配置keystore或truststore文件路径为绝对路径时,需要在Flink Client以及Yarn各个节点的该绝对路径上放置keystore或truststore文件。
- 执行命令中不需要使用“-t”或“-yt”来传输keystore和truststore文件。
将客户端安装节点的业务ip和manager界面浮动ip追加到jobmanager.web.allow-access-address配置中用“,”隔开
三、样例背景
假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,基于某些业务要求,要求开发Flink的应用程序实现如下功能:
实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息;其中日志文本和csv格式表中的姓名字段可作为关键字,通过该值将两张表联合起来。
周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。data.txt:周末两天网民停留日志
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60
NotExist,female,200
configtable.csv:网民个人信息,第一列为姓名,第二列为年龄,第三列为公司,第四列为工作地点,第五列为学历,第六列为工作年数,第七列为手机号码,第八列为户籍所在地,第九列为毕业学校,csv标准格式,即分隔符为“,”
username,age,company,workLocation,educational,workYear,phone,nativeLocation,school
LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university
YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university
GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university
CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university
Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university
FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university
四、样例调试
前提:Linux环境有安装集群客户端,环境准备,参考第一课
数据规划:
样例工程的流数据存储在文本中,配置表为csv格式文件。
确保集群安装完成,包括HDFS、YARN、安全Redis和Flink。
创建Redis集群,添加Redis用户及权限配置,并下载“user.keytab”和“krb5.conf”文件。
调试步骤:
比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟
检查linux环境的JDK版本为1.8
配置linux环境的/etc/hosts文件
检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息
在IDEA打开样例代码的FlinkConfigtableJavaExample目录,检查SDK配置
默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕
在Flink客户端下创建config目录并将“user.keytab”、“krb5.conf”、“data.txt”、“configtable.csv”、“import.properties”和“read.properties”文件放置在config目录下,文件可在样例代码config目录和data目录获取,例如"/opt/Bigdata/client/Flink/flink/config/configtable.csv"等。
修改“import.properties”和“read.properties”文件的参数,可以参考redis配置
将如下jar包拷贝到“/opt/testclient/Flink/flink/lib”下
- commons-pool2-2.8.0.jar
- flink-dist_2.11-1.10.0-hw-ei-302002.jar
- jredisclient-8.0.2-302002.jar
- super-csv-2.2.0.jar
- wcc_krb5-8.0.2-302002.jar
在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。
在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。
根据实际情况设置Jar包的名称、类型以及输出路径。
选择“Put into Output Root”。然后单击“Apply”。
在IDEA主页面,选择“Build > Build Artifacts...”。在弹出的菜单中构建我们的jar包
从IDEA项目目录下的获取到Jar包,拷贝到Flink客户端目录,如“/opt/testclient/Flink/flink”。
导入configtable.csv进Redis集群。需要先source bigdata_env。
- java -cp /opt/testclient/Flink/flink/lib/*:/opt/testclient/Flink/flink/FlinkConfigtableJavaExample.jar com.huawei.bigdata.flink.examples.RedisDataImport --configPath config/import.properties
启动Flink集群。
- bin/yarn-session.sh -t ssl/ -t config -jm 1024 -tm 1024
流数据以网民姓名为关键字,读取Redis内的网民个人信息并JOIN输出。
- bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample /opt/testclient/Flink/flink/FlinkConfigtableJavaExample.jar --dataPath config/data.txt
使用Flink Web页面查看Flink应用程序运行结果。
五、问题互动渠道
FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html
- 点赞
- 收藏
- 关注作者
评论(0)