技术实践-HBase增量数据迁移的方法
概览
● 本文主要是想谈一下如何给HBase做增量数据的迁移,也就是迁移实时数据。在之前的博文HBase实用技巧:一种全量+增量数据的迁移方法-云社区-华为云 (huaweicloud.com)中提到HBase增量数据迁移可以使用Replication的方式去做,但是在实际搬迁时,要给原集群设置Replication可能需要重启,这样会影响业务,我们需要做到不停机迁移才行。
WAL原理
正常情况下,HBase新增的数据都是有日志记录的,数据在落盘成HFile之前,任何一个Put和Delete操作都是记录日志并存放在WALs目录中,日志中包含了所有已经写入Memstore但还未Flush到HFile的更改(edits)。
默认情况下每个RegionServer只会写一个日志文件,该RS管理的所有region都在向这一个日志文件写入Put和Delete记录,直到日志文件大小达到128MB(由hbase.regionserver.hlog.blocksize设置)后roll出一个新的日志文件,总共可以roll出32个日志文件(由hbase.regionserver.maxlogs设置)。
如果日志文件未写满128MB,RegionServer间隔1小时也会roll出新一个新日志文件(由hbase.regionserver.logroll.period设置)。
当日志文件中涉及的所有region的记录都flush成HFile后,这个日志文件就会转移至oldWals目录下归档, Master没间隔10分钟(hbase.master.cleaner.interval)会检查oldWALs目录下的过期日志文件,当文件过期时会被Master清理掉,(日志过期时间由hbase.master.logcleaner.ttl控制)。
RegionServer默认间隔1小时(由hbase.regionserver.optionalcacheflushinterval设置)会对它管理的region做一次flush动作,所以WALs目录中一直会有新的日志文件生成,并伴随着老的日志文件移动到oldWALs目录中。
迁移方式
一、迁移oldWALs目录中的文件,使用WALPlayer回放
由于日志文件文件最终移动到oldWALs目录下,只需要写个脚本,定时检查oldWALs目录下是否有新文件生成,如果有文件,则move至其他目录,并使用WALPlayer工具对这个目录进行回放。
优点:无代码开发量,仅需脚本实现
缺点:无法做到实时,因为从数据写入到最后到达oldWAL目录会间隔很长时间。
二、开发独立工具,解析日志文件,写入目的集群
在网上查找迁移方法的时候了解到了阿里开发了一个专门的HBase迁移工具,可以实现不停机。通过阅读其设计BDS - HBase数据迁移同步方案的设计与实践了解到阿里开发了应用去读取HBase的WAL日志文件并回放数据至目的集群。
优点:可以做到实时;
缺点:需要一定的代码开发量;
要做出这样一个工具,需要了解上面说的WAL文件归档的原理以及日志回放工具WALPlayer,下面简单说一下可以怎么去实现。
独立工具实现
这里简单说明下如何去做这样一个工具,只介绍读取WAL方面,任务编排就不描述了
-
定时扫描WALs目录获取所有的日志文件,这里按ServerName去分组获取,每个分组内根据WAL文件上的时间戳排序;
● 获取所有RS的ServerName
ClusterStatus clusterStatus = admin.getClusterStatus(); Collection<ServerName> serverNames = clusterStatus.getServers();
● 根据ServerName去组成Path获取日志
Path rsWalPath = new Path(walPath, serverName.getServerName()); List<FileStatus> hlogs = getFiles(fs, rsWalPath, Long.MIN_VALUE, Long.MAX_VALUE);
● getFiles()参考HBase源码中WALInputFormat.java中的实现,可以指定时间范围去取日志文件
private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) throws IOException { List<FileStatus> result = new ArrayList<FileStatus>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); FileStatus[] files = fs.listStatus(dir); if (files == null) return Collections.emptyList(); for (FileStatus file : files) { if (file.isDirectory()) { // recurse into sub directories result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); } else { String name = file.getPath().toString(); int idx = name.lastIndexOf('.'); if (idx > 0) { try { long fileStartTime = Long.parseLong(name.substring(idx+1)); if (fileStartTime <= endTime) { LOG.info("Found: " + name); result.add(file); } } catch (NumberFormatException x) { idx = 0; } } if (idx == 0) { LOG.warn("File " + name + " does not appear to be an WAL file. Skipping..."); } } } return result; }
-
对于取到的每一个WAL文件,当做一个任务Task执行迁移,这个task主要有下面一些步骤
● 使用WALFactory为每个日志文件创建一个Reader
WAL.Reader walReader = WALFactory.createReader(fileSystem, curWalPath, conf);
● 通过Reader去读取key和edit,并记录下position,为了加快写入速度,这里可以优化为读取多个entry
WAL.Entry entry = walReader.next(); WALKey walKey = entry.getKey(); WALEdit walEdit = entry.getEdit(); long curPos = reader.getPosition();
● 记录position的目的是为了Reader的reset以及seek,因为这个原始WAL文件还正在写入的时候,我们的Reader速度很可能大于原WAL的写入速度,当Reader读到底的时候,需要等待一段时间reset然后再重新读取entry
WAL.Entry nextEntry = reader.next(); if (nextEntry == null) { LOG.info("Next entry is null, sleep 10000ms."); Thread.sleep(5000); curPos = reader.getPosition(); reader.reset(); reader.seek(curPos); continue; }
● 在读取WAL的过程中很可能会遇到日志转移到oldWALs目录下,这个时候捕获到FileNotFoundException时,需要重新生成一个oldWALs目录下Reader,然后设置curPos继续读取文件,这个时候如果再次读取到文件最后的时候,就可以关闭Reader了,因为oldWALs中的日志文件是固定大小的,不会再有追加数据。
这里需要注意的是这个参数hbase.master.logcleaner.ttl不能设置过小,否则会出现这个在oldWALs目录下的日志文件还没读取完被清理掉了。
Path oldWALPath = new Path(oldWalPath, walFileName); WAL.Reader reader = WALFactory.createReader(fileSystem, oldWALPath, conf); reader.seek(curPos)
● 根据通过WAL.Reader可以读取到walKey,walEdit进而解析出Cell并写入目的集群,这个可以参考WALPlay的map()方法
- 点赞
- 收藏
- 关注作者
评论(0)