ClickHouse源码分析:物化视图流程分析

进击的阿张 发表于 2021/12/20 21:30:45 2021/12/20
【摘要】 主要分析物化视图的建立和数据插入流程,其中涉及到的更底层的数据读写流程,需要单独再仔细分析下。本次分析基于v21.3.4.25-lts版本代码。

主要分析物化视图的建立和数据插入流程,其中涉及到的更底层的数据读写流程,需要单独再仔细分析下。本次分析基于v21.3.4.25-lts版本代码。
以下面的例子为例。


CREATE TABLE local_table
(
    did UInt64,
    uid UInt64 DEFAULT (rand(1)%2)*did,
    device_type String,
    oper_type UInt8,
    when DateTime,
    duration Float32,
    pt_d Date DEFAULT toDate(when)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/local_table', '{replica}')
PARTITION BY toYYYYMM(pt_d)
ORDER BY (uid, device_type, oper_type, pt_d);


CREATE MATERIALIZED VIEW view_table
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/view_table', '{replica}')
PARTITION BY toYYYYMM(pt_d)
ORDER BY did
AS SELECT
    device_type,
    oper_type,
    pt_d,
    did,
    countState(did) AS did_count_state,
    uniqExactState(did) AS did_uniqExact_state,
    countState(uid) AS uid_count_state,
    uniqExactState(uid) AS uid_uniqExact_state,
    avgState(duration) AS duration_avg_state,
    sumState(duration) AS duration_sum_state
FROM local_table
GROUP BY pt_d, device_type, oper_type, did;

insert into local_table values(1,2,'1',1,now(),1,toDate(now()))

物化视图类和对应引擎的注册

Server::main
    registerStorages
        registerStorageMergeTree
            name = "ReplicatedAggregatingMergeTree"
            Creator.creator_fn = StoragePtr create(args) // StorageReplicatedMergeTree::create
        registerStorageMaterializedView
            name = "MaterializedView"
            Creator.creator_fn = [](const StorageFactory::Arguments & args) { return StorageMaterializedView::create(args) }

建表流程

InterpreterCreateQuery::execute
    InterpreterCreateQuery::createTable
        setProperties // 设置属性,如:列信息columns_list
        InterpreterCreateQuery::doCreateTable
            StorageFactory::instance().get // 第一次进来,根据query确定引擎为MaterializedView。 根据registerStorageMaterializedView中注册的函数创建IStorage
                StorageMaterializedView::create(args) // shared_ptr_helper的create
                    StorageMaterializedView::StorageMaterializedView
                        getSelectQueryFromASTForMatView // 提取inner_query, select依赖的table id
                        std::make_shared<ASTCreateQuery>() // 构建创建inner table的ASTCreateQuery,主要是storage和columns
                        InterpreterCreateQuery::execute // 创建inner table
                            createTable
                                doCreateTable
                                    StorageFactory::instance().get
                                        StorageReplicatedMergeTree::create 
                                            StorageReplicatedMergeTree::StorageReplicatedMergeTree
                                    DatabaseOnDisk::createTable
                                        getObjectDefinitionFromCreateQuery // 提取元数据内容
                                        out.next() // 元数据保存到文件中(临时文件)
                                        commitCreateTable // 添加映射(uuid和表的映射)及重命名文件
                                    StorageReplicatedMergeTree::startup // 启动一些后台线程
                        target_table_id = inner table/TO表 的storageid
                        DatabaseCatalog::addDependency // <select中的table, 物化视图>的映射关系。至此,原始表 --> 物化视图 --> inner table/ TO 表
            DatabaseOnDisk::createTable
                ...
            StorageMaterializedView::startup // IStorage::startup空函数,啥也不做
        InterpreterCreateQuery::fillTableIfNeeded // 建立物化视图时加了populate,则需要插入数据

数据插入流程

TCPHandler::runImpl
    executeQuery // executeQueryImpl,总结来说就是构造了IBlockOutputStream,数据流向为:
        ast = parseQuery
        interpreter = InterpreterFactory::get(ast) // 实例化InterpreterInsertQuery
            InterpreterInsertQuery::InterpreterInsertQuery
        InterpreterInsertQuery::execute // 执行外层的insert
            BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>
                // 遍历原始表关联的物化视图 begin
                    insert = std::make_unique<ASTInsertQuery> // 构造insert,需要将数据插入到inner table
                    InterpreterSelectQuery // 根据inner query构造,获取as select的列,用于insert
                    InterpreterInsertQuery::execute // 向inner table插入数据
                        BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>
                            // 无关联的物化视图,不用再继续嵌套
                            output = storage->write // StorageReplicatedMergeTree::write,inner 表的write
                                std::make_shared<ReplicatedMergeTreeBlockOutputStream> // ReplicatedMergeTreeBlockOutputStream构造
                            replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get())
                        out = std::make_shared<AddingDefaultBlockOutputStream>(out, ...)
                        out_wrapper = std::make_shared<CountingBlockOutputStream>(out)
                // 遍历原始表关联的物化视图 end
                output = storage->write // StorageReplicatedMergeTree::write,原始表的write   
                replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get())
            out = std::make_shared<AddingDefaultBlockOutputStream>(out, ...)
            out_wrapper = std::make_shared<CountingBlockOutputStream>(out)
    
    processInsertQuery // 真正开始插入数据
        state.io.out->writePrefix() // 根据前面构造的OutputStream层层调用,最终通过MergeTreeData::delayInsertOrThrowIfNeeded校验是否可以insert
        sendData // 向客户端发送表结构
        readData // 真正接收数据
            readDataNext // 循环读取
                receivePacket // 类型为Protocol::Client::Data
                    receiveData
                        initBlockInput // state.block_in = NativeBlockInputStream  state.maybe_compressed_in = CompressedReadBuffer
                        state.block_in->read() // NativeBlockInputStream::readImpl  从socket中读取数据
                            readData // 反序列化数据
                        state.io.out->write // 
                            ...
                                PushingToViewsBlockOutputStream::write
                                    ReplicatedMergeTreeBlockOutputStream::write // 在原始表中写入
                                    // 遍历关联的物化视图begin
                                        PushingToViewsBlockOutputStream::process // 先select读取数据,再将数据写入到inner表
                                            result_block = in->read() // 读取原始表中的数据
                                                ...
                                                    PipelineExecutingBlockInputStream::readImpl
                                            view.out->write(result_block)
                                                ...
                                                    ReplicatedMergeTreeBlockOutputStream::write // 写入到inner表
                                    // 遍历关联的物化视图end
        state.io.out->writeSuffix()

