attach partition from 和 move partition to

陈方业 发表于 2021/10/15 17:03:21 2021/10/15
【摘要】 attach partition from 和 move partition to在数据迁移过程中,我们常常使用到clickhouse的这两种DDL将源表的数据迁移到目标表,这两块代码入口函数分别是:StorageReplicatedMergeTree::replacePartitionFrom和StorageReplicatedMergeTree::movePartitionToTabl...

attach partition from 和 move partition to

在数据迁移过程中,我们常常使用到clickhouse的这两种DDL将源表的数据迁移到目标表,这两块代码入口函数分别是:StorageReplicatedMergeTree::replacePartitionFrom和StorageReplicatedMergeTree::movePartitionToTable

代码流程分析主要以StorageReplicatedMergeTree::replacePartitionFrom为例,函数内根据partition_id获取到相应partition下所有的part,遍历获取到part的checksum,生成对应的hash值(hash值最终会作为zookeeper上的一个lock目录,例如:/clickhouse/tables/1/DTDB/lineorder_2/blocks/202101_replace_from_1C9BA91501B16CFEC92DE5381CCB27E1)

    /// Clone parts into destination table.

    for (const auto & src_part : src_all_parts)
    {
        if (!dest_table_storage->canReplacePartition(src_part))
            throw Exception(
                "Cannot move partition '" + partition_id + "' because part '" + src_part->name + "' has inconsistent granularity with table",
                ErrorCodes::LOGICAL_ERROR);

        String hash_hex = src_part->checksums.getTotalChecksumHex();
        String block_id_path;

        auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
        if (!lock)
        {
            LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex);
            continue;
        }

        UInt64 index = lock->getNumber();
        MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
        auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);

        src_parts.emplace_back(src_part);
        dst_parts.emplace_back(dst_part);
        ephemeral_locks.emplace_back(std::move(*lock));
        block_id_paths.emplace_back(block_id_path);
        part_checksums.emplace_back(hash_hex);
    }

allocateBlockNumber往zookeeper上创建对应的hash目录,并以此来过滤attach partition过程中重复的part,也即,如果第二次attach partition的时候,clickhouse发现zookeeper上无法allocateBlockNumber,则认为该part之前已经执行过attach,打印info日志后退出该part的拷贝(这个流程在move partition中应该是一致的,只是zookeeper上的lock路径改成了tmp_move_from_)。

同节点的attach partition直接执行cloneAndLoadDataPartOnSameDisk完成数据迁移,这个过程效率很高,千亿级数据在5分钟的级别完成。本节点完成数据迁移后也会往zookeeper上提交一个log,让其他副本也能完成迁移过程。副本执行入口函数:StorageReplicatedMergeTree::executeReplaceRange

    static const String TMP_PREFIX = "tmp_replace_from_";

    auto obtain_part = [&] (PartDescriptionPtr & part_desc)
    {
        if (part_desc->src_table_part)
        {

            if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
                throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);

            part_desc->res_part = cloneAndLoadDataPartOnSameDisk(
                part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot);
        }
        else if (!part_desc->replica.empty())
        {
            String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
            ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
            auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
            auto [user, password] = global_context.getInterserverCredentials();
            String interserver_scheme = global_context.getInterserverScheme();

            if (interserver_scheme != address.scheme)
                throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);

            String detached_path;
            part_desc->res_part = fetcher.fetchPart(
                metadata_snapshot, part_desc->found_new_part_name, source_replica_path, detached_path,
                address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");

            /// TODO: check columns_version of fetched part

            ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
        }
        else
            throw Exception("There is no receipt to produce part " + part_desc->new_part_name + ". This is bug", ErrorCodes::LOGICAL_ERROR);
    };

StorageReplicatedMergeTree::executeReplaceRange首先会检测源表中是否有相应的part,如果有这本地copy,如果没有则从其他副本fetch。attach partition和move partition的行为可以进行增量数据的同步,但是如果源表有不断insert和merge过程,在第二次attach partition的时候可能会导致目标表的数据增加。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。