attach partition from and move partition to
During data migration we often use these two ClickHouse DDL statements (ATTACH PARTITION ... FROM and MOVE PARTITION ... TO TABLE) to move data from a source table into a destination table. Their entry points in the code are StorageReplicatedMergeTree::replacePartitionFrom and StorageReplicatedMergeTree::movePartitionToTable respectively.
The code-flow analysis below takes StorageReplicatedMergeTree::replacePartitionFrom as the example. Inside the function, all parts under the given partition are looked up by partition_id; the function then iterates over them, reads each part's checksums and derives a hash from them. That hash ultimately becomes part of a lock node path on ZooKeeper, for example: /clickhouse/tables/1/DTDB/lineorder_2/blocks/202101_replace_from_1C9BA91501B16CFEC92DE5381CCB27E1
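Judging from the example path, the block id used as the lock node appears to be composed from the partition id and the part's total checksum hex. A minimal sketch of that composition (values are copied from the example above, not taken from the actual source):

#include <iostream>
#include <string>

int main()
{
    // Illustrative values copied from the example lock path above.
    std::string zookeeper_path = "/clickhouse/tables/1/DTDB/lineorder_2";   // table's ZooKeeper root
    std::string partition_id   = "202101";                                  // partition being attached
    std::string hash_hex       = "1C9BA91501B16CFEC92DE5381CCB27E1";        // src_part->checksums.getTotalChecksumHex()

    // Block id used to deduplicate repeated ATTACH PARTITION FROM runs.
    std::string block_id      = partition_id + "_replace_from_" + hash_hex;
    std::string block_id_path = zookeeper_path + "/blocks/" + block_id;

    std::cout << block_id_path << std::endl;   // reproduces the example path
    return 0;
}

The cloning loop inside replacePartitionFrom looks like this: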
/// Clone parts into destination table.
for (const auto & src_part : src_all_parts)
{
    if (!dest_table_storage->canReplacePartition(src_part))
        throw Exception(
            "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
            ErrorCodes::LOGICAL_ERROR);

    String hash_hex = src_part->checksums.getTotalChecksumHex();
    String block_id_path;

    auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
    if (!lock)
    {
        LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
        continue;
    }

    UInt64 index = lock->getNumber();
    MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
    auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);

    src_parts.emplace_back(src_part);
    dst_parts.emplace_back(dst_part);
    ephemeral_locks.emplace_back(std::move(*lock));
    block_id_paths.emplace_back(block_id_path);
    part_checksums.emplace_back(hash_hex);
}
allocateBlockNumber creates the corresponding hash node on ZooKeeper and uses it to filter out duplicate parts during ATTACH PARTITION: if ATTACH PARTITION is run a second time and ClickHouse finds that allocateBlockNumber cannot create the node on ZooKeeper, it concludes that this part has already been attached, prints an info log, and skips copying that part. (The flow should be the same for MOVE PARTITION, except that the lock path on ZooKeeper uses the tmp_move_from_ prefix instead.)
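A simplified, standalone sketch of this dedup behaviour, with an in-memory set standing in for the /blocks directory on ZooKeeper (in the real code the check is the atomic creation of the block-id node inside allocateBlockNumber; everything below is illustrative only):

#include <iostream>
#include <optional>
#include <set>
#include <string>

std::set<std::string> zk_blocks;   // stand-in for <table>/blocks on ZooKeeper

// Returns a block number if the block id was newly registered,
// std::nullopt if the id already exists (the part was attached before).
std::optional<int> try_allocate_block(const std::string & block_id)
{
    static int next_number = 0;
    if (!zk_blocks.insert(block_id).second)
        return std::nullopt;       // "node" already exists -> skip this part
    return next_number++;          // "node" created -> proceed with the clone
}

int main()
{
    std::string block_id = "202101_replace_from_1C9BA91501B16CFEC92DE5381CCB27E1";

    for (int attempt = 1; attempt <= 2; ++attempt)
    {
        if (auto number = try_allocate_block(block_id))
            std::cout << "attempt " << attempt << ": cloned with block number " << *number << "\n";
        else
            std::cout << "attempt " << attempt << ": already attached, skipping\n";
    }
    return 0;
}

Running the sketch, the second attempt with the same block id prints "already attached, skipping", which mirrors the info log in the loop above.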
On the node where the ATTACH PARTITION is executed, the data migration is done directly through cloneAndLoadDataPartOnSameDisk. This is very efficient: data at the scale of hundreds of billions of rows can be migrated in roughly five minutes. After the local node finishes the migration, it also commits a log entry to ZooKeeper so that the other replicas can complete the migration as well. The entry function executed on the replicas is StorageReplicatedMergeTree::executeReplaceRange:
static const String TMP_PREFIX = "tmp_replace_from_";

auto obtain_part = [&] (PartDescriptionPtr & part_desc)
{
    if (part_desc->src_table_part)
    {
        if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
            throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);

        part_desc->res_part = cloneAndLoadDataPartOnSameDisk(
            part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot);
    }
    else if (!part_desc->replica.empty())
    {
        String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
        ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
        auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
        auto [user, password] = global_context.getInterserverCredentials();
        String interserver_scheme = global_context.getInterserverScheme();

        if (interserver_scheme != address.scheme)
            throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);

        String detached_path;
        part_desc->res_part = fetcher.fetchPart(
            metadata_snapshot, part_desc->found_new_part_name, source_replica_path, detached_path,
            address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");

        /// TODO: check columns_version of fetched part
        ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
    }
    else
        throw Exception("There is no receipt to produce part " + part_desc->new_part_name + ". This is bug", ErrorCodes::LOGICAL_ERROR);
};
StorageReplicatedMergeTree::executeReplaceRange first checks whether the corresponding part exists locally in the source table; if it does, the part is copied locally, otherwise it is fetched from another replica. ATTACH PARTITION and MOVE PARTITION can therefore be used for incremental data synchronization, but if the source table keeps receiving inserts and running merges, a second ATTACH PARTITION may increase the amount of data in the destination table: the dedup key is the per-part total checksum, so when a merge rewrites already-attached rows into a new part, that part carries a new checksum, is not filtered out, and its rows are attached again.
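To make this concrete, here is a toy model (not ClickHouse code; the names and the checksum function are made up) of why a second ATTACH PARTITION after a source-side merge duplicates rows in the destination:

#include <functional>
#include <iostream>
#include <set>
#include <string>
#include <vector>

struct Part { std::string name; long rows; std::string checksum; };

// Stand-in for getTotalChecksumHex(): any merge that rewrites the rows yields a new value.
std::string fake_checksum(const std::string & contents)
{
    return std::to_string(std::hash<std::string>{}(contents));
}

int main()
{
    std::set<std::string> blocks;   // stand-in for <dst table>/blocks on ZooKeeper
    long dst_rows = 0;

    auto attach = [&](const std::vector<Part> & src_parts)
    {
        for (const auto & p : src_parts)
            if (blocks.insert("202101_replace_from_" + p.checksum).second)
                dst_rows += p.rows;   // checksum not seen before -> part is cloned again
    };

    Part a{"202101_1_1_0", 100, fake_checksum("rows 1..100")};
    attach({a});                                                     // first ATTACH: 100 rows

    Part b{"202101_2_2_0", 50, fake_checksum("rows 101..150")};      // new inserts on the source
    Part merged{"202101_1_2_1", 150, fake_checksum("rows 1..150")};  // background merge of a + b

    attach({merged});                                                // second ATTACH after the merge
    std::cout << "destination rows: " << dst_rows << " (source has 150)\n";   // prints 250
    return 0;
}

The first attach copies 100 rows; after new inserts and a merge on the source, the merged part has a new checksum, so the second attach copies all 150 rows again and the destination ends up with 250 rows.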