实时即未来,大数据项目车联网之原始数据实时ETL任务消费数据策略

举报
Maynor学长 发表于 2022/10/20 14:17:29 2022/10/20
【摘要】 实时ETL业务开发 1 原始数据实时ETL分析根据kafka集群中的数据进行实时ETL过滤,对数据进行划分,并将划分的数据分别落地到不同的数据库中。 2 实时ETL开发流程分析原始数据实时ETL,读取kafka中的原始json数据,解析json数据,flink实时逻辑开发,数据落地到hive与hbasel 实现步骤:原始数据实时ETL流式任务创建原始数据实时ETL任务设置定义任务K...

实时ETL业务开发

1 原始数据实时ETL分析

根据kafka集群中的数据进行实时ETL过滤,对数据进行划分,并将划分的数据分别落地到不同的数据库中。

image-20221007105927589

2 实时ETL开发流程分析

原始数据实时ETL,读取kafka中的原始json数据,解析json数据,flink实时逻辑开发,数据落地到hive与hbase

image-20221007110005934

l 实现步骤:

原始数据实时ETL流式任务创建

原始数据实时ETL任务设置

定义任务Kakfa数据源信息(topic,group,key、value序列化信息等)

任务添加Kafka数据源,消费数据

原始数据json解析

逻辑处理,区分原始数据中的解析失败的数据和解析成功的数据

数据落地HDFSSink,创建Hive表与HDFS数据进行关联

定时任务脚本

image-20221007110019679

3 解析工具类引入

解析工具类创建的程序包路径: cn. maynor.streaming.utils

1 日期处理工具

l 在程序包下创建日期工具类 DateUtil .java

l 定义时间处理的枚举类

l 日期处理工具,包含:字符串转日期,获得当前日期、日期格式化等处理

直接获得当前日期,格式:“yyyy-MM-dd HH:mm:ss”

直接获得当前日期,格式:”yyyyMMdd”

字符串日期格式转换,传入参数格式:“yyyy-MM-dd”,转成Date类型

字符串日期格式转换,传入参数格式:“yyyy-MM-dd HH:mm:ss”,转成Date类型

字符串日期格式转换,传入参数格式:”yyyy-MM-dd HH:mm:ss“,转成”yyyyMMdd”格式

l 测试日期工具类

2 配置文件加载工具

l 配置文件加载工具,加载key在conf.properties文件中对应的值

l 在程序包下创建日期工具类 ConfigLoader

n ClassLoader(类加载器),加载conf.properties

通过Properties的load方法加载InputStream

编写方法实现获得string的key值

编写方法实现获得int的key值

3 字符串处理工具

后续在使用hbase的rowkey生成的时候,为了避免热点问题,将字符串进行反转,因此创建字符串反转方法

l 在程序包下创建日期工具类 StringUtil .java

l 定义字符串反转的方法

4 JSON解析工具

Json解析工具,即编写原始数据json解析工具类,使用第一章json解析中的第二种方式org.json解析:原始数据\sourcedata.txt

l 编写json解析对象:ItcastDataObj对象

n json的key作为对象的属性

扩展字段

类型转换、构造方法、toString方法等

讲义关联资料\ItcastDataObj属性.md

讲义关联资料\ItcastDataObj方法.md

l Json解析工具类: JsonParseUtil

讲义关联资料\JsonParseUtil.java

区分出来正确数据或者异常数据

Vin不为空且 terminalTime(事件时间)不为空,即正常数据,反之异常数据

为扩展字段进行赋值操作:errorData、terminalTimeStamp

l 工具类测试方法: JsonParseUtilTest

img

4 原始数据 实时 ETL任务设置

l Task任务创建所在的程序包: cn. maynor.streaming.task

l 创建task: KafkaSourceDataTask

img

l 原始数据实时ETL任务编写,流任务设置检查点、任务重启策略、kafka分区发现、数据积压

checkpoint

任务重启策略

kafka分区发现

数据积压

1 checkpoint配置

l 选择合适的Checkpoint存储方式

l CheckPoint存储方式存在三种

官方文档:https://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html

n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend

服务器中配置默认状态存储路径:flink-conf.yaml

# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints 服务器环境存储多个挂载的本地磁盘为最优
# prod envirment file:///data/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints

不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,在代码中设置检查点存储,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择存在挂在存储在磁盘上的FsStateBackend、RocksDBStateBackend。

这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。 受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;

其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。 与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。

“state.checkpoints.dir”参数来指定所有的checkpoints数据和元数据存储的位置

// 本地访问hdfs设置系统属性
System.setProperty("HADOOP_USER_NAME", "root");
//  创建flink的流式环境、设置任务的检查点(数据存储hdfs)、设置分区发现、设置任务的重启策略、数据积压内部解决策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 //   设置流式数据的参照时间 ProcessingTime:事件被处理时机器的系统时间;IngestionTime:事件进入flink事件;EventTime:事件数据中某个字段的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//  2.checkpoint配置 开启检查点,设置时间间隔为5分钟
env.enableCheckpointing(300000);
//  3.检查点Model设置 exactly once 仅消费一次 保证消息不丢失不重复
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 4 防止checkpoint 太过于频繁而导致业务处理的速度下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// 5 设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(20000);
// 6 设置checkpoint最大的尝试次数,次数必须 >= 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 7 设置取消任务时保留checkpoint,checkpoint默认会在整个job取消时被删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 8 设置执行job过程中,保存检查点错误时,job不失败
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
String hdfsUri = ConfigLoader.getProperty("hdfsUri");
//  9.设置检查点存储的位置,使用RocksDBStateBackend,存储在hdfs分布式文件系统中,增量检查点
try {
    env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask"));
} catch (IOException e) {
    e.printStackTrace();
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。