大数据物流项目:实时增量ETL存储Kudu(七.五)
07-[掌握]-项目初始化之构建公共模块
任务:对项目公共模块进行初始化操作,包含创建表,导入工具类等等。
针对物流项目来说,涉及2个系统,物流系统Logistics:48张表和CRM系统:3张表,每张表数据都会封装到JavaBean对象中。
==对于数据库中51张表来说:id字段作为表主键,remark字段作为备注说明,cdt和udt分别表示数据创建时间和最后更新数据。==
公共模块创建包完成以后,如下图所示:
在公共模块【
logistics-common
】的scala
目录下,创建如下程序包
结构如下所示:
导入
JavaBean
对象:数据库中51张表,对应JavaBean实体类,直接放入包中即可
- 1)、将:
资料\公共模块\beans
目录下文件导入到common
包
导入公共处理类:连接数据库工具类等
- 将:
资料\公共模块\utils
目录下文件导入到common
包
重新,刷新整个Maven Project,导入相关依赖。
08-[理解]-实时ETL开发之加载配置文件
任务:首先对ETL模块进行初始化(创建包)和项目属性配置文件(
properties
)及加载配置。
- 1)、本次项目采用Scala编程语言,因此创建
scala目录
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pm6l9kuW-1652004162163)(/img/1616036610270.png)]
创建完成以后,目录结构如下所示:
- 2)、每个项目,需要将数据库等连接信息配置到属性文件中,方便测试、开发和生产环境修改
在公共模块【
logistics-common
】的resources
目录创建配置文件:config.properties
。
# CDH-6.2.1
bigdata.host=node2.itcast.cn
# HDFS
dfs.uri=hdfs://node2.itcast.cn:8020
# Local FS
local.fs.uri=file://
# Kafka
kafka.broker.host=node2.itcast.cn
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2.itcast.cn:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm
# ZooKeeper
zookeeper.host=node2.itcast.cn
zookeeper.port=2181
# Kudu
kudu.rpc.host=node2.itcast.cn
kudu.rpc.port=7051
kudu.http.host=node2.itcast.cn
kudu.http.port=8051
# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2.itcast.cn:8123/logistics
clickhouse.user=root
clickhouse.password=123456
# ElasticSearch
elasticsearch.host=node2.itcast.cn
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200
# Azkaban
app.first.runnable=true
# Oracle JDBC
db.oracle.url="jdbc:oracle:thin:@//192.168.88.10:1521/ORCL"
db.oracle.user=itcast
db.oracle.password=itcast
# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456
## Data path of ETL program output ##
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars
# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars
# Running in the local Mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars
需要编写工具类,读取属性文件内容,解析每个Key的值,可使用
ResourceBundle
。
package cn.itcast.logistics.common
import java.util.{Locale, ResourceBundle}
/**
* 读取配置文件的工具类
*/
object Configuration {
/**
* 定义配置文件操作的对象
*/
private lazy val resourceBundle: ResourceBundle = ResourceBundle.getBundle(
"config", new Locale("zh", "CN")
)
private lazy val SEP = ":"
// CDH-6.2.1
lazy val BIGDATA_HOST: String = resourceBundle.getString("bigdata.host")
// HDFS
lazy val DFS_URI: String = resourceBundle.getString("dfs.uri")
// Local FS
lazy val LOCAL_FS_URI: String = resourceBundle.getString("local.fs.uri")
// Kafka
lazy val KAFKA_BROKER_HOST: String = resourceBundle.getString("kafka.broker.host")
lazy val KAFKA_BROKER_PORT: Integer = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))
lazy val KAFKA_INIT_TOPIC: String = resourceBundle.getString("kafka.init.topic")
lazy val KAFKA_LOGISTICS_TOPIC: String = resourceBundle.getString("kafka.logistics.topic")
lazy val KAFKA_CRM_TOPIC: String = resourceBundle.getString("kafka.crm.topic")
lazy val KAFKA_ADDRESS: String = KAFKA_BROKER_HOST + SEP + KAFKA_BROKER_PORT
// Spark
lazy val LOG_OFF = "OFF"
lazy val LOG_DEBUG = "DEBUG"
lazy val LOG_INFO = "INFO"
lazy val LOCAL_HADOOP_HOME = "D:/BigdataUser/hadoop-3.0.0"
lazy val SPARK_KAFKA_FORMAT = "kafka"
lazy val SPARK_KUDU_FORMAT = "kudu"
lazy val SPARK_ES_FORMAT = "es"
lazy val SPARK_CLICK_HOUSE_FORMAT = "clickhouse"
// ZooKeeper
lazy val ZOOKEEPER_HOST: String = resourceBundle.getString("zookeeper.host")
lazy val ZOOKEEPER_PORT: Integer = Integer.valueOf(resourceBundle.getString("zookeeper.port"))
// Kudu
lazy val KUDU_RPC_HOST: String = resourceBundle.getString("kudu.rpc.host")
lazy val KUDU_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))
lazy val KUDU_HTTP_HOST: String = resourceBundle.getString("kudu.http.host")
lazy val KUDU_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("kudu.http.port"))
lazy val KUDU_RPC_ADDRESS: String = KUDU_RPC_HOST + SEP + KUDU_RPC_PORT
// ClickHouse
lazy val CLICK_HOUSE_DRIVER: String = resourceBundle.getString("clickhouse.driver")
lazy val CLICK_HOUSE_URL: String = resourceBundle.getString("clickhouse.url")
lazy val CLICK_HOUSE_USER: String = resourceBundle.getString("clickhouse.user")
lazy val CLICK_HOUSE_PASSWORD: String = resourceBundle.getString("clickhouse.password")
// ElasticSearch
lazy val ELASTICSEARCH_HOST: String = resourceBundle.getString("elasticsearch.host")
lazy val ELASTICSEARCH_RPC_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))
lazy val ELASTICSEARCH_HTTP_PORT: Integer = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))
lazy val ELASTICSEARCH_ADDRESS: String = ELASTICSEARCH_HOST + SEP + ELASTICSEARCH_HTTP_PORT
// Azkaban
lazy val IS_FIRST_RUNNABLE: java.lang.Boolean = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))
// ## Data path of ETL program output ##
// # Run in the yarn mode in Linux
lazy val SPARK_APP_DFS_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.dfs.checkpoint.dir") // /apps/logistics/dat-hdfs/spark-checkpoint
lazy val SPARK_APP_DFS_DATA_DIR: String = resourceBundle.getString("spark.app.dfs.data.dir") // /apps/logistics/dat-hdfs/warehouse
lazy val SPARK_APP_DFS_JARS_DIR: String = resourceBundle.getString("spark.app.dfs.jars.dir") // /apps/logistics/jars
// # Run in the local mode in Linux
lazy val SPARK_APP_LOCAL_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.local.checkpoint.dir") // /apps/logistics/dat-local/spark-checkpoint
lazy val SPARK_APP_LOCAL_DATA_DIR: String = resourceBundle.getString("spark.app.local.data.dir") // /apps/logistics/dat-local/warehouse
lazy val SPARK_APP_LOCAL_JARS_DIR: String = resourceBundle.getString("spark.app.local.jars.dir") // /apps/logistics/jars
// # Running in the local Mode in Windows
lazy val SPARK_APP_WIN_CHECKPOINT_DIR: String = resourceBundle.getString("spark.app.win.checkpoint.dir") // D://apps/logistics/dat-local/spark-checkpoint
lazy val SPARK_APP_WIN_DATA_DIR: String = resourceBundle.getString("spark.app.win.data.dir") // D://apps/logistics/dat-local/warehouse
lazy val SPARK_APP_WIN_JARS_DIR: String = resourceBundle.getString("spark.app.win.jars.dir") // D://apps/logistics/jars
// # Oracle JDBC & # MySQL JDBC
lazy val DB_ORACLE_URL: String = resourceBundle.getString("db.oracle.url")
lazy val DB_ORACLE_USER: String = resourceBundle.getString("db.oracle.user")
lazy val DB_ORACLE_PASSWORD: String = resourceBundle.getString("db.oracle.password")
lazy val DB_MYSQL_DRIVER: String = resourceBundle.getString("db.mysql.driver")
lazy val DB_MYSQL_URL: String = resourceBundle.getString("db.mysql.url")
lazy val DB_MYSQL_USER: String = resourceBundle.getString("db.mysql.user")
lazy val DB_MYSQL_PASSWORD: String = resourceBundle.getString("db.mysql.password")
def main(args: Array[String]): Unit = {
println("DB_ORACLE_URL = " + DB_ORACLE_URL)
println("KAFKA_ADDRESS = " + KAFKA_ADDRESS)
}
}
09-[掌握]-实时ETL开发之流计算程序【模板】
任务:==如何编写流式计算程序==,此处使用StructuredStreaming结构化流实时消费数据,进行ETL转换。
具体编写流式程序代码,分为三个部分完成:
- 第一部分、编写程序【模板】
- 第二部分、代码编写,消费数据,打印控制台
- 第三部分、测试,启动MySQL数据库和Canal及Oracle数据库和OGG。
测试程序:==实时从Kafka消费数据(物流系统和CRM系统业务数据),将数据打印在控制台,没有任何逻辑==
step1、构建SparkSession对象
1. 初始化设置Spark Application配置
2. 判断Spark Application运行模式进行设置
3. 构建SparkSession实例对象
step2、消费数据,打印控制台
4. 初始化消费物流Topic数据参数
5. 消费物流Topic数据,打印控制台
6. 初始化消费CRM Topic数据参数
7. 消费CRM Topic数据,打印控制台
step3、启动等待终止
8. 启动流式应用,等待终止
创建对象LogisticsEtlApp,编写main方式, 主要代码步骤如下:
package cn.itcast.logistics.etl.realtime
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
* 1. 初始化设置Spark Application配置
* 2. 判断Spark Application运行模式进行设置
* 3. 构建SparkSession实例对象
* 4. 初始化消费物流Topic数据参数
* 5. 消费物流Topic数据,打印控制台
* 6. 初始化消费CRM Topic数据参数
* 7. 消费CRM Topic数据,打印控制台
* 8. 启动流式应用,等待终止
*/
object LogisticsEtlApp {
def main(args: Array[String]): Unit = {
// step1. 构建SparkSession实例对象,设置相关属性参数值
/*
1. 初始化设置Spark Application配置
2. 判断Spark Application运行模式进行设置
3. 构建SparkSession实例对象
*/
val spark: SparkSession = SparkSession.builder()
.getOrCreate()
import spark.implicits._
// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
// step3. 将ETL转换后数据打印到控制台,启动流式应用
/*
4. 初始化消费物流Topic数据参数
5. 消费物流Topic数据,打印控制台
6. 初始化消费CRM Topic数据参数
7. 消费CRM Topic数据,打印控制
*/
val logisticsDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
.option("subscribe", "logistics")
.option("maxOffsetsPerTrigger", "100000")
.load()
// step4. 流式应用启动以后,等待终止,关闭资源
/*
8. 启动流式应用,等待终止
*/
}
}
10-[掌握]-实时ETL开发之流计算程序【编程】
编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
* 1. 初始化设置Spark Application配置
* 2. 判断Spark Application运行模式进行设置
* 3. 构建SparkSession实例对象
* 4. 初始化消费物流Topic数据参数
* 5. 消费物流Topic数据,打印控制台
* 6. 初始化消费CRM Topic数据参数
* 7. 消费CRM Topic数据,打印控制台
* 8. 启动流式应用,等待终止
*/
object LogisticsEtlApp {
def main(args: Array[String]): Unit = {
// step1. 构建SparkSession实例对象,设置相关属性参数值
// 1. 初始化设置Spark Application配置
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.sql.session.timeZone", "Asia/Shanghai")
.set("spark.sql.files.maxPartitionBytes", "134217728")
.set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.shuffle.partitions", "3")
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
// 2. 判断Spark Application运行模式进行设置
if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
//本地环境LOCAL_HADOOP_HOME
System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
//设置运行环境和checkpoint路径
sparkConf
.set("spark.master", "local[3]")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
} else {
//生产环境
sparkConf
.set("spark.master", "yarn")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
}
// 3. 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
// step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
// step3. 将ETL转换后数据打印到控制台,启动流式应用
// 4. 初始化消费物流Topic数据参数
val logisticsDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
.option("subscribe", "logistics")
.option("maxOffsetsPerTrigger", "100000")
.load()
// 5. 消费物流Topic数据,打印控制台
logisticsDF.writeStream
.queryName("query-logistics-console")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
// 6. 初始化消费CRM Topic数据参数
val crmDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
.option("subscribe", "crm")
.option("maxOffsetsPerTrigger", "100000")
.load()
// 7. 消费CRM Topic数据,打印控制
crmDF.writeStream
.queryName("query-crm-console")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
// step4. 流式应用启动以后,等待终止,关闭资源
// 8. 启动流式应用,等待终止
spark.streams.active.foreach(query => println("启动Query:" + query.name))
spark.streams.awaitAnyTermination()
}
}
SparkSQL 参数调优设置:
1)、设置会话时区:
set("spark.sql.session.timeZone", "Asia/Shanghai")
2)、设置读取文件时单个分区可容纳的最大字节数
set("spark.sql.files.maxPartitionBytes", "134217728")
3)、设置合并小文件的阈值:
set("spark.sql.files.openCostInBytes", "134217728")
4)、设置 shuffle 分区数:
set("spark.sql.shuffle.partitions", "4")
5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小
set("spark.sql.autoBroadcastJoinThreshold", "67108864")
11-[掌握]-实时ETL开发之流计算程序【测试】
任务:运行编写流式计算程序,实时从Kafka消费数据,打印到控制台上。
- 1)、第一步、启动Kafka消息队列,安装node2.itcast.cn,使用CM界面启动
- 2)、启动MySQL数据库和Canal采集CRM系统业务数据
使用VMWare 启动node1.itcast.cn虚拟机,使用root用户(密码123456)登录
1) 启动MySQL数据库
# 查看容器
[root@node1 ~]# docker ps -a
8b5cd2152ed9 mysql:5.7 0.0.0.0:3306->3306/tcp mysql
# 启动容器
[root@node1 ~]# docker start mysql
myoracle
# 容器状态
[root@node1 ~]# docker ps
8b5cd2152ed9 mysql:5.7 Up 6 minutes 0.0.0.0:3306->3306/tcp mysql
2) 启动CanalServer服务
# 查看容器
[root@node1 ~]# docker ps -a
28888fad98c9 canal/canal-server:v1.1.2 0.0.0.0:11111->11111/tcp canal-server
# 启动容器
[root@node1 ~]# docker start canal-server
myoracle
# 容器状态
[root@node1 ~]# docker ps
28888fad98c9 canal/canal-server:v1.1.2 Up 2 minutes 0.0.0.0:11111->11111/tcp canal-server
# 进入容器
[root@node1 ~]# docker exec -it canal-server /bin/bash
[root@28888fad98c9 admin]#
# 进入CanalServer启动脚本目录
[root@28888fad98c9 admin]# cd canal-server/bin/
# 重启CanalServer服务
[root@28888fad98c9 bin]# ./restart.sh
# 退出容器
[root@28888fad98c9 bin]# exit
- 3)、启动流式应用程序,对MySQL数据库中CRM系统表数据进行更新和删除
测试运行流式计算程序时,检查本地Checkpoint目录是否存在,如果存在,将其删除。
可以启动Oracle数据库和OGG服务,测试是否消费数据,此处省略。
12-[掌握]-实时ETL开发之实时业务数据测试
任务:运行数据模拟生成器程序,实时向CRM系统或Logistics物流系统插入数据,Canal和OGG采集,流式程序实时消费,以实时CRM系统为例,实时向CRM系统写入数据
- 1)、查看CRM系统数据模拟生成器程序【
MockCrmDataApp
】,修改【isClean=true
】,先删除表中数据,再实时插入数据。
运行数据模拟生成器程序,实时产生数据。
- 2)、运行流式计算程序,查看控制台界面,实时消费Kafka数据
针对物流系统Logistics来说,可以采取同样方式实时产生数据,进行消费。
运行模拟数据生成器:
MockLogisticsDataApp
,吸怪【isClean=true
】表示先清空表的数据,再删除。
- 点赞
- 收藏
- 关注作者
评论(0)