数据集成之sqoop介绍

举报
米兰的小铁匠 发表于 2020/11/24 12:49:32 2020/11/24
【摘要】 用户需要将分散在各个系统的数据统一处理,将源中的数据统一传输到大数据平台。 需要将不同的数据汇聚、清洗、转换。此时就需要一款支持hadoop和与关系型数据库之间的数据的导入导出工具,即数据迁移工具。此时sqoop(sql to hadoop)就诞生了,sqoop是一个在结构化数据、半结构化数据、和非结构非数据的高效的数据转换工具,其构建起传统db和hadoop之间的桥梁。主要用于批量数据处理。

背景

      用户需要将分散在各个系统的数据统一处理,将源中的数据统一传输到大数据平台。 需要将不同的数据汇聚、清洗、转换。此时就需要一款支持hadoop和与关系型数据库之间的数据的导入导出工具,即数据迁移工具。此时sqoop(sql to hadoop)就诞生了,sqoop是一个在结构化数据、半结构化数据、和非结构非数据的高效的数据转换工具,其构建起传统db和hadoop之间的桥梁。主要用于批量数据处理。sqoop支持增量和全量导入导出。 本文基于sqoop1.99.7版本进行介绍。

什么是sqoop

      sqoop将数据迁移任务进行细分,将数据迁移中的数据源、数据传输配置、数据传输任务提取抽象,经过抽象的核心概念Connector、link、job、driver。
      connector the registered connector implementation will provide logic to read from or write to a data source that it represents. A connector can have one or more links associated with it.如果你用开源版本之外的数据源,可以自己扩展开发connector模块以适配。
      link connector是和数据源相关的, 而link是和某个具体的任务相关的,针对特定的数据源,配置信息。其定义了从一个数据源读出和写入的信息。
      driver提供对job任务的运行信息,管理sqoop作业的生命周期。

      job是从一个数据源读出,写到另一个数据源的过程(A sqoop job holds the from and to parts for transferring data form the From data source to the To data source. both the from and the to are identified by their corresponding connector link ids)。 本文将结合mapreduce详细讲解这一流程。

 sqoop2架构

      sqoop2是一种CS架构,客户端包括sqoop-client和sqoop-shell,服务端包括sqoop-server,server端相当于一个web服务应用,其核心是利用将数据转换程序转换成mapreduce任务,加快传输速度,并发进行数据的ETL流程。

     

      sqoop的client 负责搜索connector,创建link、job,与server通信,并提交job,构建url连接服务器,构建sqoopResourceRequest对象与server进行通信,获取job的运行信息。同时client端缓存了connector driver、resouce等信息,负责用户和server进行交互。

      sqoop的server端启动后,可以接受从client发送过来的http请求,进而完成client提交过来的作业,server负责与hadoop通信进行数据迁移。sqoop server端运行在tomcat中,启动后会初始化多个manager角色,譬如jobmanager,其对提交的任务进行管理,读取配置信息,实例化提交和执行引擎,并通过两个线程清洗和更新配置信息,AduitLoggerManger 其负责系统升级过程,记录日志等操作。AuthotizationManger,其负责授权操作,决定用户可以执行什么操作。AuthoenticationManager,其负责认证管理,启动时从配置文件中读取身份验证的信息。鉴权授权模块,开源的sqoop没有具体的实现,需要根据需求自己集成相关模块。SqoopConfiguration,负责保存配置信息,启动一个线程定期检查更新,并维护一个与其他manager观察者队列,变动时发送通知。RepositoryManger,与作业相关的job、link、submission等信息会通过该角色更新与保存,其后端db默认是derby,当然其是一个可插拔组件,你可以根据需求更换成mysql等成熟的关系型数据库。

      sqoop server中几个关键manager的交互如下图。

    


 mapreduce如何在sqoop中使用的

  sqoop通过jobmanager将任务转换成mapreduce作业并行执行,加快速度,用户提交sqoop任务后,在sqoop中的jobmanager会将sqoop作业转换成MRjob,使用SqoopInputFormat抽取作业,将对应的extractor转化成SqoopMapper。


SqoopInputFormat会调用Partitioner类的getPartitions方法生成Partition列表封装到SqoopSplit中。每个Partition分片会交给一个Mapper任务执行。每个Mapper启动一个extractor线程从数据源端抽取相应的partition数据和loader线程load数据到目的端。并通过SqoopMapDataWriter进行序列化和反序列化操作。

sqoop执行流程

       一个sqoop作业从客户端向服务端提交请求、发送请求、鉴权、授权、生成作业、提交作业、etl流程、返回结果的流程如下图所示:

sqoop使用方式介绍

创建连接
   create link –c generic –jdbc-connector  --name  sqltest
   create link –c hdfs-connector  --name    hdfstest
创建job
   create  job  -f sqltest  -t  hdfstest – partition column
启动job  客户端提交给服务端
   start  job  -n  test-job  -s
   
