深入浅出Sqoop之Java Client API 使用指南

举报
Leef724 发表于 2020/05/12 16:27:36 2020/05/12
【摘要】 Sqoop Java Client API 使用指南简介Sqoop是一个在Hadoop和关系型数据库之间被用来传输数据的工具,官网地址sqoop.apache.org,当前最新版本是2.0,本文根据官网文档,基于1.99.7版本,简单介绍其Java Client API使用。接下来要介绍的所有方法都在sqoop源码中的SqoopClient类中被打包。迁移流程sqoop的迁移流程简单来说就是...

Sqoop Java Client API 使用指南

简介

Sqoop是一个在Hadoop和关系型数据库之间被用来传输数据的工具,官网地址sqoop.apache.org,当前最新版本是2.0,本文根据官网文档,基于1.99.7版本,简单介绍其Java Client API使用。接下来要介绍的所有方法都在sqoop源码中的SqoopClient类中被打包。


迁移流程

sqoop的迁移流程简单来说就是将源端连接器的数据通过sqoop迁移到目的端,这个数据可能是mysql的一张表,也可能是hdfs上一个文件。具体来说可分为以下三步:

  • 根据sqoop指定的connector name,分别创建源端(From)和目的端(To)对应的Link Object

  • 根据From和To连接器的名字,创建一个sqoop作业;

  • 根据作业名,启动作业。


开发指南

本文主要介绍API开发指南,已假定你已准备好jdk环境,maven环境,以及sqoop server。

1. 设置 Project Dependencies

<dependency>
   <groupId>org.apache.sqoop</groupId>
   <artifactId>sqoop-client</artifactId>
   <version>${requestedVersion}</version>
</dependency>

2. 初始化

根据默认的url初始化一个SqoopClient对象

String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);

如果想要修改url可调用SqoopClient的setServerUrl方法。

3. 连接器

连接器(Connectors)提供了一个可以与数据源交互的平台,一个连接器可以有一个或者多个链接(Link)去连接指定的数据源,如mysql,hive等。因此,Sqoop的迁移过程就可以看做是利用From端的Link读取数据,在由To端的Link写入数据。SqoopClient中提供了创建、修改和删除Link的接口。用户可以根据自己需要去使用。

下面将通过代码简单来介绍这个过程

1) 根据Connector name创建Link

MLink link = Client.createLink("connectorName");
link.setName("mysql_link");
link.setCreationUser("Ruby");

MLink是一个Model,表示一个Link Object,它又包含了一个MLinkConfig模型,用以保存数据源的连接信息,具体可以查看sqoop源码common

2) 利用这个新建的Link去连接mysql数据库

MLinkConfig linkConfig = link.getConnectorLinkConfig();
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/xxx");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("root");
linkConfig.getStringInput("linkConfig.password").setValue("root");

3) 保存连接

// save the link object that was filled
Status status = client.saveLink(link);
if(status.canProceed()) {
  System.out.println("Created Link with Link Name : " + link.getName());
} else {
  System.out.println("Something went wrong creating the link");
}

这里的status.canProceed()返回ture当且仅当连接配置参数验证通过。

4)获取连接

  • getLink("linkName"),返回指定Link

  • getLinks(),以一个list返回所有创建的Links。

4. 作业

一个sqoop作业需要有一个From端的Link和一个To端的Link,每个Link的name都是唯一的。新建作业时,首先指定FromLinkName和ToLinkName,然后分别设置MFromJobConfigToJobConfig,例如源端的数据库名表名,目的端的数据库名和表名。此外,每个作业还要设置driver configs,例如指定job提交到MapReduce后,需要设置多少个mappers。

下面将用代码详细介绍这个过程:

新建作业

1) 新建作业,指定作业

String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Creating dummy job object
MJob job = client.createJob("fromLinkName", "toLinkName");
job.setName("Vampire");
job.setCreationUser("Buffy");

2)设置From的作业配置

// set the "FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

3)设置To的作业配置

// set the "TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");

4)设置driver Config

// set the driver config values
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

5)保存job

Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Name: "+ job.getName());
} else {
   System.out.println("Something went wrong creating the job");
}

其中status有三种状态

  • OK: 状态正常,没有任何告警

  • WARNING:个别参数验证不通过,但不致命

  • ERROR:参数验证失败,无法启动作业

6)获取作业

  • getJob("jobName"),返回指定作业

  • getJobs(),以一个list返回所有创建的作业。

启动作业

根据作业名启动作业,如果作业成功启动,状态返回BOOTING或者RUNNING 1)启动作业

//Job start
MSubmission submission = client.startJob("jobName");
System.out.println("Job Submission Status : " + submission.getStatus());
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
   System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
System.out.println("Hadoop job id :" + submission.getExternalId());
System.out.println("Job link : " + submission.getExternalLink());

2) 打印作业的统计信息

Counters counters = submission.getCounters();
if(counters != null) {
  System.out.println("Counters:");
  for(CounterGroup group : counters) {
     System.out.print("\t");
     System.out.println(group.getName());
     for(Counter counter : group) {
       System.out.print("\t\t");
       System.out.print(counter.getName());
       System.out.print(": ");
       System.out.println(counter.getValue());
     }
  }
}
if(submission.getExceptionInfo() != null) {
  System.out.println("Exception info : " +submission.getExceptionInfo());
}

2)查看作业状态

//Check job status for a running job
MSubmission submission = client.getJobStatus("jobName");
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
  System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}

3)停止作业

//Stop a running job
submission.stopJob("jobName");
Remark

上述提交到作业的过程是异步的,如果想同步提交作业,可以使用startJob(jobName, callback, pollTime)方法,其中callback可以为null如果你不需要提交状态

参考文献

https://github.com/apache/sqoop/tree/branch1.99.7/docs/src/site/sphinx/dev/ConnectorDevelopment.rst


【版权声明】本文为华为云社区用户翻译文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容, 举报邮箱:cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。