实时即未来,大数据项目车联网之实时ETL开发的核心逻辑

举报
Maynor学长 发表于 2022/10/20 14:19:24 2022/10/20
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第6天,点击查看活动详情 1 实时ETL开发的核心逻辑 1 自定义方法解析json数据读取kafka数据后,对原始数据解析,筛选出解析成功的数据,并把数据转换对象,便于后续逻辑操作。自定义解析json数据为对象://TODO 7)将json字符串解析成对象SingleOutputStr...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第6天,点击查看活动详情

1 实时ETL开发的核心逻辑

1 自定义方法解析json数据

读取kafka数据后,对原始数据解析,筛选出解析成功的数据,并把数据转换对象,便于后续逻辑操作。

自定义解析json数据为对象:

//TODO 7)将json字符串解析成对象
SingleOutputStreamOperator<ItcastDataObj> itcastDataObjStream = dataStreamSource.map(JsonParseUtil::parseJsonToObject);
itcastDataObjStream.printToErr("解析后的数据>>>");

2 实时ETL数据存储

思考:数据写入hive中,有几种方法?

原始json数据解析完后,逻辑处理部分

解析成功的数据,hive与hbase各存一份

  • 先把数据写入hdfs上,再创建hive表:itcast_src,与hdfs进行数据关联

解析失败的数据,存储在hive上

  • 先把数据写入hdfs上,再创建hive表:itcast_error,与hdfs进行数据关联,hdfs增量数据,通过定时任务脚本加载到hive表中

image-20221009172428562

2.1 数据落地HDFS:StreamingFileSink

参见:https://ci.apache.org/projects/flink/flink-docs-release-10/zh/dev/connectors/streamfile_sink.html

这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。

Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。

桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。

重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

img

理解:

由于我们用的是流任务,那么任务会一直持续进行,数据也会持续不断的写出,由于数据是源源不断的产生,那么就需要给数据设立边界,让其完成某个文件数据的写出。不然某个文件会一直处于写入状态中。

那么StreamingFileSink就是一个写出流数据的类

它会将数据分桶(分part)写出到文件中,按照指定规则(时间、文件大小等),完成某一part的写入过程。

比如:每隔1小时或者每当文件大小达到比如1GB的时候,就完成当前文件的写入,将状态标记为Finished,然后开启一个新文件继续写流数据。

数据在写出之前,在Flink内部会按照各个子任务(并行)划分数据桶,每个桶可以包含多个part文件

文件在写的过程中有3个状态:

In-progress :当前文件正在写入中

Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

2.1 文件格式

StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。 这两种变体随附了各自的构建器,可以使用以下静态方法创建:

Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)

  • 一次写入一行数据

Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

  • 一次写入一批数据, 如parquet、avro

重要:批量编码模式仅支持OnCheckpointRollingPolicy策略,在每次checkpoint的时候切割文件。

2.2 桶分配逻辑

简单理解:如何划分桶

桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录

行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd–HH 。日期格式(即桶的大小)和时区都可以手动配置。

我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner

Flink 有两个内置的 BucketAssigners :

DateTimeBucketAssigner :默认基于时间的分配器

BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)

内置的不满足需求可以自定义实现BucketAssigner

2.3 滚动策略

简单理解:啥时候(按时间、按大小等)算完成1个文件的写入。

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

Flink 有两个内置的滚动策略:

DefaultRollingPolicy

  • 核心策略:

u 当没有正在写入的part文件的时候,不工作,

u 当文件达到最大桶大小的时候关闭文件完成写入 (by default 128MB), 可设置

u 当前写入文件写入时长超过默认间隔 (by default 60 sec), 或者 可设置

u 当前文件一定时间内没有写入(by default 60 sec). 可设置

OnCheckpointRollingPolicy

  • 核心策略:

u 当进行一次CheckPoint活动的时候,完成当前文件的写入(跟随检查点的节奏走)

u 内置的不满足需求可以自定义实现RollingPolicy

2.2 异常数据落地hdfs

原始数据实时ETL逻辑处理流程,异常数据落地hdfs,使用StreamingFileSink,指定hdfs路径作为异常数据存储基础路径,以当天日期分区存储hive数据

  • 第一种方式实现(自定义hiveSink)

u 使用jdbc的方式将数据写入到hive数据库中,这种方式效率比较低

  • 第二种实现方式(使用StreamingFileSink)

u 流式写入hdfs,吞吐量较高

实现步骤如下:

  • 创建StreamingFileSink对象并设置HDFS数据存储路径

  • 从解析后的原始数据汇总,筛选出异常数据

u json格式不正确;缺少必要字段,导致数据格式不正确的原因:

网络原因,车辆行驶在信号差的地方,数据传输不完整;

数据传输过程中,数据会传TSP造成数据丢失;

数据采集终端设备故障,导致数据部分丢失

  • 指定异常数据输出的sink为StreamingFileSink
//TODO 8)获取到异常的数据
SingleOutputStreamOperator<ItcastDataObj> errorDataStream = itcastDataObjStream.filter(itcastDataObj -> !StringUtils.isEmpty(itcastDataObj.getErrorData()));
errorDataStream.printToErr("异常数据>>>");

//指定写入的文件名称和格式
OutputFileConfig config = OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".txt").build();
//TODO 9)将异常的数据写入到hdfs中
StreamingFileSink errorFileSink = StreamingFileSink.forRowFormat(new Path(parameterTool.getRequired("hdfsUri")+"/apps/hive/warehouse/ods.db/itcast_error"),
        new SimpleStringEncoder<>("utf-8"))
        .withBucketAssigner(new DateTimeBucketAssigner("yyyyMMdd"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //设置滚动时间间隔,5秒钟产生一个文件
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //设置不活动的时间间隔,未写入数据处于不活动状态时滚动文件
                        .withMaxPartSize(128*1024*1024)//文件大小,默认是128M滚动一次
                        .build()
        ).withOutputFileConfig(config).build();
errorDataStream.map(ItcastDataObj::toHiveString).addSink(errorFileSink);
3 正常数据落地hdfs

采样相同的方法把正常数据写入hdfs中

正常数据写入HDFS,数据合适转换,字段属性之间按’\t’进行分割

  • 自定义toHiveString方法,将所有属性使用制表符连接

  • 具体定义:toHiveString方法

正常数据会比异常数据大很多,在大数据项目里面对大数据量进行处理的时候,减少操作往往意味着减少资源的浪费。所以经常对数据进行批量处理,而不是一条条的处理。所以正常数据落地hdfs会控制每次落地的数据量大小,是企业中选用的优化方案之一

正常数据写入HDFS实现步骤:

  • 创建StreamingFileSink对象并设置HDFS数据存储路径

  • 从解析后的原始数据汇总,过滤错误数据,筛选出正确数据

  • 指定正确数据输出的sink为StreamingFileSink

  • 设置StreamingFileSink批量处理数据容量

  • 写入前把数据格式转换为toHiveString返回的字符串,再写入StreamingFileSink中

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。