华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践

晋红轻 发表于 2021/12/29 11:05:34 2021/12/29
【摘要】 本文学习如何使用flinksql语法解析复杂嵌套的json格式内容。

华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践

背景说明

随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。

SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。

本文介绍如何使用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而处理复杂的嵌套Json格式

Json内容

下面以cdl新增数据的json为例

{
    "schema":{
        "type":"struct",
        "fields":[
            {
                "type":"string",
                "optional":false,
                "field":"DATA_STORE"
            },
            {
                "type":"string",
                "optional":false,
                "field":"SEG_OWNER"
            },
            {
                "type":"string",
                "optional":false,
                "field":"TABLE_NAME"
            },
            {
                "type":"int64",
                "optional":false,
                "name":"org.apache.kafka.connect.data.Timestamp",
                "version":1,
                "field":"TIMESTAMP"
            },
            {
                "type":"string",
                "optional":false,
                "field":"OPERATION"
            },
            {
                "type":"string",
                "optional":true,
                "field":"LOB_COLUMNS"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"array",
                        "items":{
                            "type":"struct",
                            "fields":[
                                {
                                    "type":"string",
                                    "optional":false,
                                    "field":"name"
                                },
                                {
                                    "type":"string",
                                    "optional":true,
                                    "field":"value"
                                }
                            ],
                            "optional":false
                        },
                        "optional":false,
                        "field":"properties"
                    }
                ],
                "optional":false,
                "name":"transaction",
                "field":"transaction"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"uid"
                    }
                ],
                "optional":true,
                "name":"unique",
                "field":"unique"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"uid"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "default":"",
                        "field":"uname"
                    },
                    {
                        "type":"int64",
                        "optional":true,
                        "field":"age"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"sex"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"mostlike"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"lastview"
                    },
                    {
                        "type":"int64",
                        "optional":true,
                        "field":"totalcost"
                    }
                ],
                "optional":true,
                "name":"data",
                "field":"data"
            },
            {
                "type":"struct",
                "fields":[

                ],
                "optional":true,
                "name":"EMPTY",
                "field":"before"
            },
            {
                "type":"string",
                "optional":true,
                "field":"HEARTBEAT_IDENTIFIER"
            }
        ],
        "optional":false,
        "name":"hudi.hudisource"
    },
    "payload":{
        "DATA_STORE":"MYSQL",
        "SEG_OWNER":"hudi",
        "TABLE_NAME":"hudisource",
        "TIMESTAMP":1631070742000,
        "OPERATION":"INSERT",
        "LOB_COLUMNS":"",
        "transaction":{
            "properties":[
                {
                    "name":"file",
                    "value":"mysql-bin.000005"
                },
                {
                    "name":"pos",
                    "value":"32307"
                },
                {
                    "name":"gtid",
                    "value":""
                }
            ]
        },
        "unique":{
            "uid":11
        },
        "data":{
            "uid":11,
            "uname":"蒋语堂",
            "age":38,
            "sex":"",
            "mostlike":"",
            "lastview":"播放器",
            "totalcost":28732
        },
        "before":null,
        "HEARTBEAT_IDENTIFIER":"998d66cc-1405-40e2-bbdc-41f2adf40724"
    }
}

上面的数据信息为复杂的json嵌套结构,包含了 Map、Array、Row 等类型, 对于这样的复杂格式需要有一种高效的方式进行解析,下面介绍如何实现。

华为FusionInsight MRS Flink WebUI介绍

Flink WebUI提供基于Web的可视化开发平台,用户只需要编写SQL即可开发作业,极大降低作业开发门槛。同时通过作业平台能力开放,支持业务人员自行编写SQL开发作业来快速应对需求,大大减少Flink作业开发工作量。

Flink WebUI主要有以下特点:

  • 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。
  • 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。
  • 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。
  • 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。
  • 图形化作业管理:简单易用。

下面介绍如何使用Flink WebUI开发FlinkSQL DDL语句解析出有效信息

操作步骤

  • 登录华为FusionInisght MRS Flink WebUI

    20210908_115504_73.png

  • 在作业管理选择新建作业创建一个FlinkSQL任务

    20210908_115556_96.png

  • 编辑Flink SQL语句

    SQL说明:创建两张kafka流表,起作用为从kafka源端读取cdl对应topic,解析出需要的字段。并将结果写入另外一个kafka topic

    1. Json 中的每个 {} 都需要用 Row 类型来表示
    2. Json 中的每个 [] 都需要用 Arrary 类型来表示
    3. 数组的下标是从 1 开始的不是 0 如下面 SQL 中的 `schema`.`fields`[1].type
    4. 关键字在任何地方都需要加反引号 如上面 SQL 中的 `type`
    5. select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致
    6. 可使用flink函数比如LOCALTIMESTAMP为获取flink系统时间

    20210908_141252_72.png

CREATE TABLE huditableout_source(
  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,
  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT> >,
  type1 as `schema`.`fields`[1].type,
  optional1 as `schema`.`fields`[1].optional,
  field1 as `schema`.`fields`[1].field,
  type2 as `schema`.`fields`[2].type,
  optional2 as `schema`.`fields`[2].optional,
  field2 as `schema`.`fields`[2].field,
  ts as payload.`TIMESTAMP`,
  uid as payload.`data`.uid,
  uname as payload.`data`.uname,
  age as payload.`data`.age,
  sex as payload.`data`.sex,
  mostlike as payload.`data`.mostlike,
  lastview as payload.`data`.lastview,
  totalcost as payload.`data`.totalcost,
  localts as LOCALTIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
CREATE TABLE huditableout(
  type1 VARCHAR(32),
  optional1 BOOLEAN,
  field1 VARCHAR(32),
  type2 VARCHAR(32),
  optional2 BOOLEAN,
  field2 VARCHAR(32),
  ts BIGINT,
  uid INT,
  uname VARCHAR(32),
  age INT,
  sex VARCHAR(30),
  mostlike VARCHAR(30),
  lastview VARCHAR(30),
  totalcost INT,
  localts TIMESTAMP
) WITH(
  'connector' = 'kafka',
  'topic' = 'huditableout2',
  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',
  'properties.group.id' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
insert into
  huditableout
  select
  type1,
  optional1,
  field1,
  type2,
  optional2,
  field2,
  ts,
  uid,
  uname,
  age,
  sex,
  mostlike,
  lastview,
  totalcost,
  localts
from
  huditableout_source;
  • 点击语义校验,确保语义校验通过

    20210908_142109_23.png

  • 启动该Flink SQL任务

    20210908_142205_53.png

  • 检查结果

    源端kafka 数据

    20210908_142329_14.png

    目标端kafka 数据

    20210908_142409_93.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。