Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档

举报
赵延东的一亩三分地 发表于 2023/03/25 12:02:02 2023/03/25
【摘要】 在生产环境中,系统可能会定期从与业务相关的关系型数据库向Hadoop导入数据,导入数仓后进行后续离线分析。故我们此时不可能再将所有数据重新导一遍,此时我们就需要增量数据导入这一模式了。 增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式),增量导入使用到的核心参数主要是: –check-column

本文章将通过实验详细介绍如何增量同步数据到hive,以及sqoop job与crontab定时结合无密码登录的增量同步实现方法。

一、知识储备

在生产环境中,系统可能会定期从与业务相关的关系型数据库向Hadoop导入数据,导入数仓后进行后续离线分析。故我们此时不可能再将所有数据重新导一遍,此时我们就需要增量数据导入这一模式了。
增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式),增量导入使用到的核心参数主要是:
–check-column
用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似.
注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不可以的,同时–check-column可以去指定多个列
–incremental
用来指定增量导入的模式,两种模式分别为Append和Lastmodified
–last-value
指定上一次导入中检查列指定字段最大值
接下来通过具体实验来详细说明

1、Append模式增量导入

重要参数:
–incremental append
基于递增列的增量导入(将递增列值大于阈值的所有数据增量导入Hadoop)
–check-column
递增列(int)
–last-value
阈值(int)
举个简单例子,在oracle库scott用户下有一张员工表(inr_app),表中有:自增主键员工编号(empno),员工名(ename),员工职位(job),员工薪资(sal)这几个属性,如下:

--在oracle库scott下创建一个这样的表
create table inr_app as 
select rownum as empno, ename, job, sal
  from emp a
 where job is not null
and rownum<=5;
--查询:
select * from inr_app;
EMPNO	ENAME	JOB	        SAL
1	    er	    CLERK	    800.00
2	    ALLEN	SALESMAN	1600.00
3	    WARD	SALESMAN	1250.00
4	    JONES	MANAGER    	2975.00
5	    MARTIN	SALESMAN	1250.00

我们需要将新进员工也导入hadoop以供公司人力部门做分析,此时我们需要将这个表数据导入到hive,也就是增量导入前的一次全量导入:

--在hive创建表:
create table INR_APP
(
  empno int,
  ename string,
  job   string,
  sal   float
);
hive> show tables;
OK
inr_app
inr_emp
ora_hive
Time taken: 0.166 seconds, Fetched: 3 row(s)
--接下来执行全量导入:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_APP -m 1 --hive-import --hive-database oracle
--查询hive表
hive> select * from inr_app;
OK
1	er	CLERK	800.0
2	ALLEN	SALESMAN	1600.0
3	WARD	SALESMAN	1250.0
4	JONES	MANAGER	2975.0
5	MARTIN	SALESMAN	1250.0
Time taken: 0.179 seconds, Fetched: 5 row(s)

过了一段时间后,公司又新来一批员工,我们需要将新员工也导入到hadoop供有关部门分析,此时我们只需要指定–incremental 参数为append,–last-value参数为5可。表示只从id大于5后开始导入:

