ClickHouse源码分析:逻辑计划和Pipeline

举报
ZhjDayDayUp 发表于 2021/12/06 22:25:30 2021/12/06
【摘要】 基于clickhouse的v21.3.4.35-lts版本,分析了sql执行的逻辑计划树和Pipeline的生成和执行过程。

基于clickhouse的版本:v21.3.4.35-lts

# 以TCP连接为例 select id from query_cache;
TCPHandler::runImpl
    BlockIO executeQuery // 解析query
        executeQueryImpl
            ast = parseQuery // 解析生成ast
                ...
            interpreter = InterpreterFactory::get // 根据ast实例化interpreter
            interpreter->execute() // InterpreterSelectWithUnionQuery::execute
                buildQueryPlan // 构建query paln 如 InterpreterSelectQuery::buildQueryPlan
                    InterpreterSelectQuery::executeImpl
                        executeFetchColumns
                            storage->read // StorageMergeTree::read --> MergeTreeDataSelectExecutor::read
                                StorageMergeTree::read
                                    MergeTreeDataSelectExecutor::read
                                        MergeTreeDataSelectExecutor::readFromParts // 根据一定条件选择需要读取的part,这里我们过滤掉如何选择的流程,直接去看怎么构建query plan
                                            spreadMarkRangesAmongStreams // 构建queryplan
                                                MergeTreeThreadSelectBlockInputProcessor
                                                unitePipes // 多个pipe绑定为一个pipe
                                                createPlanFromPipe // step = std::make_unique<ReadFromStorageStep>; plan->addStep(step); processor为MergeTreeThreadSelectBlockInputProcessor
                            query_plan.addStep // 增加step SettingQuotaAndLimits
                        executeExpression // 增加step ExpressionStep
                        executeProjection // 增加step ExpressionStep
                buildQueryPipeline // 构建pipeline
                    updatePipeline // ITransformingStep::updatePipeline-->ExpressionStep::transformPipeline-->addSimpleTransform-->collected_processors指向的processors和pipe.processors都增加processor; 即ExpressionTransform
            pipeline.addSimpleTransform // 增加processor LimitsCheckingTransform
    processOrdinaryQueryWithProcessors
        sendData(header); // 先向client发送header
        PullingAsyncPipelineExecutor executor(pipeline); // 构建一个执行器。 make_shared<LazyOutputFormat>
        PullingAsyncPipelineExecutor::pull
            threadFunction
                PipelineExecutor::execute
                    PipelineExecutor::executeImpl
                        initializeExecution
                            prepareProcessor
                                processor->prepare // 执行pushdata和pulldata操作
                        executeSingleThread
                            executeStepImpl // 根据线程号,弹出任务
                                addJob // 初始化node->job
                                node->job()
                                    executeJob
                                        processor->work() // 各个processor执行
                                            ...
    state.io.onFinish() // log和状态记录

类图如下

逻辑计划和Pipeline.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。