实时即未来,大数据项目车联网之原始数据实时ELT流式任务流程总结

举报
Maynor学长 发表于 2022/10/20 14:21:02 2022/10/20
【摘要】 1 原始数据实时ETL任务分析结果落地 1 Json解析hdfs数据映射hive表(建议)l 用StreamingFileSink方法将正确数据和错误数据写入到对应的HDFS目录中后,需要创建对应的hive表,并关联hdfs数据到hive表中,以实现原始数据实时ETL结果数据落地Hive需求。l 实现步骤:启动 Hive 并启动 Hive 对应的服务n hive脚本:分别创建外部分区表...

1 原始数据实时ETL任务分析结果落地

1 Json解析hdfs数据映射hive表(建议)

l 用StreamingFileSink方法将正确数据和错误数据写入到对应的HDFS目录中后,需要创建对应的hive表,并关联hdfs数据到hive表中,以实现原始数据实时ETL结果数据落地Hive需求。

l 实现步骤:

  • 启动 Hive 并启动 Hive 对应的服务

n hive脚本:分别创建外部分区表:正常数据分区表和错误数据分区表。

外部表:删除表时,不会删除表对应的原始数据

分区表: 便于后期使用分区进行数据查询;便于hdfs分目录存放数据与hive分区数据对应,便于数据存放管理

讲义关联资料\hive.md

n shell脚本开发:自定化运行hive脚本,实现自动完成关联hive表与hdfs数据,完成hive脚本的自动化运行。(手动编写shell,提前准备好分区表数据)

errordatahive.sh

#!/bin/bash
#yesterday=`date --date '1 days ago' +%Y%m%d`
yesterday=20200904
tableName="itcast_error"
ssh node03 `/export/servers/apache-hive-2.1-bin/bin/hive -e "use itcast_ods;alter table $tableName add partition (dt='$yesterday') location '/apps/hive/warehouse/ods.db/$tableName/$yesterday'"`
if [ $? -eq 0 ]; then
  echo "load $tableName partition $yesterday succesful."
else
  echo "load $tableName partition $yesterday error."
fi

srcdatahive.sh

#!/bin/bash
#yesterday=`date --date '1 days ago' +%Y%m%d`
yesterday=20200904
tableName="itcast_src"
ssh node03 `/export/servers/apache-hive-2.1-bin/bin/hive -e "use itcast_ods;alter table $tableName add partition (dt='$yesterday') location '/apps/hive/warehouse/ods.db/$tableName/$yesterday'"`
if [ $? -eq 0 ]; then
  echo "load $tableName partition $yesterday succesful."
else
  echo "load $tableName partition $yesterday error."
fi
  • 定时脚本:定时设置shell脚本的运行时间。

srcrdatahive.sh脚本

# 1、创建目录
mkdir -p /export/servers/shell
cd /export/servers/shell
# 2、创建errordatahive.sh脚本
脚本内容为:../shell/errordatahive.sh
# 3、给脚本赋值'可执行'权限
chmod > +x errordatahive.sh
# 4、配置crontab脚本(每天凌晨2点执行脚本)
0 2 * * * /export/servers/shell/srcdatahive.sh

srcdatahive.sh脚本

mkdir -p /export/servers/shell
cd /export/servers/shell
脚本内容为:../shell/srcdatahive.sh
chmod > +x srcdatahive.sh
0 2 * * * /export/servers/shell/errordatahive.sh

注意:为了保证数据的完整,因此采用离线数据常用的时间计算方式*T+1的方式(今天处理昨天的数据)

2 自定义Sink数据写入hive表(不建议)

l 原始数据实时ETL中结果错误数据落地Hive的第一种方法:

  • 使用自定义Hive Sink把数据写入到hive表中,创建SaveErrorDataHiveSink

img

  • 实现步骤:

自定义HiveSink,继承RichSinkFunction方法,输入类型为<ItcastDataObj>

重写open、invoke、close方法

l open方法最先执行,用于初始化工作

l invoke方法在open之后执行,执行具体任务和逻辑操作

l close方法在invoke之后执行,用于释放资源

初始化Hive客户端远程访问属性,在open中创建hive连接,在invoke中执行插入数据逻辑,在close方法中关闭数据库对象连接

Task中调用SaveErrorDataHiveSink

SaveErrorDataHiveSink hiveSink = new SaveErrorDataHiveSink("itcast_ods", "itcast_error");
errorJsonData.addSink(hiveSink);

总结

原始数据实时ELT流式任务流程实现步骤:

  • 原始数据实时ETL流式任务创建

  • 原始数据实时ETL任务设置

  • 定义任务Kakfa数据源信息(topic,group,key、value序列化信息等)

  • 任务添加Kafka数据源,消费数据

  • 原始数据json解析

  • 逻辑处理,区分原始数据中的解析失败的数据和解析成功的数据

  • 数据落地HDFSSink,创建Hive表与HDFS数据进行关联

  • 定时任务脚本

image-20221010171108431

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。