实时即未来,大数据项目车联网之原始数据实时ETL任务消费数据策略
实时ETL业务开发
1 原始数据实时ETL分析
根据kafka集群中的数据进行实时ETL过滤,对数据进行划分,并将划分的数据分别落地到不同的数据库中。
2 实时ETL开发流程分析
原始数据实时ETL,读取kafka中的原始json数据,解析json数据,flink实时逻辑开发,数据落地到hive与hbase
l 实现步骤:
原始数据实时ETL流式任务创建
原始数据实时ETL任务设置
定义任务Kakfa数据源信息(topic,group,key、value序列化信息等)
任务添加Kafka数据源,消费数据
原始数据json解析
逻辑处理,区分原始数据中的解析失败的数据和解析成功的数据
数据落地HDFSSink,创建Hive表与HDFS数据进行关联
定时任务脚本
3 解析工具类引入
解析工具类创建的程序包路径: cn. maynor.streaming.utils
1 日期处理工具
l 在程序包下创建日期工具类 DateUtil .java
l 定义时间处理的枚举类
略
l 日期处理工具,包含:字符串转日期,获得当前日期、日期格式化等处理
直接获得当前日期,格式:“yyyy-MM-dd HH:mm:ss”
直接获得当前日期,格式:”yyyyMMdd”
字符串日期格式转换,传入参数格式:“yyyy-MM-dd”,转成Date类型
字符串日期格式转换,传入参数格式:“yyyy-MM-dd HH:mm:ss”,转成Date类型
字符串日期格式转换,传入参数格式:”yyyy-MM-dd HH:mm:ss“,转成”yyyyMMdd”格式
略
l 测试日期工具类
2 配置文件加载工具
l 配置文件加载工具,加载key在conf.properties文件中对应的值
l 在程序包下创建日期工具类 ConfigLoader
n ClassLoader(类加载器),加载conf.properties
通过Properties的load方法加载InputStream
编写方法实现获得string的key值
编写方法实现获得int的key值
3 字符串处理工具
后续在使用hbase的rowkey生成的时候,为了避免热点问题,将字符串进行反转,因此创建字符串反转方法
l 在程序包下创建日期工具类 StringUtil .java
l 定义字符串反转的方法
略
4 JSON解析工具
Json解析工具,即编写原始数据json解析工具类,使用第一章json解析中的第二种方式org.json解析:原始数据\sourcedata.txt
l 编写json解析对象:ItcastDataObj对象
n json的key作为对象的属性
扩展字段
类型转换、构造方法、toString方法等
l Json解析工具类: JsonParseUtil
区分出来正确数据或者异常数据
Vin不为空且 terminalTime(事件时间)不为空,即正常数据,反之异常数据
为扩展字段进行赋值操作:errorData、terminalTimeStamp
略
l 工具类测试方法: JsonParseUtilTest
4 原始数据 实时 ETL任务设置
l Task任务创建所在的程序包: cn. maynor.streaming.task
l 创建task: KafkaSourceDataTask
l 原始数据实时ETL任务编写,流任务设置检查点、任务重启策略、kafka分区发现、数据积压
checkpoint
任务重启策略
kafka分区发现
数据积压
1 checkpoint配置
l 选择合适的Checkpoint存储方式
l CheckPoint存储方式存在三种
官方文档:https://ci.apache.org/projects/flink/flink-docs-release- 10/ops/state/state_backends.html
n MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend
服务器中配置默认状态存储路径:flink-conf.yaml
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints 服务器环境存储多个挂载的本地磁盘为最优
# prod envirment file:///data/flink/checkpoints
state.checkpoints.dir: hdfs://namenode:8082/flink/checkpoints
不同 StateBackend 之间的性能以及安全性是有很大差异的。通常情况下,在代码中设置检查点存储,MemoryStateBackend 适合应用于测试环境,线上环境则最好选择存在挂在存储在磁盘上的FsStateBackend、RocksDBStateBackend。
这有两个原因:首先,RocksDBStateBackend 是外部存储,其他两种 Checkpoint 存储方式都是 JVM 堆存储。 受限于 JVM 堆内存的大小,Checkpoint 状态大小以及安全性可能会受到一定的制约;
其次,RocksDBStateBackend 支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。 与完整检查点相比,增量检查点可以显著缩短 checkpointing 时间,但代价是需要更长的恢复时间。
“state.checkpoints.dir”参数来指定所有的checkpoints数据和元数据存储的位置
// 本地访问hdfs设置系统属性
System.setProperty("HADOOP_USER_NAME", "root");
// 创建flink的流式环境、设置任务的检查点(数据存储hdfs)、设置分区发现、设置任务的重启策略、数据积压内部解决策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 设置流式数据的参照时间 ProcessingTime:事件被处理时机器的系统时间;IngestionTime:事件进入flink事件;EventTime:事件数据中某个字段的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2.checkpoint配置 开启检查点,设置时间间隔为5分钟
env.enableCheckpointing(300000);
// 3.检查点Model设置 exactly once 仅消费一次 保证消息不丢失不重复
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 4 防止checkpoint 太过于频繁而导致业务处理的速度下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// 5 设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(20000);
// 6 设置checkpoint最大的尝试次数,次数必须 >= 1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 7 设置取消任务时保留checkpoint,checkpoint默认会在整个job取消时被删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 8 设置执行job过程中,保存检查点错误时,job不失败
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
String hdfsUri = ConfigLoader.getProperty("hdfsUri");
// 9.设置检查点存储的位置,使用RocksDBStateBackend,存储在hdfs分布式文件系统中,增量检查点
try {
env.setStateBackend(new RocksDBStateBackend(hdfsUri + "/flink/checkpoint/KafkaSourceDataTask"));
} catch (IOException e) {
e.printStackTrace();
}
- 点赞
- 收藏
- 关注作者
评论(0)