技术实践-HBase增量数据迁移的方法

举报
Lettle whale 发表于 2021/04/20 16:06:33 2021/04/20
【摘要】 简单说一下HBase数据过程中的增量迁移

概览

● 本文主要是想谈一下如何给HBase做增量数据的迁移,也就是迁移实时数据。在之前的博文HBase实用技巧:一种全量+增量数据的迁移方法-云社区-华为云 (huaweicloud.com)中提到HBase增量数据迁移可以使用Replication的方式去做,但是在实际搬迁时,要给原集群设置Replication可能需要重启,这样会影响业务,我们需要做到不停机迁移才行。

WAL原理

正常情况下,HBase新增的数据都是有日志记录的,数据在落盘成HFile之前,任何一个Put和Delete操作都是记录日志并存放在WALs目录中,日志中包含了所有已经写入Memstore但还未Flush到HFile的更改(edits)。

默认情况下每个RegionServer只会写一个日志文件,该RS管理的所有region都在向这一个日志文件写入Put和Delete记录,直到日志文件大小达到128MB(由hbase.regionserver.hlog.blocksize设置)后roll出一个新的日志文件,总共可以roll出32个日志文件(由hbase.regionserver.maxlogs设置)。

如果日志文件未写满128MB,RegionServer间隔1小时也会roll出新一个新日志文件(由hbase.regionserver.logroll.period设置)。

当日志文件中涉及的所有region的记录都flush成HFile后,这个日志文件就会转移至oldWals目录下归档, Master没间隔10分钟(hbase.master.cleaner.interval)会检查oldWALs目录下的过期日志文件,当文件过期时会被Master清理掉,(日志过期时间由hbase.master.logcleaner.ttl控制)。

RegionServer默认间隔1小时(由hbase.regionserver.optionalcacheflushinterval设置)会对它管理的region做一次flush动作,所以WALs目录中一直会有新的日志文件生成,并伴随着老的日志文件移动到oldWALs目录中。

迁移方式

一、迁移oldWALs目录中的文件,使用WALPlayer回放

由于日志文件文件最终移动到oldWALs目录下,只需要写个脚本,定时检查oldWALs目录下是否有新文件生成,如果有文件,则move至其他目录,并使用WALPlayer工具对这个目录进行回放。

优点:无代码开发量,仅需脚本实现

缺点:无法做到实时,因为从数据写入到最后到达oldWAL目录会间隔很长时间。

二、开发独立工具,解析日志文件,写入目的集群

在网上查找迁移方法的时候了解到了阿里开发了一个专门的HBase迁移工具,可以实现不停机。通过阅读其设计BDS - HBase数据迁移同步方案的设计与实践了解到阿里开发了应用去读取HBase的WAL日志文件并回放数据至目的集群。

优点:可以做到实时;

缺点:需要一定的代码开发量;

要做出这样一个工具,需要了解上面说的WAL文件归档的原理以及日志回放工具WALPlayer,下面简单说一下可以怎么去实现。

独立工具实现

这里简单说明下如何去做这样一个工具,只介绍读取WAL方面,任务编排就不描述了

  1. 定时扫描WALs目录获取所有的日志文件,这里按ServerName去分组获取,每个分组内根据WAL文件上的时间戳排序;

    ● 获取所有RS的ServerName

    ClusterStatus clusterStatus = admin.getClusterStatus();
    Collection<ServerName> serverNames = clusterStatus.getServers();
    

    ● 根据ServerName去组成Path获取日志

    Path rsWalPath = new Path(walPath, serverName.getServerName());
    List<FileStatus> hlogs = getFiles(fs, rsWalPath, Long.MIN_VALUE, Long.MAX_VALUE);
    

    ● getFiles()参考HBase源码中WALInputFormat.java中的实现,可以指定时间范围去取日志文件

    private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
        throws IOException {
      List<FileStatus> result = new ArrayList<FileStatus>();
      LOG.debug("Scanning " + dir.toString() + " for WAL files");
    
      FileStatus[] files = fs.listStatus(dir);
      if (files == null) return Collections.emptyList();
      for (FileStatus file : files) {
        if (file.isDirectory()) {
          // recurse into sub directories
          result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
        } else {
          String name = file.getPath().toString();
          int idx = name.lastIndexOf('.');
          if (idx > 0) {
            try {
              long fileStartTime = Long.parseLong(name.substring(idx+1));
              if (fileStartTime <= endTime) {
                LOG.info("Found: " + name);
                result.add(file);
              }
            } catch (NumberFormatException x) {
              idx = 0;
            }
          }
          if (idx == 0) {
            LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
          }
        }
      }
      return result;
    }
    
  2. 对于取到的每一个WAL文件,当做一个任务Task执行迁移,这个task主要有下面一些步骤

    ● 使用WALFactory为每个日志文件创建一个Reader

    WAL.Reader walReader = WALFactory.createReader(fileSystem, curWalPath, conf)

    ● 通过Reader去读取key和edit,并记录下position,为了加快写入速度,这里可以优化为读取多个entry

    WAL.Entry entry = walReader.next();
    WALKey walKey = entry.getKey();
    WALEdit walEdit = entry.getEdit();
    long curPos = reader.getPosition();
    

    ● 记录position的目的是为了Reader的reset以及seek,因为这个原始WAL文件还正在写入的时候,我们的Reader速度很可能大于原WAL的写入速度,当Reader读到底的时候,需要等待一段时间reset然后再重新读取entry

    WAL.Entry nextEntry = reader.next();
    if (nextEntry == null) {
        LOG.info("Next entry is null, sleep 10000ms.");
        Thread.sleep(5000);
        curPos = reader.getPosition();
        reader.reset();
        reader.seek(curPos);
        continue;
    }
    

    ● 在读取WAL的过程中很可能会遇到日志转移到oldWALs目录下,这个时候捕获到FileNotFoundException时,需要重新生成一个oldWALs目录下Reader,然后设置curPos继续读取文件,这个时候如果再次读取到文件最后的时候,就可以关闭Reader了,因为oldWALs中的日志文件是固定大小的,不会再有追加数据。

    这里需要注意的是这个参数hbase.master.logcleaner.ttl不能设置过小,否则会出现这个在oldWALs目录下的日志文件还没读取完被清理掉了。

    Path oldWALPath = new Path(oldWalPath, walFileName);
    WAL.Reader reader = WALFactory.createReader(fileSystem, oldWALPath, conf);
    reader.seek(curPos)
    

    ● 根据通过WAL.Reader可以读取到walKey,walEdit进而解析出Cell并写入目的集群,这个可以参考WALPlay的map()方法

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。