[项目实践-对接系列] GaussDB(DWS) 对接系列 - Flink实时流入库简单介绍

举报
听风吹雪 发表于 2020/12/30 11:44:05 2020/12/30
【摘要】 Flink作为当下火热的实时流入库组件,被很多客户所钟爱,本篇博客是Gaussdb(DWS)对接Flink实时流入库的简单介绍。

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用JavaScala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。

 

当前Flink被友商收购,使用上偏向通过Flink SQL执行Flink作业来消费Kafka的数据,在Flink SQL中,开发者/运维者可以自己定义想要的逻辑,例如将source来的byte流通过udf函数转换成json格式数据,例如通过where条件做过滤,case whensubstring等等常用SQL处理函数皆可使用。处理后的数据会通过sink进入目标端,在我方而言即是数据仓库服务DWS

 

Flink连接DWS目前仍旧是走jdbc接口,所以需要DWS jdbc驱动包,请在华为云DWS数据仓库服务中下载最新的驱动包。

Flink连接DWS第二个必须项是flink connectorConnector里可以定义任何在FlinkDWS过程中希望实现的逻辑。开源纯净版的flink connector只有最简单的单条插入insert功能,在某局点的业务POC场景中,DWSflink connector中加入了如下功能。

  • Insert or ignore 能力,实现的方式为
  1. 数据流copy进入临时表
  2. 通过以下子句实现数据入库目标表,同时通过时间范围进行过滤(目标表创建为按小时分区的分区表)。

Insert into target_table select * from temp_table a where not exists (select 1 from target_table b where a.pk=b.pk and b.date between ‘$begin_date’ and ‘$end_date’)

   由于列存表当前copy膨胀的问题,临时表创建为行存表,在insert语句中应用行存强制向量化的特性优化。

  • 解析json数据并按schema拆开分别插入DWS 目标表中。

该业务场景中目标表大概有480+个字段,在Flink中关联的维表会送来特征值数据(400左右)。通过解析json数据并按特征值schema一个个拆分插入对应的字段中。当前connector的能力可以实现: 当维表中多了新的特征值,目标表手动加上字段,connector可以自动匹配新的特征值并插入对应字段。

在另一个场景中,Flink入库为纯copy入库,将copy的入库方式从串行(数据积攒在flink buffer(积攒够数据量) → copy → commit→ 积攒下一批数据) 修改为异步执行(积攒数据 → copy(同时积攒下一批数据) → commit)  。 这样只要copy的时间不超过积攒数据的时间,理论上入库会造成kafka积压。

具体的Flink SQL中配置connector的语法参考如下(具体参数由内部开发connector的同事定义,可咨询获得详细信息)

create table dws_source

(

Id int,

) with (

   'connector.type' = 'jdbc',

   'connector.url' = 'jdbc:gaussdb://10.***.***.**:25308/postgres',

   'connector.table' = 'table',

   'connector.username' = 'flink_test',

   'connector.password' = '******',

   'connector.driver' = 'com.huawei.gauss200.jdbc.Driver',

   'connector.write.flush.max-rows' = '10000',  --每批攒数条数

   'connector.write.flush.interval' = '30000',    --攒数时间

'connector.write.flush.max-batch-row' = 120000,    --攒够多少条commit一次

   'deduplication.keys' = 'ads_target_track_id,userid',  -- insert or ignore按主键去重字段

   'deduplication.mode' = '1',

   'json.need.to.convert' = 'feature_list_special,track_list_special' –需要做Json解析的字段

);

 

Flink SQL:

--定义source kafka

create table kafka_table

(

)

with (

);

 

--定义target dws

create table dws_source

(

Id int,

) with (

…);

 

--flink sql逻辑

Insert into dws_source

(…)

select

from kafka_table

[where … ]

 

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。