大数据物流项目:实时增量ETL存储Kudu(七.五)

Maynor学长 发表于 2022/06/28 08:53:40 2022/06/28
【摘要】 07-[掌握]-项目初始化之构建公共模块任务:对项目公共模块进行初始化操作,包含创建表,导入工具类等等。针对物流项目来说,涉及2个系统,物流系统Logistics:48张表和CRM系统:3张表,每张表数据都会封装到JavaBean对象中。==对于数据库中51张表来说:id字段作为表主键,remark字段作为备注说明,cdt和udt分别表示数据创建时间和最后更新数据。==公共模块创建包完成以...

07-[掌握]-项目初始化之构建公共模块

任务:对项目公共模块进行初始化操作,包含创建表,导入工具类等等。

针对物流项目来说,涉及2个系统,物流系统Logistics:48张表和CRM系统:3张表,每张表数据都会封装到JavaBean对象中。

==对于数据库中51张表来说:id字段作为表主键,remark字段作为备注说明,cdt和udt分别表示数据创建时间和最后更新数据。==

1616035908137

公共模块创建包完成以后,如下图所示:

1616036014799

在公共模块【logistics-common】的scala目录下,创建如下程序包

1616036044389

结构如下所示:

1616036056523

导入 JavaBean 对象:数据库中51张表,对应JavaBean实体类,直接放入包中即可

  • 1)、将:资料\公共模块\beans目录下文件导入到common

1616036189879

导入公共处理类:连接数据库工具类等

  • 将:资料\公共模块\utils目录下文件导入到common

1616036279668

重新,刷新整个Maven Project,导入相关依赖。

08-[理解]-实时ETL开发之加载配置文件

任务:首先对ETL模块进行初始化(创建包)和项目属性配置文件(properties)及加载配置。

  • 1)、本次项目采用Scala编程语言,因此创建scala目录

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pm6l9kuW-1652004162163)(/img/1616036610270.png)]

创建完成以后,目录结构如下所示:

1616036624720

  • 2)、每个项目,需要将数据库等连接信息配置到属性文件中,方便测试、开发和生产环境修改

在公共模块【logistics-common】的resources目录创建配置文件:config.properties

# CDH-6.2.1
bigdata.host=node2.itcast.cn

# HDFS
dfs.uri=hdfs://node2.itcast.cn:8020
# Local FS
local.fs.uri=file://

# Kafka
kafka.broker.host=node2.itcast.cn
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2.itcast.cn:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm

# ZooKeeper
zookeeper.host=node2.itcast.cn
zookeeper.port=2181

# Kudu
kudu.rpc.host=node2.itcast.cn
kudu.rpc.port=7051
kudu.http.host=node2.itcast.cn
kudu.http.port=8051

# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2.itcast.cn:8123/logistics
clickhouse.user=root
clickhouse.password=123456

# ElasticSearch
elasticsearch.host=node2.itcast.cn
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200

# Azkaban
app.first.runnable=true

# Oracle JDBC
db.oracle.url="jdbc:oracle:thin:@//192.168.88.10:1521/ORCL"
db.oracle.user=itcast
db.oracle.password=itcast

# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456

## Data path of ETL program output ##
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars

# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars

# Running in the local Mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars

需要编写工具类,读取属性文件内容,解析每个Key的值,可使用ResourceBundle

1616037067692

package cn.itcast.logistics.common

import java.util.{Locale, ResourceBundle}

/**
 * 读取配置文件的工具类
 */
object Configuration {
	/**
	 * 定义配置文件操作的对象
	 */
	private lazy val resourceBundle: ResourceBundle = ResourceBundle.getBundle(
		"config", new Locale("zh", "CN")
	)
	private lazy val SEP = ":"
	
	// CDH-6.2.1
	lazy val BIGDATA_HOST: String = resourceBundle.getString("bigdata.host")
	
	// HDFS
	lazy val DFS_URI: String = resourceBundle.getString("dfs.uri")
	
	// Local FS
	lazy val LOCAL_FS_URI: String = resourceBundle.getString("local.fs.uri")
	
