实时即未来,大数据项目车联网之原始数据实时ELT流式任务流程总结
1 原始数据实时ETL任务分析结果落地
1 Json解析hdfs数据映射hive表(建议)
l 用StreamingFileSink方法将正确数据和错误数据写入到对应的HDFS目录中后,需要创建对应的hive表,并关联hdfs数据到hive表中,以实现原始数据实时ETL结果数据落地Hive需求。
l 实现步骤:
- 启动 Hive 并启动 Hive 对应的服务
n hive脚本:分别创建外部分区表:正常数据分区表和错误数据分区表。
外部表:删除表时,不会删除表对应的原始数据
分区表: 便于后期使用分区进行数据查询;便于hdfs分目录存放数据与hive分区数据对应,便于数据存放管理
n shell脚本开发:自定化运行hive脚本,实现自动完成关联hive表与hdfs数据,完成hive脚本的自动化运行。(手动编写shell,提前准备好分区表数据)
#!/bin/bash
#yesterday=`date --date '1 days ago' +%Y%m%d`
yesterday=20200904
tableName="itcast_error"
ssh node03 `/export/servers/apache-hive-2.1-bin/bin/hive -e "use itcast_ods;alter table $tableName add partition (dt='$yesterday') location '/apps/hive/warehouse/ods.db/$tableName/$yesterday'"`
if [ $? -eq 0 ]; then
echo "load $tableName partition $yesterday succesful."
else
echo "load $tableName partition $yesterday error."
fi
#!/bin/bash
#yesterday=`date --date '1 days ago' +%Y%m%d`
yesterday=20200904
tableName="itcast_src"
ssh node03 `/export/servers/apache-hive-2.1-bin/bin/hive -e "use itcast_ods;alter table $tableName add partition (dt='$yesterday') location '/apps/hive/warehouse/ods.db/$tableName/$yesterday'"`
if [ $? -eq 0 ]; then
echo "load $tableName partition $yesterday succesful."
else
echo "load $tableName partition $yesterday error."
fi
- 定时脚本:定时设置shell脚本的运行时间。
srcrdatahive.sh脚本
# 1、创建目录
mkdir -p /export/servers/shell
cd /export/servers/shell
# 2、创建errordatahive.sh脚本
脚本内容为:../shell/errordatahive.sh
# 3、给脚本赋值'可执行'权限
chmod > +x errordatahive.sh
# 4、配置crontab脚本(每天凌晨2点执行脚本)
0 2 * * * /export/servers/shell/srcdatahive.sh
srcdatahive.sh脚本
mkdir -p /export/servers/shell
cd /export/servers/shell
脚本内容为:../shell/srcdatahive.sh
chmod > +x srcdatahive.sh
0 2 * * * /export/servers/shell/errordatahive.sh
注意:为了保证数据的完整,因此采用离线数据常用的时间计算方式*T+1的方式(今天处理昨天的数据)
2 自定义Sink数据写入hive表(不建议)
l 原始数据实时ETL中结果错误数据落地Hive的第一种方法:
- 使用自定义Hive Sink把数据写入到hive表中,创建SaveErrorDataHiveSink
- 实现步骤:
自定义HiveSink,继承RichSinkFunction方法,输入类型为<ItcastDataObj>
重写open、invoke、close方法
l open方法最先执行,用于初始化工作
l invoke方法在open之后执行,执行具体任务和逻辑操作
l close方法在invoke之后执行,用于释放资源
初始化Hive客户端远程访问属性,在open中创建hive连接,在invoke中执行插入数据逻辑,在close方法中关闭数据库对象连接
Task中调用SaveErrorDataHiveSink
SaveErrorDataHiveSink hiveSink = new SaveErrorDataHiveSink("itcast_ods", "itcast_error");
errorJsonData.addSink(hiveSink);
总结
原始数据实时ELT流式任务流程实现步骤:
-
原始数据实时ETL流式任务创建
-
原始数据实时ETL任务设置
-
定义任务Kakfa数据源信息(topic,group,key、value序列化信息等)
-
任务添加Kafka数据源,消费数据
-
原始数据json解析
-
逻辑处理,区分原始数据中的解析失败的数据和解析成功的数据
-
数据落地HDFSSink,创建Hive表与HDFS数据进行关联
-
定时任务脚本
- 点赞
- 收藏
- 关注作者
评论(0)