[项目实践-对接系列] GaussDB(DWS) 对接系列 - Flink实时流入库简单介绍
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。
当前Flink被友商收购,使用上偏向通过Flink SQL执行Flink作业来消费Kafka的数据,在Flink SQL中,开发者/运维者可以自己定义想要的逻辑,例如将source来的byte流通过udf函数转换成json格式数据,例如通过where条件做过滤,case when、substring等等常用SQL处理函数皆可使用。处理后的数据会通过sink进入目标端,在我方而言即是数据仓库服务DWS。
Flink连接DWS目前仍旧是走jdbc接口,所以需要DWS jdbc驱动包,请在华为云DWS数据仓库服务中下载最新的驱动包。
Flink连接DWS第二个必须项是flink connector。 Connector里可以定义任何在Flink→DWS过程中希望实现的逻辑。开源纯净版的flink connector只有最简单的单条插入insert功能,在某局点的业务POC场景中,DWS在flink connector中加入了如下功能。
- Insert or ignore 能力,实现的方式为
- 数据流copy进入临时表
- 通过以下子句实现数据入库目标表,同时通过时间范围进行过滤(目标表创建为按小时分区的分区表)。
Insert into target_table select * from temp_table a where not exists (select 1 from target_table b where a.pk=b.pk and b.date between ‘$begin_date’ and ‘$end_date’)。
由于列存表当前copy膨胀的问题,临时表创建为行存表,在insert语句中应用行存强制向量化的特性优化。
- 解析json数据并按schema拆开分别插入DWS 目标表中。
该业务场景中目标表大概有480+个字段,在Flink中关联的维表会送来特征值数据(400左右)。通过解析json数据并按特征值schema一个个拆分插入对应的字段中。当前connector的能力可以实现: 当维表中多了新的特征值,目标表手动加上字段,connector可以自动匹配新的特征值并插入对应字段。
在另一个场景中,Flink入库为纯copy入库,将copy的入库方式从串行(数据积攒在flink buffer中(积攒够数据量) → copy → commit→ 积攒下一批数据) 修改为异步执行(积攒数据 → copy(同时积攒下一批数据) → commit) 。 这样只要copy的时间不超过积攒数据的时间,理论上入库会造成kafka积压。
具体的Flink SQL中配置connector的语法参考如下(具体参数由内部开发connector的同事定义,可咨询获得详细信息)
create table dws_source
(
Id int,
…
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:gaussdb://10.***.***.**:25308/postgres',
'connector.table' = 'table',
'connector.username' = 'flink_test',
'connector.password' = '******',
'connector.driver' = 'com.huawei.gauss200.jdbc.Driver',
'connector.write.flush.max-rows' = '10000', --每批攒数条数
'connector.write.flush.interval' = '30000', --攒数时间
'connector.write.flush.max-batch-row' = 120000, --攒够多少条commit一次
'deduplication.keys' = 'ads_target_track_id,userid', -- insert or ignore按主键去重字段
'deduplication.mode' = '1',
'json.need.to.convert' = 'feature_list_special,track_list_special' –需要做Json解析的字段
);
Flink SQL:
--定义source kafka
create table kafka_table
(
…
)
with (
…
);
--定义target dws表
create table dws_source
(
Id int,
…
) with (
…);
--flink sql逻辑
Insert into dws_source
(…)
select
…
from kafka_table
[where … ]
- 点赞
- 收藏
- 关注作者
评论(0)