	// Kafka
	lazy val KAFKA_BROKER_HOST: String = resourceBundle.getString("kafka.broker.host")
	lazy val KAFKA_BROKER_PORT: Integer = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))
	lazy val KAFKA_INIT_TOPIC: String = resourceBundle.getString("kafka.init.topic")
	lazy val KAFKA_LOGISTICS_TOPIC: String = resourceBundle.getString("kafka.logistics.topic")
	lazy val KAFKA_CRM_TOPIC: String = resourceBundle.getString("kafka.crm.topic")
	lazy val KAFKA_ADDRESS: String = KAFKA_BROKER_HOST + SEP + KAFKA_BROKER_PORT
	
	// Spark
	lazy val LOG_OFF = "OFF"
	lazy val LOG_DEBUG = "DEBUG"
	lazy val LOG_INFO = "INFO"
	lazy val LOCAL_HADOOP_HOME = "D:/BigdataUser/hadoop-3.0.0"
	lazy val SPARK_KAFKA_FORMAT = "kafka"
	lazy val SPARK_KUDU_FORMAT = "kudu"
	lazy val SPARK_ES_FORMAT = "es"
	lazy val SPARK_CLICK_HOUSE_FORMAT = "clickhouse"
	
	// ZooKeeper
	lazy val ZOOKEEPER_HOST: String = resourceBundle.getString("zookeeper.host")
	lazy val ZOOKEEPER_PORT: Integer = Integer.valueOf(resourceBundle.getString("zookeeper.port"))
	
	// Kudu
	lazy val KUDU_RPC_HOST: String = resourceBundle.getString("kudu.rpc.host")
	lazy val KUDU_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))
	lazy val KUDU_HTTP_HOST: String = resourceBundle.getString("kudu.http.host")
	lazy val KUDU_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.http.port"))
	lazy val KUDU_RPC_ADDRESS: String = KUDU_RPC_HOST + SEP + KUDU_RPC_PORT
	
	// ClickHouse
	lazy val CLICK_HOUSE_DRIVER: String = resourceBundle.getString("clickhouse.driver")
	lazy val CLICK_HOUSE_URL: String = resourceBundle.getString("clickhouse.url")
	lazy val CLICK_HOUSE_USER: String = resourceBundle.getString("clickhouse.user")
	lazy val CLICK_HOUSE_PASSWORD: String = resourceBundle.getString("clickhouse.password")
	
	// ElasticSearch
	lazy val ELASTICSEARCH_HOST: String = resourceBundle.getString("elasticsearch.host")
	lazy val ELASTICSEARCH_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))
	lazy val ELASTICSEARCH_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))
	lazy val ELASTICSEARCH_ADDRESS: String = ELASTICSEARCH_HOST + SEP + ELASTICSEARCH_HTTP_PORT
	
	// Azkaban
	lazy val IS_FIRST_RUNNABLE: java.lang.Boolean = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))
	
	// ## Data path of ETL program output ##
	// # Run in the yarn mode in Linux
	lazy val SPARK_APP_DFS_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.dfs.checkpoint.dir") // /apps/logistics/dat-hdfs/spark-checkpoint
	lazy val SPARK_APP_DFS_DATA_DIR: String = resourceBundle.getString("spark.app.dfs.data.dir") // /apps/logistics/dat-hdfs/warehouse
	lazy val SPARK_APP_DFS_JARS_DIR: String = resourceBundle.getString("spark.app.dfs.jars.dir") // /apps/logistics/jars
	
	// # Run in the local mode in Linux
	lazy val SPARK_APP_LOCAL_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.local.checkpoint.dir") // /apps/logistics/dat-local/spark-checkpoint
	lazy val SPARK_APP_LOCAL_DATA_DIR: String = resourceBundle.getString("spark.app.local.data.dir") // /apps/logistics/dat-local/warehouse
	lazy val SPARK_APP_LOCAL_JARS_DIR: String = resourceBundle.getString("spark.app.local.jars.dir") // /apps/logistics/jars
	
	// # Running in the local Mode in Windows
	lazy val SPARK_APP_WIN_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.win.checkpoint.dir") // D://apps/logistics/dat-local/spark-checkpoint
	lazy val SPARK_APP_WIN_DATA_DIR: String = resourceBundle.getString("spark.app.win.data.dir") // D://apps/logistics/dat-local/warehouse
	lazy val SPARK_APP_WIN_JARS_DIR: String = resourceBundle.getString("spark.app.win.jars.dir") // D://apps/logistics/jars
	
	// # Oracle JDBC & # MySQL JDBC
	lazy val DB_ORACLE_URL: String = resourceBundle.getString("db.oracle.url")
	lazy val DB_ORACLE_USER: String = resourceBundle.getString("db.oracle.user")
	lazy val DB_ORACLE_PASSWORD: String = resourceBundle.getString("db.oracle.password")
	
	lazy val DB_MYSQL_DRIVER: String = resourceBundle.getString("db.mysql.driver")
	lazy val DB_MYSQL_URL: String = resourceBundle.getString("db.mysql.url")
	lazy val DB_MYSQL_USER: String = resourceBundle.getString("db.mysql.user")
	lazy val DB_MYSQL_PASSWORD: String = resourceBundle.getString("db.mysql.password")
	
	def main(args: Array[String]): Unit = {
		println("DB_ORACLE_URL = " + DB_ORACLE_URL)
		println("KAFKA_ADDRESS = " + KAFKA_ADDRESS)
	}
}

