实时即未来,大数据项目车联网之实时ETL开发的核心逻辑
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第6天,点击查看活动详情
1 实时ETL开发的核心逻辑
1 自定义方法解析json数据
读取kafka数据后,对原始数据解析,筛选出解析成功的数据,并把数据转换对象,便于后续逻辑操作。
自定义解析json数据为对象:
//TODO 7)将json字符串解析成对象
SingleOutputStreamOperator<ItcastDataObj> itcastDataObjStream = dataStreamSource.map(JsonParseUtil::parseJsonToObject);
itcastDataObjStream.printToErr("解析后的数据>>>");
2 实时ETL数据存储
思考:数据写入hive中,有几种方法?
原始json数据解析完后,逻辑处理部分
解析成功的数据,hive与hbase各存一份
- 先把数据写入hdfs上,再创建hive表:itcast_src,与hdfs进行数据关联
解析失败的数据,存储在hive上
- 先把数据写入hdfs上,再创建hive表:itcast_error,与hdfs进行数据关联,hdfs增量数据,通过定时任务脚本加载到hive表中
2.1 数据落地HDFS:StreamingFileSink
参见:https://ci.apache.org/projects/flink/flink-docs-release-10/zh/dev/connectors/streamfile_sink.html
这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。
Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。
桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。
重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
理解:
由于我们用的是流任务,那么任务会一直持续进行,数据也会持续不断的写出,由于数据是源源不断的产生,那么就需要给数据设立边界,让其完成某个文件数据的写出。不然某个文件会一直处于写入状态中。
那么StreamingFileSink就是一个写出流数据的类
它会将数据分桶(分part)写出到文件中,按照指定规则(时间、文件大小等),完成某一part的写入过程。
比如:每隔1小时或者每当文件大小达到比如1GB的时候,就完成当前文件的写入,将状态标记为Finished,然后开启一个新文件继续写流数据。
数据在写出之前,在Flink内部会按照各个子任务(并行)划分数据桶,每个桶可以包含多个part文件
文件在写的过程中有3个状态:
In-progress :当前文件正在写入中
Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
2.1 文件格式
StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。 这两种变体随附了各自的构建器,可以使用以下静态方法创建:
Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
- 一次写入一行数据
Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
- 一次写入一批数据, 如parquet、avro
重要:批量编码模式仅支持OnCheckpointRollingPolicy策略,在每次checkpoint的时候切割文件。
2.2 桶分配逻辑
简单理解:如何划分桶
桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录
行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd–HH 。日期格式(即桶的大小)和时区都可以手动配置。
我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 。
Flink 有两个内置的 BucketAssigners :
DateTimeBucketAssigner :默认基于时间的分配器
BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
内置的不满足需求可以自定义实现BucketAssigner
2.3 滚动策略
简单理解:啥时候(按时间、按大小等)算完成1个文件的写入。
滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
Flink 有两个内置的滚动策略:
- 核心策略:
u 当没有正在写入的part文件的时候,不工作,
u 当文件达到最大桶大小的时候关闭文件完成写入 (by default 128MB), 可设置
u 当前写入文件写入时长超过默认间隔 (by default 60 sec), 或者 可设置
u 当前文件一定时间内没有写入(by default 60 sec). 可设置
- 核心策略:
u 当进行一次CheckPoint活动的时候,完成当前文件的写入(跟随检查点的节奏走)
u 内置的不满足需求可以自定义实现RollingPolicy
2.2 异常数据落地hdfs
原始数据实时ETL逻辑处理流程,异常数据落地hdfs,使用StreamingFileSink,指定hdfs路径作为异常数据存储基础路径,以当天日期分区存储hive数据
- 第一种方式实现(自定义hiveSink)
u 使用jdbc的方式将数据写入到hive数据库中,这种方式效率比较低
- 第二种实现方式(使用StreamingFileSink)
u 流式写入hdfs,吞吐量较高
实现步骤如下:
-
创建StreamingFileSink对象并设置HDFS数据存储路径
-
从解析后的原始数据汇总,筛选出异常数据
u json格式不正确;缺少必要字段,导致数据格式不正确的原因:
网络原因,车辆行驶在信号差的地方,数据传输不完整;
数据传输过程中,数据会传TSP造成数据丢失;
数据采集终端设备故障,导致数据部分丢失
- 指定异常数据输出的sink为StreamingFileSink
//TODO 8)获取到异常的数据
SingleOutputStreamOperator<ItcastDataObj> errorDataStream = itcastDataObjStream.filter(itcastDataObj -> !StringUtils.isEmpty(itcastDataObj.getErrorData()));
errorDataStream.printToErr("异常数据>>>");
//指定写入的文件名称和格式
OutputFileConfig config = OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".txt").build();
//TODO 9)将异常的数据写入到hdfs中
StreamingFileSink errorFileSink = StreamingFileSink.forRowFormat(new Path(parameterTool.getRequired("hdfsUri")+"/apps/hive/warehouse/ods.db/itcast_error"),
new SimpleStringEncoder<>("utf-8"))
.withBucketAssigner(new DateTimeBucketAssigner("yyyyMMdd"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //设置滚动时间间隔,5秒钟产生一个文件
.withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //设置不活动的时间间隔,未写入数据处于不活动状态时滚动文件
.withMaxPartSize(128*1024*1024)//文件大小,默认是128M滚动一次
.build()
).withOutputFileConfig(config).build();
errorDataStream.map(ItcastDataObj::toHiveString).addSink(errorFileSink);
3 正常数据落地hdfs
采样相同的方法把正常数据写入hdfs中
正常数据写入HDFS,数据合适转换,字段属性之间按’\t’进行分割
-
自定义toHiveString方法,将所有属性使用制表符连接
-
具体定义:toHiveString方法
正常数据会比异常数据大很多,在大数据项目里面对大数据量进行处理的时候,减少操作往往意味着减少资源的浪费。所以经常对数据进行批量处理,而不是一条条的处理。所以正常数据落地hdfs会控制每次落地的数据量大小,是企业中选用的优化方案之一
正常数据写入HDFS实现步骤:
-
创建StreamingFileSink对象并设置HDFS数据存储路径
-
从解析后的原始数据汇总,过滤错误数据,筛选出正确数据
-
指定正确数据输出的sink为StreamingFileSink
-
设置StreamingFileSink批量处理数据容量
-
写入前把数据格式转换为toHiveString返回的字符串,再写入StreamingFileSink中
略
- 点赞
- 收藏
- 关注作者
评论(0)