MRS二次开发(20/27): Flink TableAPI使用样例

举报
晋红轻 发表于 2020/12/26 11:37:35 2020/12/26
【摘要】 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。本文介绍FusionInsight MRS Flink TableAPI使用样例.

MRS二次开发(20/27): Flink TableAPI使用样例

一、Flink简介

    Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。

二、Flink客户端准备,可参考MRS二次开发(15/27)Flink构造DataStream样例,有详细的视频介绍

Kerberos认证

从服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。

“flink-conf.yaml”上配置:

keytab路径。例如:security.kerberos.login.keytab: /opt/testclient/flinkuser.keytab

principal名。例如:security.kerberos.login.principal: flinkuser

对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下:

    zookeeper.sasl.disable: false

    security.kerberos.login.contexts: Client

如果用户对于Kafka clientKafka broker之间也需要做kerberos认证,配置如下:

    security.kerberos.login.contexts: Client,KafkaClient

 Security Cookie认证 

用户需要获取SSL证书,放置到Flink客户端中。

 拷贝其中代码生成generate_keystore.sh脚本,放置在Flink客户端的bin目录下。

 在客户端目录下执行source bigdata_env

 执行命令“sh generate_keystore.sh <password>”即可,例如“sh generate_keystore.sh 123456",会在Flink客户端的conf目录下生成flink.keystoreflink.truststore文件。

 Flink客户端同级目录下新建ssl目录,例如“/opt/testclient/Flink/flink/ssl”,将生成的flink.keystoreflink.truststore文件拷贝到ssl目录中。

获取证书后,在Flink客户端的conf目录下配置文件“flink-conf.yaml”中将以下配置项进行赋值。

  • 将配置项“security.ssl.keystore”设置为keystore文件的相对路径,例如“ssl/flink.keystore”
  • 将配置项“security.ssl.truststore”设置为truststore文件的相对路径,例如“ssl/flink.truststore”
  • 将配置项“security.cookie”设置为一串密码,该密码可以是一串随机规则密码,可以取默认
  • 使用Manager明文加密API进行获取密文:curl -k -i -u 用户名:密码 -X POST -HContent-type:application/json -d '{"plainText":""}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt';其中要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。
  • 将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”设置为如上指令获取到的密文。

打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功。

加密传输

配置SSL传输,用户主要在客户端的“flink-conf.yaml”文件中做如下配置:

打开SSL开关和设置SSL加密算法,配置如下:

  • ssl.enabled:true
  • ssl.enabled: true
  • service.ssl.enabled: true
  • data.ssl.enabled: true
  • ssl.algorithms: TLSDHERSAWITHAES128CBCSHA256,TLSDHEDSSWITHAES128CBCSHA256

配置keystore或truststore文件路径为相对路径时,Flink Client执行命令的目录需要可以直接访问该相对路径。

  • Flink的CLI yarn-session.sh命令中增加“-t”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/yarn-session.sh -tssl/ ”
  • Flink run命令中增加“-yt”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/flink run -ytssl/ -ys 3 -yn 3 -m yarn-cluster -c com.huawei.SocketWindowWordCount ../lib/flink-eg-1.0.jar --hostname r3-d3 --port 9000”

配置keystore或truststore文件路径为绝对路径时,需要在Flink Client以及Yarn各个节点的该绝对路径上放置keystore或truststore文件。

  • 执行命令中不需要使用“-t”或“-yt”来传输keystore和truststore文件。

将客户端安装节点的业务ip和manager界面浮动ip追加到jobmanager.web.allow-access-address配置中用“,”隔开

 

三、样例背景

 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。
 基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

四、样例调试

前提:Linux环境有安装集群客户端,环境准备,参考第一课

数据规划:

业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。kafka配置可参考MRS二次开发(18/27): Flink读写Kafka样例。
业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
a. 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
b. 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。
 

调试步骤:

比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟

检查linux环境的JDK版本为1.8

配置linux环境的/etc/hosts文件

检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息

IDEA打开样例代码的FlinkStreamSqlJoinExample目录,检查SDK配置

默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕

IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。

“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。

根据实际情况设置Jar包的名称、类型以及输出路径。

选择“Put into Output Root”。然后单击“Apply”。

IDEA主页面,选择“Build > Build Artifacts...”。在弹出的菜单中构建我们的jar包

IDEA项目out目录下的获取到Jar包,拷贝到Flink客户端目录,如“/opt/testclient/Flink/flink”。

将如下包拷到"/opt/testclient/Flink/flink/lib"目录下,若已存在则忽略。

  • kafka-clients-2.4.0-hw-ei-302002.jar
  • flink-connector-kafka_2.11-1.10.0-hw-ei-302002.jar
  • flink-connector-kafka-0.10_2.11-1.10.0-hw-ei-302002.jar
  • flink-connector-kafka-0.9_2.11-1.10.0-hw-ei-302002.jar
  • flink-connector-kafka-base_2.11-1.10.0-hw-ei-302002.jar
  • flink-dist_2.11-1.10.0-hw-ei-302002.jar
  • flink-table_2.11-1.10.0-hw-ei-302002.jar

Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn session命令,启动flink集群。

  • 例如我们使用ssl相对路径指令为“bin/yarn-session.sh -tssl/ -jm 1024 -tm 1024”

在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码,例如:

  1. 启动程序向Kafka生产
  • bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/testclient/Flink/flink/FlinkStreamSqlJoinExample.jar --topic topic3 --bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASLPLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoopcom
  1. 在集群内任一节点启动netcat命令,等待应用程序连接
  • nc -l -p 9000
  1. 启动程序接受Socket数据,并执行联合查询。
  • bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/testclient/Flink/flink/FlinkStreamSqlJoinExample.jar --topic topic3 --bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASLPLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoopcom --hostname 10.244.230.229 --port 9000

使用Flink Web页面查看Flink应用程序运行情况。

五、问题互动渠道

FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。