Testing Sqoop incremental sync of MySQL/Oracle data to Hive (merge-key/append)
This article uses hands-on experiments to show how to incrementally sync data into Hive, and how to combine a sqoop job with crontab for scheduled, password-free incremental syncs.
I. Background
In production, a system typically imports data from the business's relational databases into Hadoop on a schedule, then runs offline analysis once the data lands in the warehouse. Re-importing the entire dataset every time is not realistic, which is exactly what incremental import is for.
Incremental import comes in two kinds: one driven by an increasing column (Append mode) and one driven by a time column (Lastmodified mode). The core parameters are:
--check-column
Specifies the column examined during an incremental import to decide whether a row counts as new data, similar to an auto-increment field or a timestamp in a relational database.
Note: the check column must not be a character type (char, varchar, etc. are not allowed), and --check-column takes a single column.
--incremental
Specifies the incremental import mode: append or lastmodified.
--last-value
Specifies the maximum value of the check column seen by the previous import.
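Putting the three options together, the general shape of an incremental import command looks like this (a minimal sketch; the connection string, table, and column names are placeholders, not values from this article's environment):
sqoop import \
  --connect jdbc:oracle:thin:@<host>:1521:<sid> \
  --username <user> -P \
  --table <TABLE> -m 1 \
  --incremental append \
  --check-column <ID_COLUMN> \
  --last-value <N>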
The experiments below walk through the details.
1. Append-mode incremental import
Key parameters:
--incremental append
Incremental import driven by an increasing column (imports every row whose value in that column is greater than the threshold).
--check-column
The increasing column (int).
--last-value
The threshold (int).
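Conceptually, append mode just adds a range predicate on the check column; a simplified sketch of the query Sqoop effectively runs against the source (angle brackets are placeholders):
-- lower bound comes from --last-value, upper bound from MAX(<check_column>) at job start
SELECT * FROM <table>
WHERE <check_column> > <last_value>
  AND <check_column> <= <current_max>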
As a simple example, the scott schema of an Oracle database has an employee table (inr_app) with an auto-increment primary key employee number (empno), employee name (ename), job title (job), and salary (sal):
-- Create such a table under scott in the Oracle database
create table inr_app as
select rownum as empno, ename, job, sal
from emp a
where job is not null
and rownum<=5;
-- Query it:
select * from inr_app;
EMPNO ENAME JOB SAL
1 er CLERK 800.00
2 ALLEN SALESMAN 1600.00
3 WARD SALESMAN 1250.00
4 JONES MANAGER 2975.00
5 MARTIN SALESMAN 1250.00
We want new hires in Hadoop as well so HR can run analysis on them, so first we import this table into Hive, i.e. the one-time full load that precedes any incremental imports:
-- Create the table in Hive:
create table INR_APP
(
empno int,
ename string,
job string,
sal float
);
hive> show tables;
OK
inr_app
inr_emp
ora_hive
Time taken: 0.166 seconds, Fetched: 3 row(s)
-- Now run the full import:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_APP -m 1 --hive-import --hive-database oracle
-- Query the hive table
hive> select * from inr_app;
OK
1 er CLERK 800.0
2 ALLEN SALESMAN 1600.0
3 WARD SALESMAN 1250.0
4 JONES MANAGER 2975.0
5 MARTIN SALESMAN 1250.0
Time taken: 0.179 seconds, Fetched: 5 row(s)
Some time later the company hires another batch of employees who also need to land in Hadoop for analysis. Now we just set --incremental to append and --last-value to 5, which tells Sqoop to import only rows whose id is greater than 5:
-- First insert a few rows into scott.inr_app on the Oracle side:
insert into inr_app values(6,'zhao','DBA',100);
insert into inr_app values(7,'yan','BI',100);
insert into inr_app values(8,'dong','JAVA',100);
commit;
-- Run the incremental import:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_APP -m 1 --hive-import --hive-database oracle --incremental append --check-column EMPNO --last-value 5
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /hadoop/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
19/03/12 19:45:55 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
19/03/12 19:45:56 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/03/12 19:45:56 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
19/03/12 19:45:56 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
19/03/12 19:45:56 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
19/03/12 19:45:56 INFO manager.SqlManager: Using default fetchSize of 1000
19/03/12 19:45:56 INFO tool.CodeGenTool: Beginning code generation
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/hadoop/hbase/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/hadoop/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/03/12 19:45:57 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:57 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM INR_APP t WHERE 1=0
19/03/12 19:45:57 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /hadoop
Note: /tmp/sqoop-root/compile/9b898359374ea580a390b32da1a37949/INR_APP.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/03/12 19:45:59 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/9b898359374ea580a390b32da1a37949/INR_APP.jar
19/03/12 19:45:59 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:59 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(EMPNO) FROM INR_APP
19/03/12 19:45:59 INFO tool.ImportTool: Incremental import based on column EMPNO
19/03/12 19:45:59 INFO tool.ImportTool: Lower bound value: 5
19/03/12 19:45:59 INFO tool.ImportTool: Upper bound value: 8
19/03/12 19:45:59 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:59 INFO mapreduce.ImportJobBase: Beginning import of INR_APP
19/03/12 19:46:00 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/03/12 19:46:00 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:46:01 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/03/12 19:46:01 INFO client.RMProxy: Connecting to ResourceManager at /192.168.1.66:8032
19/03/12 19:46:04 INFO db.DBInputFormat: Using read commited transaction isolation
19/03/12 19:46:04 INFO mapreduce.JobSubmitter: number of splits:1
19/03/12 19:46:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1552371714699_0010
19/03/12 19:46:05 INFO impl.YarnClientImpl: Submitted application application_1552371714699_0010
19/03/12 19:46:05 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1552371714699_0010/
19/03/12 19:46:05 INFO mapreduce.Job: Running job: job_1552371714699_0010
19/03/12 19:46:13 INFO mapreduce.Job: Job job_1552371714699_0010 running in uber mode : false
19/03/12 19:46:13 INFO mapreduce.Job: map 0% reduce 0%
19/03/12 19:46:21 INFO mapreduce.Job: map 100% reduce 0%
19/03/12 19:46:21 INFO mapreduce.Job: Job job_1552371714699_0010 completed successfully
19/03/12 19:46:21 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=143702
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=44
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=4336
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=4336
Total vcore-milliseconds taken by all map tasks=4336
Total megabyte-milliseconds taken by all map tasks=4440064
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=92
CPU time spent (ms)=2760
Physical memory (bytes) snapshot=211570688
Virtual memory (bytes) snapshot=2133770240
Total committed heap usage (bytes)=106954752
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=44
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Transferred 44 bytes in 20.3436 seconds (2.1628 bytes/sec)
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Retrieved 3 records.
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Publishing Hive/Hcat import job data to Listeners for table INR_APP
19/03/12 19:46:21 INFO util.AppendUtils: Creating missing output directory - INR_APP
19/03/12 19:46:21 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:46:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM INR_APP t WHERE 1=0
19/03/12 19:46:21 WARN hive.TableDefWriter: Column EMPNO had to be cast to a less precise type in Hive
19/03/12 19:46:21 WARN hive.TableDefWriter: Column SAL had to be cast to a less precise type in Hive
19/03/12 19:46:21 INFO hive.HiveImport: Loading uploaded data into Hive
19/03/12 19:46:21 INFO conf.HiveConf: Found configuration file file:/hadoop/hive/conf/hive-site.xml
Logging initialized using configuration in jar:file:/hadoop/hive/lib/hive-common-2.3.2.jar!/hive-log4j2.properties Async: true
19/03/12 19:46:24 INFO SessionState:
Logging initialized using configuration in jar:file:/hadoop/hive/lib/hive-common-2.3.2.jar!/hive-log4j2.properties Async: true
19/03/12 19:46:24 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Created local directory: /hadoop/hive/tmp/root/2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381/_tmp_space.db
19/03/12 19:46:24 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Updating thread name to 2968942b-30b6-49f5-b86c-d71a77963381 main
19/03/12 19:46:24 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:27 INFO ql.Driver: Compiling command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139): CREATE TABLE IF NOT EXISTS `oracle`.`INR_APP` ( `EMPNO` DOUBLE, `ENAME` STRING, `JOB` STRING, `SAL` DOUBLE) COMMENT 'Imported by sqoop on 2019/03/12 11:46:21' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
19/03/12 19:46:27 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:27 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:27 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:27 INFO parse.CalcitePlanner: Starting Semantic Analysis
19/03/12 19:46:27 INFO parse.CalcitePlanner: Creating table oracle.INR_APP position=27
19/03/12 19:46:27 INFO ql.Driver: Semantic Analysis Completed
19/03/12 19:46:27 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
19/03/12 19:46:27 INFO ql.Driver: Completed compiling command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139); Time taken: 2.876 seconds
19/03/12 19:46:27 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager
19/03/12 19:46:27 INFO ql.Driver: Executing command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139): CREATE TABLE IF NOT EXISTS `oracle`.`INR_APP` ( `EMPNO` DOUBLE, `ENAME` STRING, `JOB` STRING, `SAL` DOUBLE) COMMENT 'Imported by sqoop on 2019/03/12 11:46:21' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE
19/03/12 19:46:27 INFO sqlstd.SQLStdHiveAccessController: Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext [sessionString=2968942b-30b6-49f5-b86c-d71a77963381, clientType=HIVECLI]
19/03/12 19:46:27 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
19/03/12 19:46:27 INFO hive.metastore: Mestastore configuration hive.metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook
19/03/12 19:46:27 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:27 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:27 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:27 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:27 INFO ql.Driver: Completed executing command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139); Time taken: 0.096 seconds
OK
19/03/12 19:46:27 INFO ql.Driver: OK
Time taken: 2.982 seconds
19/03/12 19:46:27 INFO CliDriver: Time taken: 2.982 seconds
19/03/12 19:46:27 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:27 INFO session.SessionState: Resetting thread name to main
19/03/12 19:46:27 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:27 INFO session.SessionState: Updating thread name to 2968942b-30b6-49f5-b86c-d71a77963381 main
19/03/12 19:46:27 INFO ql.Driver: Compiling command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2):
LOAD DATA INPATH 'hdfs://192.168.1.66:9000/user/root/INR_APP' INTO TABLE `oracle`.`INR_APP`
19/03/12 19:46:28 INFO ql.Driver: Semantic Analysis Completed
19/03/12 19:46:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
19/03/12 19:46:28 INFO ql.Driver: Completed compiling command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2); Time taken: 0.421 seconds
19/03/12 19:46:28 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager
19/03/12 19:46:28 INFO ql.Driver: Executing command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2):
LOAD DATA INPATH 'hdfs://192.168.1.66:9000/user/root/INR_APP' INTO TABLE `oracle`.`INR_APP`
19/03/12 19:46:28 INFO ql.Driver: Starting task [Stage-0:MOVE] in serial mode
19/03/12 19:46:28 INFO hive.metastore: Closed a connection to metastore, current connections: 0
Loading data to table oracle.inr_app
19/03/12 19:46:28 INFO exec.Task: Loading data to table oracle.inr_app from hdfs://192.168.1.66:9000/user/root/INR_APP
19/03/12 19:46:28 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:28 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:28 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:28 ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
19/03/12 19:46:28 INFO ql.Driver: Starting task [Stage-1:STATS] in serial mode
19/03/12 19:46:28 INFO exec.StatsTask: Executing stats task
19/03/12 19:46:28 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:28 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:28 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:28 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:29 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:29 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:29 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:29 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:29 INFO exec.StatsTask: Table oracle.inr_app stats: [numFiles=2, numRows=0, totalSize=146, rawDataSize=0]
19/03/12 19:46:29 INFO ql.Driver: Completed executing command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2); Time taken: 0.992 seconds
OK
19/03/12 19:46:29 INFO ql.Driver: OK
Time taken: 1.415 seconds
19/03/12 19:46:29 INFO CliDriver: Time taken: 1.415 seconds
19/03/12 19:46:29 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:29 INFO session.SessionState: Resetting thread name to main
19/03/12 19:46:29 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:29 INFO session.SessionState: Deleted directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381 on fs with scheme hdfs
19/03/12 19:46:29 INFO session.SessionState: Deleted directory: /hadoop/hive/tmp/root/2968942b-30b6-49f5-b86c-d71a77963381 on fs with scheme file
19/03/12 19:46:29 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:29 INFO hive.HiveImport: Hive import complete.
19/03/12 19:46:29 INFO hive.HiveImport: Export directory is empty, removing it.
19/03/12 19:46:29 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
19/03/12 19:46:29 INFO tool.ImportTool: --incremental append
19/03/12 19:46:29 INFO tool.ImportTool: --check-column EMPNO
19/03/12 19:46:29 INFO tool.ImportTool: --last-value 8
19/03/12 19:46:29 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
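As that last log line suggests, the whole incremental import can be saved as a sqoop job, which records --last-value in the sqoop metastore and advances it after every run. A minimal sketch (the job name inr_app_incr is made up; -P prompts for the password, and for unattended runs --password-file is the usual substitute):
# create the job; note the space after the bare "--"
sqoop job --create inr_app_incr -- import \
  --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl \
  --username scott -P \
  --table INR_APP -m 1 \
  --hive-import --hive-database oracle \
  --incremental append --check-column EMPNO --last-value 8
# list saved jobs and run one; last-value is updated automatically after each exec
sqoop job --list
sqoop job --exec inr_app_incr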
Query the hive table:
hive> select * from inr_app;
OK
1 er CLERK 800.0
2 ALLEN SALESMAN 1600.0
3 WARD SALESMAN 1250.0
4 JONES MANAGER 2975.0
5 MARTIN SALESMAN 1250.0
6 zhao DBA 100.0
7 yan BI 100.0
8 dong JAVA 100.0
Time taken: 0.165 seconds, Fetched: 8 row(s)
The new rows have arrived. You can also inspect the generated data files with hdfs dfs -cat; their location was set when the Hadoop environment was configured earlier, and you can browse them in your own environment via the NameNode web UI at IP:50070/explorer.html#/:
[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/oracle.db/inr_app/part-m-00000_copy_1
6zhaoDBA100
7yanBI100
8dongJAVA100
The earlier full-load data is there too:
[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/oracle.db/inr_app/part-m-00000
1erCLERK800
2ALLENSALESMAN1600
3WARDSALESMAN1250
4JONESMANAGER2975
5MARTINSALESMAN1250
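The fields look glued together because the full import used Hive's default field delimiter \001, which is a non-printing character. To make it visible, you can translate it to something printable (a sketch, assuming a standard Linux shell):
[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/oracle.db/inr_app/part-m-00000 | tr '\001' '|'
1|er|CLERK|800
2|ALLEN|SALESMAN|1600
...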
2. Lastmodified incremental import
Lastmodified incremental import comes in two flavors:
a. append mode: --incremental lastmodified --append
b. merge mode: --incremental lastmodified --merge-key
On to the experiments:
Experiment 1: append mode
This approach requires a time column in the source table. You hand Sqoop a timestamp and it imports every row modified after that timestamp into Hadoop (HDFS here). Because an employee's salary can change later, and the time column changes along with it, Sqoop will import the changed row again in its new state, so this mode produces duplicate records.
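Conceptually, lastmodified mode selects every row whose time column is at or after --last-value; a simplified sketch of the predicate (angle brackets are placeholders, and the upper bound is taken when the job starts):
SELECT * FROM <table>
WHERE <time_column> >= <last_value>
  AND <time_column> < <job_start_time>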
First, create a table inr_las in the Oracle scott schema based on scott.inr_app, with an extra time column etltime initialized to sysdate for the existing rows:
create table inr_las as select a.empno,
a.ename,
a.job,
a.sal,
sysdate as etltime
from inr_app a;
select * from inr_las;
EMPNO ENAME JOB SAL ETLTIME
1 er CLERK 800.00 2019/3/20 10:42:27
2 ALLEN SALESMAN 1600.00 2019/3/20 10:42:27
3 WARD SALESMAN 1250.00 2019/3/20 10:42:27
4 JONES MANAGER 2975.00 2019/3/20 10:42:27
5 MARTIN SALESMAN 1250.00 2019/3/20 10:42:27
6 zhao DBA 100.00 2019/3/20 10:42:27
7 yan BI 100.00 2019/3/20 10:42:27
8 dong JAVA 100.00 2019/3/20 10:42:27
Create the table in Hive, this time explicitly using '\t' as the field delimiter; the imports below use the same delimiter:
create table INR_LAS
(
empno int,
ename string,
job string,
sal float,
etltime string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
Initial full import:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS -m 1 --hive-import --hive-database oracle --fields-terminated-by '\t' --lines-terminated-by '\n'
Query the hive table:
hive> select * from inr_las;
OK
1 er CLERK 800.0 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600.0 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250.0 2019-03-20 10:42:27.0
4 JONES MANAGER 2975.0 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250.0 2019-03-20 10:42:27.0
6 zhao DBA 100.0 2019-03-20 10:42:27.0
7 yan BI 100.0 2019-03-20 10:42:27.0
8 dong JAVA 100.0 2019-03-20 10:42:27.0
Time taken: 0.181 seconds, Fetched: 8 row(s)
For this incremental import, let's first try the append approach (--incremental append with ETLTIME as the check column and a --last-value timestamp) and see the effect. Start by changing some data in the source inr_las table:
update inr_las set sal=1000,etltime=sysdate where empno=6;
commit;
select * from inr_las;
EMPNO ENAME JOB SAL ETLTIME
1 er CLERK 800.00 2019/3/20 10:42:27
2 ALLEN SALESMAN 1600.00 2019/3/20 10:42:27
3 WARD SALESMAN 1250.00 2019/3/20 10:42:27
4 JONES MANAGER 2975.00 2019/3/20 10:42:27
5 MARTIN SALESMAN 1250.00 2019/3/20 10:42:27
6 zhao DBA 1000.00 2019/3/20 10:52:34
7 yan BI 100.00 2019/3/20 10:42:27
8 dong JAVA 100.00 2019/3/20 10:42:27
Now run the incremental import:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS --fields-terminated-by '\t' --lines-terminated-by '\n' --hive-import --hive-database oracle --hive-table INR_LAS --incremental append --check-column ETLTIME --last-value '2019-03-20 10:42:27' -m 1 --null-string '\\N' --null-non-string '\\N'
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /hadoop/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
19/03/13 14:46:26 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Query the hive table:
hive> select * from inr_las;
OK
1 er CLERK 800.0 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600.0 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250.0 2019-03-20 10:42:27.0
4 JONES MANAGER 2975.0 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250.0 2019-03-20 10:42:27.0
6 zhao DBA 100.0 2019-03-20 10:42:27.0
7 yan BI 100.0 2019-03-20 10:42:27.0
8 dong JAVA 100.0 2019-03-20 10:42:27.0
6 zhao DBA 1000.0 2019-03-20 10:52:34.0
Time taken: 0.171 seconds, Fetched: 9 row(s)
The query result shows what happened: after employee 6's salary and etltime changed, the incremental import (whose starting point is the max timestamp from the initial full load) found the changed row on the Oracle side and pulled its latest state into Hive. Because this is append mode, Hive now holds two records for that employee, i.e. duplicate data, and you have to pick the row with the latest timestamp to get the current state.
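Since append mode keeps both versions of a changed row, a common way to read only the latest state is a window function over the business key; a sketch in HiveQL, assuming empno uniquely identifies an employee:
SELECT empno, ename, job, sal, etltime
FROM (
  SELECT t.*,
         row_number() OVER (PARTITION BY empno ORDER BY etltime DESC) AS rn
  FROM inr_las t
) d
WHERE rn = 1;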
Experiment 2: merge mode
Continuing in the same environment, this time we use merge mode to see the effect:
-- Current data on the Oracle source:
EMPNO ENAME JOB SAL ETLTIME
1 er CLERK 800.00 2019/3/20 10:42:27
2 ALLEN SALESMAN 1600.00 2019/3/20 10:42:27
3 WARD SALESMAN 1250.00 2019/3/20 10:42:27
4 JONES MANAGER 2975.00 2019/3/20 10:42:27
5 MARTIN SALESMAN 1250.00 2019/3/20 10:42:27
6 zhao DBA 1000.00 2019/3/20 10:52:34
7 yan BI 100.00 2019/3/20 10:42:27
8 dong JAVA 200.00 2019/3/21 17:12:46
First, drop the previous hive table:
hive> drop table inr_las;
OK
Time taken: 0.195 seconds
Recreate it pointing at an explicit HDFS location (an external-style table, though the DDL below omits the EXTERNAL keyword):
hive>create table INR_LAS
(
empno int,
ename string,
job string,
sal float,
etltime string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
location '/user/hive/warehouse/exter_inr_las';
OK
Time taken: 0.226 seconds
Note: the directory /user/hive/warehouse/exter_inr_las must not exist at the time of the first full load; Sqoop will create it itself. If it already exists, the import fails with a directory-already-exists error:
ERROR tool.ImportTool: Import failed: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.1.66:9000/user/hive/warehouse/exter_inr_las already exists
In that case, delete the directory first:
[root@hadoop ~]# hadoop fs -rmr /user/hive/warehouse/exter_inr_las
rmr: DEPRECATED: Please use 'rm -r' instead.
19/03/13 22:05:33 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/hive/warehouse/exter_inr_las
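If you script this, you can guard the delete so it only fires when the directory actually exists (a sketch; hadoop fs -test -d returns 0 when the path is an existing directory):
[root@hadoop ~]# hadoop fs -test -d /user/hive/warehouse/exter_inr_las && hadoop fs -rm -r /user/hive/warehouse/exter_inr_las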
Next, run the full import once:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS -m 1 --target-dir /user/hive/warehouse/exter_inr_las --fields-terminated-by '\t'
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/03/13 22:05:48 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
19/03/13 22:05:48 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
Take a look at the files under this HDFS directory:
[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/exter_inr_las/part-m-00000
1 er CLERK 800 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250 2019-03-20 10:42:27.0
4 JONES MANAGER 2975 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250 2019-03-20 10:42:27.0
6 zhao DBA 1000 2019-03-20 10:52:34.0
7 yan BI 100 2019-03-20 10:42:27.0
8 dong JAVA 200 2019-03-21 17:12:46.0
Query the hive table:
hive> select * from inr_las;
OK
1 er CLERK 800.0 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600.0 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250.0 2019-03-20 10:42:27.0
4 JONES MANAGER 2975.0 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250.0 2019-03-20 10:42:27.0
6 zhao DBA 1000.0 2019-03-20 10:52:34.0
7 yan BI 100.0 2019-03-20 10:42:27.0
8 dong JAVA 200.0 2019-03-21 17:12:46.0
Time taken: 0.191 seconds, Fetched: 8 row(s)
Next, modify a row in Oracle:
update inr_las set sal=400 ,etltime=sysdate where empno=8;
commit;
select * from inr_las;
EMPNO ENAME JOB SAL ETLTIME
1 er CLERK 800.00 2019/3/20 10:42:27
2 ALLEN SALESMAN 1600.00 2019/3/20 10:42:27
3 WARD SALESMAN 1250.00 2019/3/20 10:42:27
4 JONES MANAGER 2975.00 2019/3/20 10:42:27
5 MARTIN SALESMAN 1250.00 2019/3/20 10:42:27
6 zhao DBA 1000.00 2019/3/20 10:52:34
7 yan BI 100.00 2019/3/20 10:42:27
8 dong JAVA 400.00 2019/3/21 17:47:03  -- already changed
Now run the merge-mode incremental import:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS --fields-terminated-by '\t' --lines-terminated-by '\n' --target-dir /user/hive/warehouse/exter_inr_las -m 1 --check-column ETLTIME --incremental lastmodified --merge-key EMPNO --last-value "2019-03-21 17:12:46"
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/03/13 22:18:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Now look at /user/hive/warehouse/exter_inr_las/: part-m-00000 has become part-r-00000, meaning a reduce phase ran. The merge step is a MapReduce job that joins the existing data with the newly imported rows on the --merge-key column (EMPNO) and keeps only the newest record for each key.
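You can confirm the file layout with a quick listing before cat-ing the file:
[root@hadoop ~]# hdfs dfs -ls /user/hive/warehouse/exter_inr_las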
[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/exter_inr_las/part-r-00000
1 er CLERK 800 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250 2019-03-20 10:42:27.0
4 JONES MANAGER 2975 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250 2019-03-20 10:42:27.0
6 zhao DBA 1000 2019-03-20 10:52:34.0
7 yan BI 100 2019-03-20 10:42:27.0
8 dong JAVA 400 2019-03-21 17:47:03.0
The empno=8 record has indeed been updated in place, so the incremental merge worked. Check the hive table:
hive> select * from inr_las;
OK
1 er CLERK 800.0 2019-03-20 10:42:27.0
2 ALLEN SALESMAN 1600.0 2019-03-20 10:42:27.0
3 WARD SALESMAN 1250.0 2019-03-20 10:42:27.0
4 JONES MANAGER 2975.0 2019-03-20 10:42:27.0
5 MARTIN SALESMAN 1250.0 2019-03-20 10:42:27.0
6 zhao DBA 1000.0 2019-03-20 10:52:34.0
7 yan BI 100.0 2019-03-20 10:42:27.0
8 dong JAVA 400.0 2019-03-21 17:47:03.0
Time taken: 0.196 seconds, Fetched: 8 row(s)
All good. For reasons of length, the use of sqoop job and a scheduled script for incremental sync are covered in the next article.
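As a preview, once the import is saved as a sqoop job, scheduling it is typically a single crontab entry (a sketch; the job name inr_las_incr, log path, and schedule are assumptions):
# crontab -e: run the saved incremental job every day at 02:00
0 2 * * * /hadoop/sqoop/bin/sqoop job --exec inr_las_incr >> /tmp/sqoop_inr_las.log 2>&1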