Big Data Logistics Project: Real-Time Incremental ETL into Kudu (Part 7)
Logistics_Day07: Real-Time Incremental ETL into Kudu
01-[Review]-Recap of the Previous Lesson
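Main topic:
The Kudu storage engine. Like the HBase database, it stores data. It was created to replace HDFS and HBase, supporting both random reads/writes and bulk loading for analytics.
- 1) Random reads and writes over massive data, i.e. the functionality of the HBase database
- 2) Bulk loading of massive data, especially in columnar storage such as Parquet
From the start, Kudu was designed to integrate with analytics engines: Cloudera's open-source Impala (an in-memory analytics engine) and the Apache Spark compute engine.
Kudu is a storage engine: an OLAP analytical database for near-real-time analytics.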
Outline of the Kudu material (mind map):
Get plenty of hands-on practice with the Kudu APIs, both the Java client API and the Spark integration.
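02-[Overview]-Day 7: Lesson Outline
Main topics: setting up the development environment for the logistics project and writing the common interfaces for the stream-processing programs. A streaming program consumes the collected business data from Kafka in real time, applies ETL transformations, and finally stores the result in storage engines (Kudu, Elasticsearch, ClickHouse).
- 1) Data source: consume the Kafka topics that hold the data of the different business systems
  - Logistics system: collected with OGG, stored as JSON strings
  - CRM system: collected with Canal, stored as JSON strings
- 2) Transformation: parse the JSON strings and wrap them in entity classes (JavaBean objects)
For the logistics project, the real-time ETL logic is encapsulated behind abstractions, written in Scala, and tested with simulated real-time data.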
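03-[Understand]-Project Preparation: Initializing the Development Environment
The project is developed on Windows, and the Spark programs use the HDFS file system API from Hadoop. To run them on Windows, winutils.exe and hadoop.dll must be configured. Windows binaries for Hadoop versions: https://github.com/cdarlint/winutils
- 1) Configure HADOOP_HOME: unpack the binaries matching your Hadoop version (for example, to the instructor's unpack directory) and set the Windows system environment variable HADOOP_HOME to that directory; winutils.exe must end up in %HADOOP_HOME%\bin.
- 2) Copy hadoop.dll
Note: restarting the computer after the configuration is recommended. Without this configuration, running Spark programs may fail with errors.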
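04-[Understand]-Project Initialization: Creating the Maven Project and Modules
First, create the Maven project and its modules; then add the dependencies, create the packages, and import the utility classes.
After the Maven project has been created, it looks as shown in the screenshot below:
- 1) Create the Maven parent project and delete its src directory
Configure Maven in the IDE: the Maven installation directory, the settings file, and the local repository directory
- 2) Create the logistics-common module (shared code)
- 3) Create the logistics-etl module (real-time ETL processing)
- 4) Create the logistics-offline module (offline metric computation)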
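For reference, a minimal sketch of how the parent and child POMs are typically wired together. It assumes the groupId cn.itcast.logistics and version 1.0.0, inferred from the logistics-common dependency declared in the ETL module's POM; adjust them to your project. The parent must use pom packaging so that it can aggregate the modules, which is also why its src directory can be deleted. IDEs such as IntelliJ IDEA usually generate these entries when a module is created under the parent.

```xml
<!-- itcast-logistics-parent/pom.xml (sketch): coordinates are assumptions, see the lead-in -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast.logistics</groupId>
    <artifactId>itcast-logistics-parent</artifactId>
    <version>1.0.0</version>
    <!-- A parent/aggregator project builds no jar of its own -->
    <packaging>pom</packaging>
    <!-- Each entry is a subdirectory that contains a module pom.xml -->
    <modules>
        <module>logistics-common</module>
        <module>logistics-etl</module>
        <module>logistics-offline</module>
    </modules>
    <!-- The repositories, properties and dependencyManagement from step 1) go here -->
</project>

<!-- logistics-common/pom.xml (sketch): every child module points back to the parent,
     which is how it inherits the managed versions and properties such as ${scala.version} -->
<parent>
    <groupId>cn.itcast.logistics</groupId>
    <artifactId>itcast-logistics-parent</artifactId>
    <version>1.0.0</version>
</parent>
<artifactId>logistics-common</artifactId>
```

Without the parent reference, the child POMs would fail to resolve the versionless dependencies and ${scala.version}, since those are only defined in the parent.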
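05-[Understand]-Project Initialization: Importing the POM Dependencies
Next, add the dependencies to the pom.xml files of the parent project and of each Maven module.
- 1) Parent project [itcast-logistics-parent] dependencies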
<repositories>
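<repository>
<id>aliyun</id>
<!-- Maven 3.8.1+ blocks plain-HTTP repositories by default, so use the HTTPS endpoint -->
<url>https://maven.aliyun.com/repository/public</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<!-- Plain-HTTP URL: blocked by default in Maven 3.8.1+, switch to an HTTPS URL if resolution fails -->
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>mvnrepository</id>
<!-- mvnrepository.com is a search website, not an artifact repository, so this entry is unlikely to resolve anything -->
<url>https://mvnrepository.com/</url>
<!--<layout>default</layout>-->
</repository>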
<repository>
<id>elastic.co</id>
<url>https://artifacts.elastic.co/maven</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- SDK -->
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<!-- Junit -->
<junit.version>4.12</junit.version>
<!-- HTTP Version -->
<http.version>4.5.11</http.version>
<!-- Hadoop -->
<hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
<!-- Spark -->
<spark.version>2.4.0-cdh6.2.1</spark.version>
<!-- <spark.version>2.4.0</spark.version>-->
<!-- Spark Graph Visual -->
<gs.version>1.3</gs.version>
<breeze.version>1.0</breeze.version>
<jfreechart.version>1.5.0</jfreechart.version>
<!-- Parquet -->
<parquet.version>1.9.0-cdh6.2.1</parquet.version>
<!-- Kudu -->
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
<!-- Hive -->
<hive.version>2.1.1-cdh6.2.1</hive.version>
<!-- Kafka -->
<!--<kafka.version>2.1.0-cdh6.2.1</kafka.version>-->
<kafka.version>2.1.0</kafka.version>
<!-- ClickHouse -->
<clickhouse.version>0.2.2</clickhouse.version>
<!-- ElasticSearch -->
<es.version>7.6.1</es.version>
<!-- JSON Version -->
<fastjson.version>1.2.62</fastjson.version>
<!-- Apache Commons Version -->
<commons-io.version>2.6</commons-io.version>
<commons-lang3.version>3.10</commons-lang3.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<!-- JDBC Drivers Version-->
<ojdbc.version>12.2.0.1</ojdbc.version>
<mysql.version>5.1.44</mysql.version>
<!-- Other -->
<jtuple.version>1.2</jtuple.version>
<!-- Maven Plugins Version -->
<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
<maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- JDBC -->
<dependency>
<groupId>com.oracle.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
<systemPath>D:/BigdataUser/jdbc-drivers/ojdbc8-12.2.0.1.jar</systemPath>
<scope>system</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${http.version}</version>
</dependency>
<!-- Apache Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<!-- Graph Visual -->
<dependency>
<groupId>org.graphstream</groupId>
<artifactId>gs-core</artifactId>
<version>${gs.version}</version>
</dependency>
<dependency>
<groupId>org.graphstream</groupId>
<artifactId>gs-ui</artifactId>
<version>${gs.version}</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.version}</artifactId>
<version>${breeze.version}</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze-viz_${scala.version}</artifactId>
<version>${breeze.version}</version>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>${jfreechart.version}</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Kudu -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>${kudu.version}</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<!-- Clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ElasticSearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>${es.version}</version>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>${commons-beanutils.version}</version>
</dependency>
<!-- Other -->
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
<version>${jtuple.version}</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
-->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
</dependencyManagement>
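Because the Oracle JDBC driver (com.oracle.jdbc:ojdbc8:12.2.0.1) is not available from the Maven repositories configured above, the jar is stored on the local disk and referenced with system scope via systemPath (D:/BigdataUser/jdbc-drivers/ojdbc8-12.2.0.1.jar); adjust the path to wherever you saved the driver. Newer Oracle drivers are also published to Maven Central under the groupId com.oracle.database.jdbc.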
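The parent properties declare maven-compiler-plugin.version, maven-surefire-plugin.version and maven-shade-plugin.version, but nothing uses them, and the module POMs below declare maven-compiler-plugin without a version, which makes Maven print a warning about the missing plugin version. One possible way to put those properties to work, sketched here as an assumption rather than the course's own configuration, is a pluginManagement section in the parent POM:

```xml
<!-- itcast-logistics-parent/pom.xml (sketch): place inside <project>, next to <dependencyManagement>.
     pluginManagement only pins versions; children still declare the plugins they actually use. -->
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire-plugin.version}</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
            </plugin>
            <!-- Same version the module POMs already declare explicitly -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
```

With this in place, the child modules can keep declaring maven-compiler-plugin without a version element; Maven fills it in from the parent.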
- 2) Common module [logistics-common] dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- JDBC -->
<dependency>
<groupId>com.oracle.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<scope>system</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Java Tuples -->
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
</dependency>
<!-- Alibaba Json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- Apache Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
</dependency>
<!-- Graph Visual -->
<dependency>
<groupId>org.graphstream</groupId>
<artifactId>gs-core</artifactId>
</dependency>
<dependency>
<groupId>org.graphstream</groupId>
<artifactId>gs-ui</artifactId>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze-viz_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
</dependency>
<!-- Kudu -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
</dependency>
<!-- Clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
<!-- ElasticSearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
</dependency>
-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 3) Real-time ETL module [logistics-etl] dependencies
<repositories>
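<repository>
<id>mvnrepository</id>
<!-- mvnrepository.com is a search website, not an artifact repository, so this entry is unlikely to resolve anything -->
<url>https://mvnrepository.com/</url>
<layout>default</layout>
</repository>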
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>elastic.co</id>
<url>https://artifacts.elastic.co/maven</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>cn.itcast.logistics</groupId>
<artifactId>logistics-common</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- Structured Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- Other -->
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
</dependency>
<!-- kudu -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 4) Offline metric computation module [logistics-offline] dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>cn.itcast.logistics</groupId>
<artifactId>logistics-common</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- Structured Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- kudu -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
</dependency>
<!-- Other -->
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
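After adding the pom dependencies to the project and its modules, refresh (reimport) the project so that the dependency jars are downloaded. It is best to create a test class in each module and run it to confirm that everything works.
Following the standard Maven directory layout, create the corresponding directories as shown in the figure above, and write the test program CommonAppTest.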
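06-[Master]-Project Initialization: Importing the Data Generator Module
Task: import the project's data simulation (generator) module into the Maven project. The steps are as follows:
- 1) Unzip [logistics-generate.zip] into the Maven project directory [D:\Logistics_New\itcast-logistics-parent]
- 2) Explicitly import the module into the Maven project: select the module unpacked in the previous step and click Next until the wizard finishes
- 3) In the Maven project's pom.xml, manually add the module as a submodule of the parent project.
This completes importing the data generator module into the Maven project.
- 4) Initialization: be sure to mark the table-data directory as a resources directory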
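Steps 3) and 4) correspond to the following POM changes, sketched under two assumptions: the unzipped module directory is named logistics-generate, and table-data sits directly in that module's root. Marking a directory as a resources root in the IDE only affects the IDE; declaring it in the module's POM makes command-line Maven builds pick it up as well.

```xml
<!-- itcast-logistics-parent/pom.xml: add the imported module to the existing <modules> list -->
<modules>
    <module>logistics-common</module>
    <module>logistics-etl</module>
    <module>logistics-offline</module>
    <module>logistics-generate</module>
</modules>

<!-- logistics-generate/pom.xml (sketch): Maven counterpart of marking table-data as a resources directory.
     A custom <resources> list replaces the default one, so src/main/resources is listed again. -->
<build>
    <resources>
        <resource>
            <directory>src/main/resources</directory>
        </resource>
        <resource>
            <directory>table-data</directory>
        </resource>
    </resources>
</build>
```

The files in table-data are then copied to the root of target/classes and can be loaded from the classpath. If the module's pom.xml already has a build section, merge the resources element into it instead of adding a second one.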
Description of the related code: