大数据物流项目:实时增量ETL存储Kudu(八)
Logistics_Day08:实时增量ETL存储Kudu
01-[复习]-上次课程内容回顾
主要讲解2个方面内容:搭建物流项目环境(Maven Project)和结构化流程序(测试)
1、搭建物流项目环境
- Windows系统开发环境初始化
设置HADOOP_HOME:指向在windows下编译HADOOP,bin目录winutils.exe和hadoop.dll
设置hadoop.dll 文件:放置C:\Windows/System32
- 创建MavenProject工程
1个父工程,4个子模块(common、etl、generate、offline)
创建工程、创建模块、加入POM依赖
导入genereate数据生成器模块
进行初始化操作
创建基础包、导入工具类
属性文件:config.properties,连接服务信息参数
2、测试结构化流程序
编写结构化流应用程式,实时从Kafka消费数据(2个Topic,对应2个业务系统数据),将其打印控制台
- 启动数据库和采集框架,对表的数据进行更新和删除,流式是否消费到数据
- 运行数据模拟生成器,实时产生业务数据,插入到数据库表中,流式是否消费到数据
02-[了解]-第8天:课程内容提纲
主要物流项目业务数据实时ETL转换操作,流程如下图中:
process方法
功能
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzpbYYdC-1652014600908)(image-20210526072519874.)]
在流式应用程序中,通常都是从Kafka消费数据,基本上形成固定代码结构
spark.readStream.format("kafka").option("bootstrop.servers").option("topic", "").load()
最重要核心代码逻辑:
对消费到Kafka业务数据(JSON字符串)进行ETL转换
1. JSON 字符串 -> JavaBean对象中
2. JavaBean 对象 -> 提取字段,封装到具体表对应POJO对象,方便存储业务数据
由于物流项目中,需要编写多个流式计算程序,实时消费Kafka数据,进行ETL转换,存储到不同引擎,封装流式计算程序公共接口,定义程序执行业务流程步骤。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eIQ7RaET-1652014600910)(image-20210526084023859.)]
03-[掌握]-实时ETL开发之封装流计算公共接口
为什么封装接口:物流项目来说,需要编写3个流式应用程序,消费业务数据,ETL转换后存储到不同引擎(Kudu、Es和CK),步骤基本类似:
- 1)、第一步、创建SparkSession实例对象,基本相同
- 封装到工具类中,专门创建SparkSession实例对象
- 2)、第二步、从Kafka消费数据,基本相同
- 加载数据:
load
方法- 3)、第三步、对消费JSON数据进行ETL转换,有所变化,不同ETL业务不同,具体实现
- 处理数据:
process
方法- 4)、第四步、将转换后数据保存至外部存储,不一样,具体实现
- 保存数据:
save
方法
==在etl模块【logistics-etl
】的 realtime
包下创建 BasicStreamApp
特质Trait,定义方法==
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEW3MnrA-1652014600911)(1616040734866.)]
- 实现方法:创建
load
,读取Kafka集群指定主题Topi的数据- 实现方法:创建
process
方法,对消费数据进行ETL转换操作- 实现方法:创建
save
方法,将ETL转换后数据保存至外部存储引擎
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 所有ETL流式处理的基类,实时增量ETL至:Kudu、Elasticsearch和ClickHouse都要实现此基类,定义三个方法
* - 1. 加载数据:load
* - 2. 处理数据:process
* - 3. 保存数据:save
*/
trait BasicStreamApp {
/**
* 读取数据的方法
*
* @param spark SparkSession
* @param topic 指定消费的主题
* @param selectExpr 默认值:CAST(value AS STRING)
*/
def load(spark: SparkSession, topic: String, selectExpr: String = "CAST(value AS STRING)"): DataFrame = {
spark.readStream
.format(Configuration.SPARK_KAFKA_FORMAT)
.option("kafka.bootstrap.servers", Configuration.KAFKA_ADDRESS)
.option("subscribe", topic)
.option("maxOffsetsPerTrigger", "100000")
.load()
.selectExpr(selectExpr)
}
/**
* 数据的处理
*
* @param streamDF 流式数据集StreamingDataFrame
* @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
* @return 流式数据集StreamingDataFrame
*/
def process(streamDF: DataFrame, category: String): DataFrame
/**
* 数据的保存
*
* @param streamDF 保存数据集DataFrame
* @param tableName 保存表的名称
* @param isAutoCreateTable 是否自动创建表,默认创建表
*/
def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit
}
当公共接口完成以后,某个实时ETL应用,创建对象object时,继承公共接口,实现其中:
process
和save
方法即可。
04-[掌握]-实时ETL开发之SparkUtils工具类
任务:==编写工具类
SparkUtils
,创建SparkSession实例对象,并且可以对应用进行设置。==
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cx4tJuDH-1652014600911)(1616049306846.)]
创建对象
SparkUtils
,按照上述结构,实现具体方法,代码如下所示:
package cn.itcast.logistics.common
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Spark 操作的工具类
*/
object SparkUtils {
// 定义变量,类型匿名函数类型,返回为SparkConf对象
lazy val sparkConf = () => {
new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.sql.session.timeZone", "Asia/Shanghai")
.set("spark.sql.files.maxPartitionBytes", "134217728")
.set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.shuffle.partitions", "3")
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
}
// 依据应用运行操作系统,设置运行模式:local本地、yarn集群
lazy val autoSettingEnv = (sparkConf: SparkConf) => {
if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
//本地环境LOCAL_HADOOP_HOME
System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
//设置运行环境和checkpoint路径
sparkConf
.set("spark.master", "local[3]")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
} else {
//生产环境
sparkConf
.set("spark.master", "yarn")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
}
// 返回设置以后SparkConf对象
sparkConf
}
/**
* 创建sparkSession对象
* @param sparkConf SparkConf实例,设置应用惨啊户数
* @param clazz 每个应用Class实例对象
*/
def createSparkSession(sparkConf: SparkConf, clazz: Class[_]): SparkSession = {
SparkSession.builder()
.appName(clazz.getSimpleName.stripSuffix("$"))
.config(sparkConf)
.getOrCreate()
}
}
编写main方法,创建SparkSession对象,查看4040界面页面(线程休眠即可)
// 测试
def main(args: Array[String]): Unit = {
val spark: SparkSession = createSparkSession(
autoSettingEnv(sparkConf()), this.getClass
)
println(spark)
Thread.sleep(10000000)
spark.stop()
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-irIwFlIe-1652014600912)(1616049966978.)]
05-[理解]-实时ETL开发之KuduStreamApp程序
任务:编写流式程序,实时消费Kafka数据,进行ETL转换,最终存储到Kudu表中,继承公共接口:
BasicStreamApp
,实现其中方法。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ijwk5iEE-1652014600912)(https://gitee.com/the_efforts_paid_offf/picture-blog/raw/master20220508182256.)]
具体开发步骤如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kaLMr39B-1652014600912)(1616050178069.)]
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.SparkUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Kudu数据管道应用:实现Kudu数据库的实时ETL操作
*/
object KuduStreamApp extends BasicStreamApp {
/**
* 数据的处理
*
* @param streamDF 流式数据集StreamingDataFrame
* @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
* @return 流式数据集StreamingDataFrame
*/
override def process(streamDF: DataFrame, category: String): DataFrame = ???
/**
* 数据的保存
*
* @param streamDF 保存数据集DataFrame
* @param tableName 保存表的名称
* @param isAutoCreateTable 是否自动创建表,默认创建表
*/
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = ???
/*
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
*/
def main(args: Array[String]): Unit = {
// step1. 创建SparkSession实例对象,传递SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
import spark.implicits._
// step2. 从Kafka数据源实时消费数据
// 物流系统Topic数据
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. 对获取Json数据进行ETL转换
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(logisticsDF, "crm")
// step4. 保存转换后数据到外部存储
save(etlLogisticsDF, "logistics-console")
save(etlCrmDF, "crm-console")
// step5. 应用启动以后,等待终止结束
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}
上述代码,已经实现MAIN方法,接下来只要实现其中:process【ETL转换】和save【保存数据】方法即可。
先不考虑ETL业务逻辑和具体save保存,数据直接进行ETL转换,将数据打印到控制台即可。
- 实现方法:
process
,没有任何逻辑
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jFM2zN69-1652014600913)(1616051030658.)]
- 实现方法:
save
,将数据打印控制台
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pmeOkIof-1652014600913)(1616051118704.)]
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.SparkUtils
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Kudu数据管道应用:实现Kudu数据库的实时ETL操作
*/
object KuduStreamApp extends BasicStreamApp {
/**
* 数据的处理,此时不进行任何业务逻辑转换
*
* @param streamDF 流式数据集StreamingDataFrame
* @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
* @return 流式数据集StreamingDataFrame
*/
override def process(streamDF: DataFrame, category: String): DataFrame = {
val etlStreamDF: DataFrame = category match {
// TODO: 物流系统业务数据
case "logistics" =>
streamDF
// TODO: CRM系统业务数据
case "crm" =>
streamDF
// TODO: 其他业务系统数据
case _ => streamDF
}
// 返回ETL转换后的数据
etlStreamDF
}
/**
* 数据的保存,此时仅仅将数据打印控制台
*
* @param streamDF 保存数据集DataFrame
* @param tableName 保存表的名称
* @param isAutoCreateTable 是否自动创建表,默认创建表
*/
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
streamDF.writeStream
.queryName(s"query-${tableName}")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.start()
}
/*
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
*/
def main(args: Array[String]): Unit = {
// step1. 创建SparkSession实例对象,传递SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
import spark.implicits._
// step2. 从Kafka数据源实时消费数据
// 物流系统Topic数据
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. 对获取Json数据进行ETL转换
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(logisticsDF, "crm")
// step4. 保存转换后数据到外部存储
save(etlLogisticsDF, "logistics-console")
save(etlCrmDF, "crm-console")
// step5. 应用启动以后,等待终止结束
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}
编程完成以后,运行流式计算程序:KuduStreamApp,
启动MySQL数据库和Canal及Oracle数据库和OGG
。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SlCVJaiw-1652014600914)(1616051529743.)]
06-[理解]-实时ETL开发之Kafka数据JSON格式
实时从Kafka消费数据,无论是OGG采集还是Canal采集数据,都是以JSON字符串格式发送KafkaTopic,此时需要查看OGG采集数据格式字段和Canal采集数据格式字段。
- 1)、OGG 采集数据格式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Of6k9pCU-1652014600914)(1616051796175.)]
具体分析:插入数据Insert、更新数据update和删除数据delete
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hSS9jVRN-1652014600914)(1616052050412.)]
- 2)、Canal采集数据格式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6qUKxp1f-1652014600915)(1616052064308.)]
具体查看Canal采集数据,分析字段
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mqaOU4ec-1652014600915)(1616052381480.)]
07-[掌握]-实时ETL开发之定义数据Bean对象
无论是OGG采集数据还是Canal采集数据,JSON数据各式字段,基本一致,所以定义JavaBean,分别解析封装数据到JavaBean对象
- 1)、OGG采集JSON数据:7个字段
- 2)、Canal采集JSON数据:12个字段
- 1)、定义 Bean 对象基类
根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:
table
,因此将该属性作为公共属性进行提取,抽象成基类。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zmcLPflD-1652014600916)(1616053553019.)]
package cn.itcast.logistics.common.beans.parser;
import java.io.Serializable;
/**
* 根据数据源定义抽象类,数据源:ogg 和 canal, 两者有共同的table属性
*/
public abstract class MessageBean implements Serializable {
private static final long serialVersionUID = 373363837132138843L;
private String table;
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
@Override
public String toString() {
return table;
}
}
- 2)、定义 OGG 数据 Bean 对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YcbgHW28-1652014600917)(1616053885882.)]
package cn.itcast.logistics.common.beans.parser;
import java.util.Map;
/**
* 定义消费OGG数据的JavaBean对象
* {
* "table": "ITCAST.tbl_route", //表名:库名.表名
* "op_type": "U", //操作类型:U表示修改
* "op_ts": "2020-10-08 09:10:54.000774",
* "current_ts": "2020-10-08T09:11:01.925000",
* "pos": "00000000200006645758",
* "before": { //操作前的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "蚌埠中转部",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
* },
* "after": { //操作后的字段集合
* "id": 104,
* "start_station": "东莞中心",
* "start_station_area_id": 441900,
* "start_warehouse_id": 1,
* "end_station": "TBD",
* "end_station_area_id": 340300,
* "end_warehouse_id": 107,
* "mileage_m": 1369046,
* "time_consumer_minute": 56172,
* "state": 1,
* "cdt": "2020-02-02 18:51:39",
* "udt": "2020-02-02 18:51:39",
* "remark": null
* }
* }
*/
public class OggMessageBean extends MessageBean {
private static final long serialVersionUID = -4763944161833712521L;
//定义操作类型
private String op_type;
@Override
public void setTable(String table) {
//如果表名不为空
if (table != null && !table.equals("")) {
table = table.replaceAll("[A-Z]+\\.", "");
}
super.setTable(table);
}
public String getOp_type() {
return op_type;
}
public void setOp_type(String op_type) {
this.op_type = op_type;
}
public String getOp_ts() {
return op_ts;
}
public void setOp_ts(String op_ts) {
this.op_ts = op_ts;
}
public String getCurrent_ts() {
return current_ts;
}
public void setCurrent_ts(String current_ts) {
this.current_ts = current_ts;
}
public String getPos() {
return pos;
}
public void setPos(String pos) {
this.pos = pos;
}
public Map<String, Object> getBefore() {
return before;
}
public void setBefore(Map<String, Object> before) {
this.before = before;
}
public Map<String, Object> getAfter() {
return after;
}
public void setAfter(Map<String, Object> after) {
this.after = after;
}
//操作时间
private String op_ts;
@Override
public String toString() {
return "OggMessageBean{" +
"table='" + super.getTable() + '\'' +
", op_type='" + op_type + '\'' +
", op_ts='" + op_ts + '\'' +
", current_ts='" + current_ts + '\'' +
", pos='" + pos + '\'' +
", before=" + before +
", after=" + after +
'}';
}
/**
* 返回需要处理的列的集合
* @return
*/
public Map<String, Object> getValue() {
//如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
if (after == null) {
return before;
} else {
return after;
}
}
//同步时间
private String current_ts;
//偏移量
private String pos;
//操作之前的数据
private Map<String, Object> before;
//操作之后的数据
private Map<String, Object> after;
}
当OGG采集数据时,需要获取关心操作的数据,定义方法:
getValue
,删除数据时获取before数据,插入或更新获取after数据,代码如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IapFgUjs-1652014600918)(1616054052067.)]
- 3)、定义 Canal 数据 Bean 对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZckDhdXi-1652014600918)(1616054087741.)]
package cn.itcast.logistics.common.beans.parser;
import java.util.List;
import java.util.Map;
/**
* 定义消费canal数据对应的JavaBean对象
* {
* "data": [{
* "id": "1",
* "name": "北京",
* "tel": "222",
* "mobile": "1111",
* "detail_addr": "北京",
* "area_id": "1",
* "gis_addr": "1",
* "cdt": "2020-10-08 17:20:12",
* "udt": "2020-11-05 17:20:16",
* "remark": null
* }],
* "database": "crm",
* "es": 1602148867000,
* "id": 15,
* "isDdl": false,
* "mysqlType": {
* "id": "bigint(20)",
* "name": "varchar(50)",
* "tel": "varchar(20)",
* "mobile": "varchar(20)",
* "detail_addr": "varchar(100)",
* "area_id": "bigint(20)",
* "gis_addr": "varchar(20)",
* "cdt": "datetime",
* "udt": "datetime",
* "remark": "varchar(100)"
* },
* "old": [{
* "tel": "111"
* }],
* "sql": "",
* "sqlType": {
* "id": -5,
* "name": 12,
* "tel": 12,
* "mobile": 12,
* "detail_addr": 12,
* "area_id": -5,
* "gis_addr": 12,
* "cdt": 93,
* "udt": 93,
* "remark": 12
* },
* "table": "crm_address",
* "ts": 1602148867311,
* "type": "UPDATE" //修改数据
* }
*/
public class CanalMessageBean extends MessageBean {
private static final long serialVersionUID = -3147101694588578078L;
//操作的数据集合
private List<Map<String, Object>> data;
public List<Map<String, Object>> getData() {
return data;
}
public void setData(List<Map<String, Object>> data) {
this.data = data;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public Long getEs() {
return es;
}
public void setEs(Long es) {
this.es = es;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public boolean isDdl() {
return isDdl;
}
public void setDdl(boolean ddl) {
isDdl = ddl;
}
public Map<String, Object> getMysqlType() {
return mysqlType;
}
public void setMysqlType(Map<String, Object> mysqlType) {
this.mysqlType = mysqlType;
}
public String getOld() {
return old;
}
public void setOld(String old) {
this.old = old;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Map<String, Object> getSqlType() {
return sqlType;
}
public void setSqlType(Map<String, Object> sqlType) {
this.sqlType = sqlType;
}
public Long getTs() {
return ts;
}
public void setTs(Long ts) {
this.ts = ts;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
//数据库名称
private String database;
private Long es;
private Long id;
private boolean isDdl;
private Map<String, Object> mysqlType;
private String old;
private String sql;
private Map<String, Object> sqlType;
private Long ts;
private String type;
/**
* 重写父类的settable方法,将表名修改成统一的前缀
* @param table
*/
@Override
public void setTable(String table) {
if(table!=null && !table.equals("")){
if(table.startsWith("crm_")) {
table = table.replace("crm_", "tbl_");
}
}
super.setTable(table);
}
}
CRM系统中每个表的名称都有前缀【
crm_
】,解析JSON字符串时,将其替换【tbl_
】
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP63yxtA-1652014600918)(1616054221345.)]
至此,定义MessageBean实体类,接下来,需要编写代码解析JSON字符串为MessageBean对象,封装实例对象中,方便获取各个字段的值,进行相应的处理转换操作。
- 点赞
- 收藏
- 关注作者
评论(0)