--先给oracle库scott.inr_app插入几条数据:
insert into inr_app values(6,'zhao','DBA',100);
insert into inr_app values(7,'yan','BI',100);
insert into inr_app values(8,'dong','JAVA',100);
commit;
--执行增量导入:
[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_APP -m 1 --hive-import --hive-database oracle --incremental app
end --check-column EMPNO --last-value 5 
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /hadoop/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
19/03/12 19:45:55 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
19/03/12 19:45:56 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/03/12 19:45:56 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
19/03/12 19:45:56 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
19/03/12 19:45:56 INFO oracle.OraOopManagerFactory: Data Connector for Oracle and Hadoop is disabled.
19/03/12 19:45:56 INFO manager.SqlManager: Using default fetchSize of 1000
19/03/12 19:45:56 INFO tool.CodeGenTool: Beginning code generation
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/hadoop/hbase/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/hadoop/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/03/12 19:45:57 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:57 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM INR_APP t WHERE 1=0
19/03/12 19:45:57 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /hadoop
Note: /tmp/sqoop-root/compile/9b898359374ea580a390b32da1a37949/INR_APP.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
19/03/12 19:45:59 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/9b898359374ea580a390b32da1a37949/INR_APP.jar
19/03/12 19:45:59 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:59 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(EMPNO) FROM INR_APP
19/03/12 19:45:59 INFO tool.ImportTool: Incremental import based on column EMPNO
19/03/12 19:45:59 INFO tool.ImportTool: Lower bound value: 5
19/03/12 19:45:59 INFO tool.ImportTool: Upper bound value: 8
19/03/12 19:45:59 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:45:59 INFO mapreduce.ImportJobBase: Beginning import of INR_APP
19/03/12 19:46:00 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
19/03/12 19:46:00 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:46:01 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
19/03/12 19:46:01 INFO client.RMProxy: Connecting to ResourceManager at /192.168.1.66:8032
19/03/12 19:46:04 INFO db.DBInputFormat: Using read commited transaction isolation
19/03/12 19:46:04 INFO mapreduce.JobSubmitter: number of splits:1
19/03/12 19:46:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1552371714699_0010
19/03/12 19:46:05 INFO impl.YarnClientImpl: Submitted application application_1552371714699_0010
19/03/12 19:46:05 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1552371714699_0010/
19/03/12 19:46:05 INFO mapreduce.Job: Running job: job_1552371714699_0010
19/03/12 19:46:13 INFO mapreduce.Job: Job job_1552371714699_0010 running in uber mode : false
19/03/12 19:46:13 INFO mapreduce.Job:  map 0% reduce 0%
19/03/12 19:46:21 INFO mapreduce.Job:  map 100% reduce 0%
19/03/12 19:46:21 INFO mapreduce.Job: Job job_1552371714699_0010 completed successfully
19/03/12 19:46:21 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=143702
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=87
		HDFS: Number of bytes written=44
		HDFS: Number of read operations=4
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Other local map tasks=1
		Total time spent by all maps in occupied slots (ms)=4336
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=4336
		Total vcore-milliseconds taken by all map tasks=4336
		Total megabyte-milliseconds taken by all map tasks=4440064
	Map-Reduce Framework
		Map input records=3
		Map output records=3
		Input split bytes=87
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=92
		CPU time spent (ms)=2760
		Physical memory (bytes) snapshot=211570688
		Virtual memory (bytes) snapshot=2133770240
		Total committed heap usage (bytes)=106954752
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=44
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Transferred 44 bytes in 20.3436 seconds (2.1628 bytes/sec)
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Retrieved 3 records.
19/03/12 19:46:21 INFO mapreduce.ImportJobBase: Publishing Hive/Hcat import job data to Listeners for table INR_APP
19/03/12 19:46:21 INFO util.AppendUtils: Creating missing output directory - INR_APP
19/03/12 19:46:21 INFO manager.OracleManager: Time zone has been set to GMT
19/03/12 19:46:21 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM INR_APP t WHERE 1=0
19/03/12 19:46:21 WARN hive.TableDefWriter: Column EMPNO had to be cast to a less precise type in Hive
19/03/12 19:46:21 WARN hive.TableDefWriter: Column SAL had to be cast to a less precise type in Hive
19/03/12 19:46:21 INFO hive.HiveImport: Loading uploaded data into Hive
19/03/12 19:46:21 INFO conf.HiveConf: Found configuration file file:/hadoop/hive/conf/hive-site.xml

Logging initialized using configuration in jar:file:/hadoop/hive/lib/hive-common-2.3.2.jar!/hive-log4j2.properties Async: true
19/03/12 19:46:24 INFO SessionState: 
Logging initialized using configuration in jar:file:/hadoop/hive/lib/hive-common-2.3.2.jar!/hive-log4j2.properties Async: true
19/03/12 19:46:24 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Created local directory: /hadoop/hive/tmp/root/2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381/_tmp_space.db
19/03/12 19:46:24 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO session.SessionState: Updating thread name to 2968942b-30b6-49f5-b86c-d71a77963381 main
19/03/12 19:46:24 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:24 INFO ql.Driver: Compiling command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139): CREATE TABLE IF NOT EXISTS `oracle`.`INR_APP` ( `EMPNO` DOUBLE, `ENAME
` STRING, `JOB` STRING, `SAL` DOUBLE) COMMENT 'Imported by sqoop on 2019/03/12 11:46:21' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE19/03/12 19:46:27 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:27 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:27 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:27 INFO parse.CalcitePlanner: Starting Semantic Analysis
19/03/12 19:46:27 INFO parse.CalcitePlanner: Creating table oracle.INR_APP position=27
19/03/12 19:46:27 INFO ql.Driver: Semantic Analysis Completed
19/03/12 19:46:27 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
19/03/12 19:46:27 INFO ql.Driver: Completed compiling command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139); Time taken: 2.876 seconds
19/03/12 19:46:27 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager
19/03/12 19:46:27 INFO ql.Driver: Executing command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139): CREATE TABLE IF NOT EXISTS `oracle`.`INR_APP` ( `EMPNO` DOUBLE, `ENAME
` STRING, `JOB` STRING, `SAL` DOUBLE) COMMENT 'Imported by sqoop on 2019/03/12 11:46:21' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE19/03/12 19:46:27 INFO sqlstd.SQLStdHiveAccessController: Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext [sessionString=2968942b-30b6-49f5-b86c-d71a7796338
1, clientType=HIVECLI]19/03/12 19:46:27 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
19/03/12 19:46:27 INFO hive.metastore: Mestastore configuration hive.metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.
hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook19/03/12 19:46:27 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:27 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:27 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:27 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:27 INFO ql.Driver: Completed executing command(queryId=root_20190312114624_6679c12a-4224-4bcd-a8be-f7d4ae56a139); Time taken: 0.096 seconds
OK
19/03/12 19:46:27 INFO ql.Driver: OK
Time taken: 2.982 seconds
19/03/12 19:46:27 INFO CliDriver: Time taken: 2.982 seconds
19/03/12 19:46:27 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:27 INFO session.SessionState: Resetting thread name to  main
19/03/12 19:46:27 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:27 INFO session.SessionState: Updating thread name to 2968942b-30b6-49f5-b86c-d71a77963381 main
19/03/12 19:46:27 INFO ql.Driver: Compiling command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2): 
LOAD DATA INPATH 'hdfs://192.168.1.66:9000/user/root/INR_APP' INTO TABLE `oracle`.`INR_APP`
19/03/12 19:46:28 INFO ql.Driver: Semantic Analysis Completed
19/03/12 19:46:28 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
19/03/12 19:46:28 INFO ql.Driver: Completed compiling command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2); Time taken: 0.421 seconds
19/03/12 19:46:28 INFO ql.Driver: Concurrency mode is disabled, not creating a lock manager
19/03/12 19:46:28 INFO ql.Driver: Executing command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2): 
LOAD DATA INPATH 'hdfs://192.168.1.66:9000/user/root/INR_APP' INTO TABLE `oracle`.`INR_APP`
19/03/12 19:46:28 INFO ql.Driver: Starting task [Stage-0:MOVE] in serial mode
19/03/12 19:46:28 INFO hive.metastore: Closed a connection to metastore, current connections: 0
Loading data to table oracle.inr_app
19/03/12 19:46:28 INFO exec.Task: Loading data to table oracle.inr_app from hdfs://192.168.1.66:9000/user/root/INR_APP
19/03/12 19:46:28 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:28 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:28 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:28 ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
19/03/12 19:46:28 INFO ql.Driver: Starting task [Stage-1:STATS] in serial mode
19/03/12 19:46:28 INFO exec.StatsTask: Executing stats task
19/03/12 19:46:28 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:28 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:28 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:28 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:29 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:29 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.1.66:9083
19/03/12 19:46:29 INFO hive.metastore: Opened a connection to metastore, current connections: 1
19/03/12 19:46:29 INFO hive.metastore: Connected to metastore.
19/03/12 19:46:29 INFO exec.StatsTask: Table oracle.inr_app stats: [numFiles=2, numRows=0, totalSize=146, rawDataSize=0]
19/03/12 19:46:29 INFO ql.Driver: Completed executing command(queryId=root_20190312114627_748c136c-1446-43df-a819-728becae7df2); Time taken: 0.992 seconds
OK
19/03/12 19:46:29 INFO ql.Driver: OK
Time taken: 1.415 seconds
19/03/12 19:46:29 INFO CliDriver: Time taken: 1.415 seconds
19/03/12 19:46:29 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:29 INFO session.SessionState: Resetting thread name to  main
19/03/12 19:46:29 INFO conf.HiveConf: Using the default value passed in for log id: 2968942b-30b6-49f5-b86c-d71a77963381
19/03/12 19:46:29 INFO session.SessionState: Deleted directory: /tmp/hive/root/2968942b-30b6-49f5-b86c-d71a77963381 on fs with scheme hdfs
19/03/12 19:46:29 INFO session.SessionState: Deleted directory: /hadoop/hive/tmp/root/2968942b-30b6-49f5-b86c-d71a77963381 on fs with scheme file
19/03/12 19:46:29 INFO hive.metastore: Closed a connection to metastore, current connections: 0
19/03/12 19:46:29 INFO hive.HiveImport: Hive import complete.
19/03/12 19:46:29 INFO hive.HiveImport: Export directory is empty, removing it.
19/03/12 19:46:29 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
19/03/12 19:46:29 INFO tool.ImportTool:  --incremental append
19/03/12 19:46:29 INFO tool.ImportTool:   --check-column EMPNO
19/03/12 19:46:29 INFO tool.ImportTool:   --last-value 8
19/03/12 19:46:29 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')