09-[掌握]-实时ETL开发之流计算程序【模板】

任务:==如何编写流式计算程序==,此处使用StructuredStreaming结构化流实时消费数据,进行ETL转换。

1616037425475

具体编写流式程序代码,分为三个部分完成:

  • 第一部分、编写程序【模板】
  • 第二部分、代码编写,消费数据,打印控制台
  • 第三部分、测试,启动MySQL数据库和Canal及Oracle数据库和OGG。

测试程序:==实时从Kafka消费数据(物流系统和CRM系统业务数据),将数据打印在控制台,没有任何逻辑==

step1、构建SparkSession对象
1. 初始化设置Spark Application配置
2. 判断Spark Application运行模式进行设置
3. 构建SparkSession实例对象

step2、消费数据,打印控制台
4. 初始化消费物流Topic数据参数
5. 消费物流Topic数据,打印控制台
6. 初始化消费CRM Topic数据参数
7. 消费CRM Topic数据,打印控制台

step3、启动等待终止
8. 启动流式应用,等待终止

创建对象LogisticsEtlApp,编写main方式, 主要代码步骤如下:

package cn.itcast.logistics.etl.realtime

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
	 * 1. 初始化设置Spark Application配置
	 * 2. 判断Spark Application运行模式进行设置
	 * 3. 构建SparkSession实例对象
	 * 4. 初始化消费物流Topic数据参数
	 * 5. 消费物流Topic数据,打印控制台
	 * 6. 初始化消费CRM Topic数据参数
	 * 7. 消费CRM Topic数据,打印控制台
	 * 8. 启动流式应用,等待终止
 */
object LogisticsEtlApp {
	
	def main(args: Array[String]): Unit = {
		// step1. 构建SparkSession实例对象,设置相关属性参数值
		/*
			1. 初始化设置Spark Application配置
			2. 判断Spark Application运行模式进行设置
			3. 构建SparkSession实例对象
		 */
		val spark: SparkSession = SparkSession.builder()
			.getOrCreate()
		import spark.implicits._
		
		// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
		// step3. 将ETL转换后数据打印到控制台,启动流式应用
		/*
			4. 初始化消费物流Topic数据参数
			5. 消费物流Topic数据,打印控制台
			6. 初始化消费CRM Topic数据参数
			7. 消费CRM Topic数据,打印控制
		 */
		val logisticsDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "logistics")
			.option("maxOffsetsPerTrigger", "100000")
			.load()

		
		// step4. 流式应用启动以后,等待终止,关闭资源
		/*
			8. 启动流式应用,等待终止
		 */
		
	}
	
}

10-[掌握]-实时ETL开发之流计算程序【编程】

编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
	 * 1. 初始化设置Spark Application配置
	 * 2. 判断Spark Application运行模式进行设置
	 * 3. 构建SparkSession实例对象
	 * 4. 初始化消费物流Topic数据参数
	 * 5. 消费物流Topic数据,打印控制台
	 * 6. 初始化消费CRM Topic数据参数
	 * 7. 消费CRM Topic数据,打印控制台
	 * 8. 启动流式应用,等待终止
 */
object LogisticsEtlApp {
	
	def main(args: Array[String]): Unit = {
		// step1. 构建SparkSession实例对象,设置相关属性参数值
		// 1. 初始化设置Spark Application配置
		val sparkConf = new SparkConf()
    		.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
			.set("spark.sql.session.timeZone", "Asia/Shanghai")
			.set("spark.sql.files.maxPartitionBytes", "134217728")
			.set("spark.sql.files.openCostInBytes", "134217728")
			.set("spark.sql.shuffle.partitions", "3")
			.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
		// 2. 判断Spark Application运行模式进行设置
		if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
			//本地环境LOCAL_HADOOP_HOME
			System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
			//设置运行环境和checkpoint路径
			sparkConf
				.set("spark.master", "local[3]")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
		} else {
			//生产环境
			sparkConf
				.set("spark.master", "yarn")
				.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
		}
		// 3. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
    		.config(sparkConf)
			.getOrCreate()
		import spark.implicits._
		
		// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
		// step3. 将ETL转换后数据打印到控制台,启动流式应用
		// 4. 初始化消费物流Topic数据参数
		val logisticsDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "logistics")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 5. 消费物流Topic数据,打印控制台
		logisticsDF.writeStream
			.queryName("query-logistics-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// 6. 初始化消费CRM Topic数据参数
		val crmDF: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
			.option("subscribe", "crm")
			.option("maxOffsetsPerTrigger", "100000")
			.load()
		// 7. 消费CRM Topic数据,打印控制
		crmDF.writeStream
			.queryName("query-crm-console")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "10")
			.option("truncate", "false")
			.start()
		
		// step4. 流式应用启动以后,等待终止,关闭资源
		// 8. 启动流式应用,等待终止
		spark.streams.active.foreach(query => println("启动Query:" + query.name))
		spark.streams.awaitAnyTermination()
	}
	
}

SparkSQL 参数调优设置:

  • 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")

  • 2)、设置读取文件时单个分区可容纳的最大字节数

    set("spark.sql.files.maxPartitionBytes", "134217728")

  • 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")

  • 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")

  • 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小

    set("spark.sql.autoBroadcastJoinThreshold", "67108864")

11-[掌握]-实时ETL开发之流计算程序【测试】

任务:运行编写流式计算程序,实时从Kafka消费数据,打印到控制台上。

1616039305085

  • 2)、启动MySQL数据库和Canal采集CRM系统业务数据
使用VMWare 启动node1.itcast.cn虚拟机,使用root用户(密码123456)登录
1) 启动MySQL数据库
	# 查看容器
	[root@node1 ~]# docker ps -a
	8b5cd2152ed9        mysql:5.7     0.0.0.0:3306->3306/tcp   mysql	
	
	# 启动容器
	[root@node1 ~]# docker start mysql
	myoracle
	
	# 容器状态
	[root@node1 ~]# docker ps
	8b5cd2152ed9        mysql:5.7   Up 6 minutes        0.0.0.0:3306->3306/tcp   mysql	

2) 启动CanalServer服务
	# 查看容器
	[root@node1 ~]# docker ps -a
	28888fad98c9        canal/canal-server:v1.1.2        0.0.0.0:11111->11111/tcp   canal-server
	
	# 启动容器
	[root@node1 ~]# docker start canal-server
	myoracle
	
	# 容器状态
	[root@node1 ~]# docker ps
	28888fad98c9        canal/canal-server:v1.1.2       Up 2 minutes  0.0.0.0:11111->11111/tcp   canal-server	
	
	# 进入容器
	[root@node1 ~]# docker exec -it canal-server /bin/bash
	[root@28888fad98c9 admin]# 
	
	# 进入CanalServer启动脚本目录
	[root@28888fad98c9 admin]# cd canal-server/bin/
	
	# 重启CanalServer服务
	[root@28888fad98c9 bin]# ./restart.sh 
	
	# 退出容器
	[root@28888fad98c9 bin]# exit
  • 3)、启动流式应用程序,对MySQL数据库中CRM系统表数据进行更新和删除

测试运行流式计算程序时,检查本地Checkpoint目录是否存在,如果存在,将其删除。

1616039884369

可以启动Oracle数据库和OGG服务,测试是否消费数据,此处省略。

12-[掌握]-实时ETL开发之实时业务数据测试

任务:运行数据模拟生成器程序,实时向CRM系统或Logistics物流系统插入数据,Canal和OGG采集,流式程序实时消费,以实时CRM系统为例,实时向CRM系统写入数据

1616040115880

运行数据模拟生成器程序,实时产生数据。

1616040236891

  • 2)、运行流式计算程序,查看控制台界面,实时消费Kafka数据

1616040267494

针对物流系统Logistics来说,可以采取同样方式实时产生数据,进行消费。

运行模拟数据生成器:MockLogisticsDataApp,吸怪【isClean=true】表示先清空表的数据,再删除。

1616040359469

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。