其他

在InterpreterInsertQuery::execute流程中,是构造OutputStream呢?

// 第一次进InterpreterInsertQuery::execute
PushingToViewsBlockOutputStream
out = 第二次进的结果,即CountingBlockOutputStream
views.emplace_back(out)
output=ReplicatedMergeTreeBlockOutputStream
replicated_output=ReplicatedMergeTreeBlockOutputStream

AddingDefaultBlockOutputStream
output=PushingToViewsBlockOutputStream

CountingBlockOutputStream
stream=AddingDefaultBlockOutputStream

输出CountingBlockOutputStream


// 第二次进InterpreterInsertQuery::execute
PushingToViewsBlockOutputStream
output=ReplicatedMergeTreeBlockOutputStream
replicated_output=ReplicatedMergeTreeBlockOutputStream

AddingDefaultBlockOutputStream
output=PushingToViewsBlockOutputStream

CountingBlockOutputStream
stream=AddingDefaultBlockOutputStream

输出CountingBlockOutputStream

// 最终的OutputStream:
out=CountingBlockOutputStream --> AddingDefaultBlockOutputStream --> PushingToViewsBlockOutputStream
                                                                                                    .views --> CountingBlockOutputStream --> AddingDefaultBlockOutputStream --> PushingToViewsBlockOutputStream  --> .output=ReplicatedMergeTreeBlockOutputStream
                                                                                                    .output --> ReplicatedMergeTreeBlockOutputStream

向inner表插入数据时,构造InputStream

MaterializingBlockInputStream
children = select->execute().getInputStream() // PipelineExecutingBlockInputStream

SquashingBlockInputStream
children = MaterializingBlockInputStream

in = ConvertingBlockInputStream
children = SquashingBlockInputStream

最终的IntputStream:
in=ConvertingBlockInputStream --> SquashingBlockInputStream --> MaterializingBlockInputStream --> PipelineExecutingBlockInputStream

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。