查询hive表

hive> select * from inr_app;
OK
1	er	CLERK	800.0
2	ALLEN	SALESMAN	1600.0
3	WARD	SALESMAN	1250.0
4	JONES	MANAGER	2975.0
5	MARTIN	SALESMAN	1250.0
6	zhao	DBA	100.0
7	yan	BI	100.0
8	dong	JAVA	100.0
Time taken: 0.165 seconds, Fetched: 8 row(s)

已经增量过来了,我们也可以使用hdfs dfs -cat查看生成的数据文件,生成的数据文件位置在之前配置hadoop环境时已经配置,读者也可以通过自己访问自己环境:IP:50070/explorer.html#/查询

[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/oracle.db/inr_app/part-m-00000_copy_1
6zhaoDBA100
7yanBI100
8dongJAVA100

至于之前全量的数据,也可以看到:

[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/oracle.db/inr_app/part-m-00000
1erCLERK800
2ALLENSALESMAN1600
3WARDSALESMAN1250
4JONESMANAGER2975
5MARTINSALESMAN1250

2、、lastModify增量导入

lastModify增量导入又分为两种模式:
a、–incremental append 附加模式
b、–incremental --merge-key合并模式

接下来继续看实验:

实验一:附加模式

此方式要求原有表中有time字段,它能指定一个时间戳,让Sqoop把该时间戳之后的数据导入至Hadoop(这里为HDFS)。因为后续员工薪资可能状态会变化,变化后time字段时间戳也会变化,此时Sqoop依然会将相同状态更改后的员工信息导入HDFS,因此为导致数据重复。
先在oracle库基于scott.inr_app新建一个带时间列etltime的表inr_las,初始化已有数据时间为sysdate

create table inr_las as select a.empno,
                               a.ename,
                               a.job,
                               a.sal,
                               sysdate as etltime
                               from inr_app a;
select * from inr_las;
EMPNO	ENAME	JOB        	SAL	        ETLTIME
1	    er	    CLERK	    800.00	2019/3/20 10:42:27
2	    ALLEN	SALESMAN	1600.00	2019/3/20 10:42:27
3	    WARD	SALESMAN	1250.00	2019/3/20 10:42:27
4	    JONES	MANAGER    	2975.00	2019/3/20 10:42:27
5	    MARTIN	SALESMAN	1250.00	2019/3/20 10:42:27
6	    zhao	DBA	        100.00	2019/3/20 10:42:27
7	    yan	     BI	        100.00	2019/3/20 10:42:27
8	    dong	JAVA	    100.00	2019/3/20 10:42:27

在hive创建表,这里统一指定列分隔符为’\t’,后面导入也是以此为分隔符:

create table INR_LAS
(
  empno int,
  ename string,
  job   string,
  sal   float,
  etltime string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

初始化全量导入:

[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS -m 1 --hive-import --hive-database oracle  --fields-terminated-by '\t' --lines-terminated-by '\n'

查询hive表:

hive> select * from inr_las;
OK
1	er	CLERK	800.0	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600.0	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250.0	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975.0	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250.0	2019-03-20 10:42:27.0
6	zhao	DBA	100.0	2019-03-20 10:42:27.0
7	yan	BI	100.0	2019-03-20 10:42:27.0
8	dong	JAVA	100.0	2019-03-20 10:42:27.0
Time taken: 0.181 seconds, Fetched: 8 row(s)

这次增量导入我们先使用–incremental lastmodified --last-value --append 看下效果,首先在源端对inr_las表数据做下变更:

update inr_las set sal=1000,etltime=sysdate where empno=6;
commit;
select * from inr_las;
EMPNO	ENAME	JOB        	SAL	        ETLTIME
1	    er	    CLERK	    800.00	2019/3/20 10:42:27
2	    ALLEN	SALESMAN	1600.00	2019/3/20 10:42:27
3	    WARD	SALESMAN	1250.00	2019/3/20 10:42:27
4	    JONES	MANAGER    	2975.00	2019/3/20 10:42:27
5	    MARTIN	SALESMAN	1250.00	2019/3/20 10:42:27
6	    zhao	DBA	        1000.00	2019/3/20 10:52:34
7	    yan	     BI	        100.00	2019/3/20 10:42:27
8	    dong	JAVA	    100.00	2019/3/20 10:42:27

接下来增量导入:

[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS --fields-terminated-by '\t' --lines-terminated-by '\n' --hive-import --hive-database oracle --hive-table INR_LAS --incremental append --check-column ETLTIME --last-value '2019-03-20 10:42:27' -m 1 --null-string '\\N' --null-non-string '\\N'
Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.Please set $ACCUMULO_HOME to the root of your Accumulo installation. '2019-03Warning: /hadoop/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
19/03/13 14:46:26 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7

查询hive表

hive> select * from inr_las;
OK
1	er	CLERK	800.0	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600.0	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250.0	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975.0	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250.0	2019-03-20 10:42:27.0
6	zhao	DBA	100.0	2019-03-20 10:42:27.0
7	yan	BI	100.0	2019-03-20 10:42:27.0
8	dong	JAVA	100.0	2019-03-20 10:42:27.0
6	zhao	DBA	1000.0	2019-03-20 10:52:34.0
Time taken: 0.171 seconds, Fetched: 9 row(s)

通过上面查询结果可以看到,empno=6的这个员工薪资和etltime记录变更时间都变化后,根据上一次全量初始化后的最大时间来做增量的起始时间去源端oracle查数时候,发现了新的发生变化的数据,然后将它最新状态抽到了hive,采用的追加方式,因此hive里存了两条记录,导致了数据重复,根据时间可以取最新的状态来获取最新数据状态。

实验二:合并模式

接着上面实验环境继续做,这次采用合并模式来看看效果:

    --先看下当前的源端oracle数据:
    EMPNO    	ENAME	    JOB	            SAL	ETLTIME
1	            er	    CLERK	    800.00	2019/3/20 10:42:27
2	            ALLEN	SALESMAN	1600.00	2019/3/20 10:42:27
3	            WARD	SALESMAN	1250.00	2019/3/20 10:42:27
4	            JONES	MANAGER    	2975.00	2019/3/20 10:42:27
5	            MARTIN	SALESMAN	1250.00	2019/3/20 10:42:27
6	            zhao	DBA        	1000.00	2019/3/20 10:52:34
7	            yan	    BI	        100.00	2019/3/20 10:42:27
8	            dong	JAVA	    200.00	2019/3/21 17:12:46

先把前面的hive表给删了

hive> drop table inr_las;
OK
Time taken: 0.195 seconds

创建为外部表

hive>create table INR_LAS
(
  empno int,
  ename string,
  job   string,
  sal   float,
  etltime string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
location '/user/hive/warehouse/exter_inr_las'; 
OK
Time taken: 0.226 seconds

注意,/user/hive/warehouse/exter_inr_las这个目录在第一次全量初始化时不要存在,它会自己创建,如果存在会报目录已存在错误:

ERROR tool.ImportTool: Import failed: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.1.66:9000/user/hive/warehouse/exter_inr_las alre
ady exists

这时候应该先删除一次这个目录:

[root@hadoop ~]# hadoop fs -rmr /user/hive/warehouse/exter_inr_las
rmr: DEPRECATED: Please use 'rm -r' instead.
19/03/13 22:05:33 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/hive/warehouse/exter_inr_las

接下来全量导入一次:

[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS -m 1 --target-dir /user/hive/warehouse/exter_inr_las --fiel
ds-terminated-by '\t'Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/03/13 22:05:48 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
19/03/13 22:05:48 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.

查看一下hdfs此文件夹下文件:

[root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/exter_inr_las/part-m-00000
1	er	CLERK	800	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250	2019-03-20 10:42:27.0
6	zhao	DBA	1000	2019-03-20 10:52:34.0
7	yan	BI	100	2019-03-20 10:42:27.0
8	dong	JAVA	200	2019-03-21 17:12:46.0

查一下hive表:

hive> select * from inr_las;
OK
1	er	CLERK	800.0	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600.0	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250.0	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975.0	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250.0	2019-03-20 10:42:27.0
6	zhao	DBA	1000.0	2019-03-20 10:52:34.0
7	yan	BI	100.0	2019-03-20 10:42:27.0
8	dong	JAVA	200.0	2019-03-21 17:12:46.0
Time taken: 0.191 seconds, Fetched: 8 row(s)

接下来修改一下oracle的数据:

update inr_las set sal=400 ,etltime=sysdate where empno=8;
commit;
select * from inr_las;
EMPNO    	ENAME	    JOB	            SAL	ETLTIME
1	            er	    CLERK	    800.00	2019/3/20 10:42:27
2	            ALLEN	SALESMAN	1600.00	2019/3/20 10:42:27
3	            WARD	SALESMAN	1250.00	2019/3/20 10:42:27
4	            JONES	MANAGER    	2975.00	2019/3/20 10:42:27
5	            MARTIN	SALESMAN	1250.00	2019/3/20 10:42:27
6	            zhao	DBA        	1000.00	2019/3/20 10:52:34
7	            yan	    BI	        100.00	2019/3/20 10:42:27
8	            dong	JAVA	    400.00	2019/3/21 17:47:03--已经更改了

接下来做合并模式增量:

[root@hadoop ~]# sqoop import --connect jdbc:oracle:thin:@192.168.1.6:1521:orcl --username scott --password tiger --table INR_LAS --fields-terminated-by '\t' --lines-terminated-by '\n'  --t
arget-dir /user/hive/warehouse/exter_inr_las -m 1 --check-column ETLTIME --incremental lastmodified --merge-key EMPNO --last-value "2019-03-21 17:12:46"Warning: /hadoop/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
19/03/13 22:18:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7


这时候去看下/user/hive/warehouse/exter_inr_las/内容,你会发现part-m-00000变成了part-r-00000,意思是做了reduce:

root@hadoop ~]# hdfs dfs -cat /user/hive/warehouse/exter_inr_las/part-r-00000
1	er	CLERK	800	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250	2019-03-20 10:42:27.0
6	zhao	DBA	1000	2019-03-20 10:52:34.0
7	yan	BI	100	2019-03-20 10:42:27.0
8	dong	JAVA	400	2019-03-21 17:47:03.0

发现empno=8的记录的确做了变更了,增量同步成功,去看下hive表:

hive> select * from inr_las;
OK
1	er	CLERK	800.0	2019-03-20 10:42:27.0
2	ALLEN	SALESMAN	1600.0	2019-03-20 10:42:27.0
3	WARD	SALESMAN	1250.0	2019-03-20 10:42:27.0
4	JONES	MANAGER	2975.0	2019-03-20 10:42:27.0
5	MARTIN	SALESMAN	1250.0	2019-03-20 10:42:27.0
6	zhao	DBA	1000.0	2019-03-20 10:52:34.0
7	yan	BI	100.0	2019-03-20 10:42:27.0
8	dong	JAVA	400.0	2019-03-21 17:47:03.0
Time taken: 0.196 seconds, Fetched: 8 row(s)

没问题。由于篇幅原因,sqoop job的使用及增量脚本定时同步数据的案例写在了下一篇文章

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。