sqoop导出
sqoop export\
--connect jdbc:mysql://127.0.0.1:3306/db_hadoop \
--username sqoop \ // 用户名
--password sqoop \  // 密码
--table user \   
-- staging-table \ // 临时表
-- columns \ // 指定要导出的列    
-- batch \ //批量导入
--incremental lastImortedDate\    //代表只导入增量数据
--check-column id \     //以主键id作为判断条件
--update-model allowinsert \ // 为空插入  已存在 update(DB层面)
--update-key \ // 更新已有数据     
--input-null-string \ // 字段为空处理
-m 6   // mapper 的数量

sqoop ETL流程

 

 如上图,sqoop的etl流程主要被划分为Initializer、Partitioner、Extractor、Loader、distroyer等几个环节,
 在Initializer阶段初始化资源,与数据源建立连接,获取schema信息。
 在partitioner阶段会划分数据源的数据分片。
 Extractor根据数据分片抽取数据源数据,并经过extract转换成sqoop中间格式数据。如一个mysql源的抽取代码信息

 public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
    Schema schema = context.getSchema();  // 获取schema 信息
    Column[] schemaColumns = schema.getColumnsArray();
    try ( // 根据分片,抽取数据
      GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
      PreparedStatement preparedStatement = createPreparedStatement(context, executor, partition);
      ResultSet resultSet = preparedStatement.executeQuery()
    ) {
      ResultSetMetaData metaData = resultSet.getMetaData();
      int columnCount = metaData.getColumnCount();
      while (resultSet.next()) {
        Object[] array = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
          if(resultSet.getObject(i + 1) == null) {
            array[i] = null ;
            continue;
          }
          // check type of the column
          Column schemaColumn = schemaColumns[i];
          switch (schemaColumn.getType()) {
          // 根据不同的数据类型向中间格式进行转换
          case BLOB:
            // convert the blob to byte[]
            Blob blob = resultSet.getBlob(i + 1);
            array[i] = blob.getBytes(1, (int)blob.length());
            break;
          case DATE:
            // convert the sql date to JODA time as prescribed the Sqoop IDF spec
            array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1));
            break;
          default:
            //for anything else
            array[i] = resultSet.getObject(i + 1);
          }
        }
        context.getDataWriter().writeArrayRecord(array); // 写入中间数据
        rowsRead++;
      }
  }

loader到hdfs的流程如下

public void load {
      public Void run() throws Exception {
        HdfsUtils.contextToConfiguration(context.getContext(), conf);
        DataReader reader = context.getDataReader();
        String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
        String codecname = getCompressionCodecName(toJobConfig);
        // 输出是否压缩 (省略)
        String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec);
          GenericHdfsWriter filewriter = getWriter(toJobConfig)
          filewriter.initialize(filepath, context.getSchema(), conf, codec);
        Object[] record;
        while ((record = reader.readArrayRecord()) != null) { // 从中间数据中读取数据,转换,写入目的源
          // char spliter = Character.valueOf(((char)toJobConfig.toJobConfig.separator));
          String spliter = toJobConfig.toJobConfig.separator;
          filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(),   toJobConfig.toJobConfig.nullValue,spliter)); // 写入数据
          rowsWritten++;
        }
      }
      filewriter.destroy();
      return null;
    }
  });
  }

存在的问题

      开源的sqoop其实有点鸡肋,如果源系统不能承受比较大的读取压力,不建议使用sqoop工具。
sqoop不适合做实时数据获取,进行数据传输时,sqoop会启动执行mapreduce任务做etl流程,对于流式数据接入,可采用flume。
原生sqoop在数据倾斜场景下处理的不够好,如果能够加入预采样加自动distinct-split的机制会好一些。当然如果能cbo机制的analyze获取数据源统计元数据信息更好。同时flink1.11后推出的cdc模块,可以取代sqoop和canal做流批一体的etl数据集成。

容错与事务

       Sqoop本身由于数据传输并行机制并不支持事务。我们可以在框架和应用本身做一些事务机制。
导出时到DB多个任务并行导出,提交中间结果是可见的,导出任务完成之前,尽量不要启动可以使用导出结果的程序。
导入时保证一致性的最好方法是,保证数据源的一致性快照,在导入时尽量不要对表中现有数据做更新。
目的源为hdfs时MR作业本身容错。
目的源为mysql 可能会出现脏数据,可以先导出到中间表,成功后利用mysql本身的事务机制由中间表写到正式表。


结语

        开源的sqoop支持的数据源比较少,譬如不支持es、redis等,根据这篇文章的描述,让你对接一个新的数据源,你有思路吗? 后续数据集成方面会结合flink1.11的cdc特性和jdbc connector再写一篇数据集成相关的博文。
       当然数据集成数据入湖,华为云数据湖团队已经集成了很成熟的流式集成和批量集成方案,请来华为云EI 数据湖相关产品体验吧。

本文一部分内容参考:
sqoop doc  http://sqoop.apache.org/docs/1.99.7/index.html
sqoop code https://github.com/apache/sqoop


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。