ClickHouse Source Code Analysis: The Materialized View Flow
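[Abstract] This post analyzes how a materialized view is created and how data is inserted through it. The lower-level data read/write paths involved deserve a separate, closer look. The analysis is based on the v21.3.4.25-lts code.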
The following statements serve as the running example.
CREATE TABLE local_table
(
    did UInt64,
    uid UInt64 DEFAULT (rand(1) % 2) * did,
    device_type String,
    oper_type UInt8,
    when DateTime,
    duration Float32,
    pt_d Date DEFAULT toDate(when)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/local_table', '{replica}')
PARTITION BY toYYYYMM(pt_d)
ORDER BY (uid, device_type, oper_type, pt_d);

CREATE MATERIALIZED VIEW view_table
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/view_table', '{replica}')
PARTITION BY toYYYYMM(pt_d)
-- The sorting key covers every GROUP BY column: with ORDER BY did alone, background merges
-- would collapse rows that differ only in pt_d, device_type or oper_type.
ORDER BY (pt_d, device_type, oper_type, did)
AS SELECT
    device_type,
    oper_type,
    pt_d,
    did,
    countState(did) AS did_count_state,
    uniqExactState(did) AS did_uniqExact_state,
    countState(uid) AS uid_count_state,
    uniqExactState(uid) AS uid_uniqExact_state,
    avgState(duration) AS duration_avg_state,
    sumState(duration) AS duration_sum_state
FROM local_table
GROUP BY pt_d, device_type, oper_type, did;

insert into local_table values (1, 2, '1', 1, now(), 1, toDate(now()));
Registering the materialized view class and its engine
Server::main
  registerStorages
    registerStorageMergeTree
      name = "ReplicatedAggregatingMergeTree"
      Creator.creator_fn = StoragePtr create(args)    // StorageReplicatedMergeTree::create
    registerStorageMaterializedView
      name = "MaterializedView"
      Creator.creator_fn = [](const StorageFactory::Arguments & args) { return StorageMaterializedView::create(args); }
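The registration above is essentially a name-to-creator map that StorageFactory::get consults later, when it meets an ENGINE clause. The C++ sketch below illustrates only that pattern; MiniStorageFactory, Arguments, registerAll and the Fake* storage classes are hypothetical simplifications, not ClickHouse's real types or signatures.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins: the real IStorage and StorageFactory::Arguments are far richer.
struct IStorage {
    virtual ~IStorage() = default;
    virtual std::string getName() const = 0;
};
using StoragePtr = std::shared_ptr<IStorage>;

struct Arguments {
    std::string engine_name;  // engine name taken from the CREATE query
};

struct FakeMaterializedView : IStorage {
    std::string getName() const override { return "MaterializedView"; }
};
struct FakeReplicatedMergeTree : IStorage {
    std::string getName() const override { return "ReplicatedMergeTree"; }
};

// Name -> creator_fn registry, filled once at server start-up.
class MiniStorageFactory {
public:
    using CreatorFn = std::function<StoragePtr(const Arguments &)>;

    void registerStorage(const std::string & name, CreatorFn creator_fn) {
        assert(creator_fn);
        if (!creators.emplace(name, std::move(creator_fn)).second)
            throw std::logic_error("engine already registered: " + name);
    }

    // Looks up the engine named in the query and calls its creator.
    StoragePtr get(const Arguments & args) const {
        auto it = creators.find(args.engine_name);
        if (it == creators.end())
            throw std::runtime_error("unknown engine: " + args.engine_name);
        return it->second(args);
    }

private:
    std::map<std::string, CreatorFn> creators;
};

// Analogue of registerStorages(): several engine names may share one creator.
inline void registerAll(MiniStorageFactory & factory) {
    auto merge_tree_creator = [](const Arguments &) -> StoragePtr {
        return std::make_shared<FakeReplicatedMergeTree>();
    };
    factory.registerStorage("ReplicatedMergeTree", merge_tree_creator);
    factory.registerStorage("ReplicatedAggregatingMergeTree", merge_tree_creator);
    factory.registerStorage("MaterializedView", [](const Arguments &) -> StoragePtr {
        return std::make_shared<FakeMaterializedView>();
    });
}
```

In this sketch both replicated engine names resolve to the same storage class; the aggregating variant would then be selected by the creator's arguments, which are omitted here.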
Table creation flow
InterpreterCreateQuery::execute
  InterpreterCreateQuery::createTable
    setProperties    // set table properties, e.g. the column list (columns_list)
    InterpreterCreateQuery::doCreateTable
      StorageFactory::instance().get    // first pass: the query names the MaterializedView engine, so the IStorage is built by the function registered in registerStorageMaterializedView
        StorageMaterializedView::create(args)    // the create() provided by shared_ptr_helper
          StorageMaterializedView::StorageMaterializedView
            getSelectQueryFromASTForMatView    // extract inner_query and the id of the table the SELECT reads from
            std::make_shared<ASTCreateQuery>()    // build the ASTCreateQuery for the inner table, mainly its storage and columns
            InterpreterCreateQuery::execute    // create the inner table (only when the view has no TO clause)
              createTable
                doCreateTable
                  StorageFactory::instance().get
                    StorageReplicatedMergeTree::create
                      StorageReplicatedMergeTree::StorageReplicatedMergeTree
                  DatabaseOnDisk::createTable
                    getObjectDefinitionFromCreateQuery    // extract the metadata
                    out.next()    // write the metadata to a (temporary) file
                    commitCreateTable    // register the uuid -> table mapping and rename the file
                  StorageReplicatedMergeTree::startup    // start some background threads
            target_table_id = StorageID of the inner table / TO table
            DatabaseCatalog::addDependency    // record the <table in the SELECT, materialized view> mapping; the chain is now source table --> materialized view --> inner table / TO table
      DatabaseOnDisk::createTable
        ...
      StorageMaterializedView::startup    // IStorage::startup is an empty function, so this does nothing
    InterpreterCreateQuery::fillTableIfNeeded    // if the view was created with POPULATE, insert the existing data
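The net effect of DatabaseCatalog::addDependency and target_table_id is a small graph: source table, then its views, then each view's target table. Below is a minimal sketch of that bookkeeping, assuming table ids are plain "db.table" strings; MiniCatalog and its methods are hypothetical and only mimic the roles of addDependency and target_table_id.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified catalog; table ids are plain "db.table" strings.
class MiniCatalog {
public:
    // Plays the role of DatabaseCatalog::addDependency(select_table, view):
    // remember that `view` must be fed whenever `source` receives an INSERT.
    void addDependency(const std::string & source, const std::string & view) {
        view_dependencies[source].insert(view);
    }

    // Plays the role of target_table_id: where the view's rows are stored
    // (the inner table, or the table named in TO).
    void setTarget(const std::string & view, const std::string & target) {
        assert(!view.empty() && !target.empty());
        view_targets[view] = target;
    }

    std::vector<std::string> getDependencies(const std::string & source) const {
        auto it = view_dependencies.find(source);
        if (it == view_dependencies.end())
            return {};
        return std::vector<std::string>(it->second.begin(), it->second.end());
    }

    // Follows source --> view --> target for every view attached to `source`.
    std::vector<std::string> insertTargets(const std::string & source) const {
        std::vector<std::string> targets;
        for (const auto & view : getDependencies(source)) {
            auto it = view_targets.find(view);
            if (it != view_targets.end())
                targets.push_back(it->second);
        }
        return targets;
    }

private:
    std::map<std::string, std::set<std::string>> view_dependencies;
    std::map<std::string, std::string> view_targets;
};
```

For the running example, local_table would map to view_table and view_table to its inner table, so an INSERT into local_table has exactly one extra destination to feed.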
Data insertion flow
TCPHandler::runImpl
  executeQuery    // i.e. executeQueryImpl; in short, it builds the chain of IBlockOutputStream objects that the inserted data flows through
    ast = parseQuery
    interpreter = InterpreterFactory::get(ast)    // instantiates InterpreterInsertQuery
      InterpreterInsertQuery::InterpreterInsertQuery
    InterpreterInsertQuery::execute    // run the outer INSERT
      BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>
        // for each materialized view attached to the source table: begin
        insert = std::make_unique<ASTInsertQuery>    // build an INSERT that writes into the inner table
        InterpreterSelectQuery    // built from the inner query to obtain the AS SELECT columns, which become the INSERT's column list
        InterpreterInsertQuery::execute    // insert into the inner table
          BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>
            // the inner table has no materialized views attached, so the nesting stops here
            output = storage->write    // StorageReplicatedMergeTree::write, the inner table's write
              std::make_shared<ReplicatedMergeTreeBlockOutputStream>    // construct ReplicatedMergeTreeBlockOutputStream
            replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get())
          out = std::make_shared<AddingDefaultBlockOutputStream>(out, ...)
          out_wrapper = std::make_shared<CountingBlockOutputStream>(out)
        // for each materialized view attached to the source table: end
        output = storage->write    // StorageReplicatedMergeTree::write, the source table's write
        replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get())
      out = std::make_shared<AddingDefaultBlockOutputStream>(out, ...)
      out_wrapper = std::make_shared<CountingBlockOutputStream>(out)
  processInsertQuery    // the actual insert starts here
    state.io.out->writePrefix()    // called down through the OutputStream chain built above, layer by layer; MergeTreeData::delayInsertOrThrowIfNeeded finally checks whether the insert may proceed (delaying or rejecting it when the table has too many parts)
    sendData    // send the table structure to the client
    readData    // actually receive the data
      readDataNext    // read in a loop
        receivePacket    // packet type Protocol::Client::Data
          receiveData
            initBlockInput    // state.block_in = NativeBlockInputStream, state.maybe_compressed_in = CompressedReadBuffer
            state.block_in->read()    // NativeBlockInputStream::readImpl reads the data from the socket
              readData    // deserialize the data
            state.io.out->write
              ...
                PushingToViewsBlockOutputStream::write
                  ReplicatedMergeTreeBlockOutputStream::write    // write into the source table
                  // for each attached materialized view: begin
                  PushingToViewsBlockOutputStream::process    // run the view's SELECT, then write the result into the inner table
                    result_block = in->read()    // read the SELECT result; the query runs over the block just inserted, not over the whole source table
                      ...
                        PipelineExecutingBlockInputStream::readImpl
                    view.out->write(result_block)
                      ...
                        ReplicatedMergeTreeBlockOutputStream::write    // write into the inner table
                  // for each attached materialized view: end
    state.io.out->writeSuffix()
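The core of PushingToViewsBlockOutputStream::write reduces to two steps: write the block to the table's own sink, then, for each attached view, apply the view query to that same block and write the result to the view's output chain. The sketch below assumes a toy Row/Block type and models the view query as a plain function over one block; PushingToViewsSketch, MemorySink and ViewInfo are hypothetical names, not ClickHouse classes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy row and block types; real blocks are columnar.
struct Row {
    std::string device_type;
    unsigned long long did;
};
using Block = std::vector<Row>;

struct IBlockOutputStream {
    virtual ~IBlockOutputStream() = default;
    virtual void write(const Block & block) = 0;
};
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;

// Stand-in for ReplicatedMergeTreeBlockOutputStream: records the blocks it receives.
struct MemorySink : IBlockOutputStream {
    std::vector<Block> parts;
    void write(const Block & block) override { parts.push_back(block); }
};

// One attached view: its AS SELECT (modelled as a function over a single block)
// and the output chain that ends in the view's inner table.
struct ViewInfo {
    std::function<Block(const Block &)> query;
    BlockOutputStreamPtr out;
};

class PushingToViewsSketch : public IBlockOutputStream {
public:
    PushingToViewsSketch(BlockOutputStreamPtr output_, std::vector<ViewInfo> views_)
        : output(std::move(output_)), views(std::move(views_)) {}

    void write(const Block & block) override {
        if (output)
            output->write(block);   // 1. write into the table itself
        for (auto & view : views)   // 2. then feed each attached view
            process(block, view);
    }

private:
    // The view query sees only the block being inserted, never the whole source table.
    static void process(const Block & block, ViewInfo & view) {
        assert(view.query && view.out);
        Block result_block = view.query(block);
        if (!result_block.empty())  // simplification: skip empty results
            view.out->write(result_block);
    }

    BlockOutputStreamPtr output;
    std::vector<ViewInfo> views;
};
```

Because process only ever receives the freshly inserted block, a materialized view never rescans the source table on INSERT; rows that existed before the view was created reach it only through POPULATE.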
Other
How is the OutputStream constructed in InterpreterInsertQuery::execute?
// First entry into InterpreterInsertQuery::execute (the outer INSERT)
PushingToViewsBlockOutputStream
  out = the result of the second entry, i.e. a CountingBlockOutputStream
  views.emplace_back(out)
  output = ReplicatedMergeTreeBlockOutputStream
  replicated_output = ReplicatedMergeTreeBlockOutputStream
AddingDefaultBlockOutputStream
  output = PushingToViewsBlockOutputStream
CountingBlockOutputStream
  stream = AddingDefaultBlockOutputStream
returns the CountingBlockOutputStream
// Second entry into InterpreterInsertQuery::execute (the INSERT into the inner table)
PushingToViewsBlockOutputStream
  output = ReplicatedMergeTreeBlockOutputStream
  replicated_output = ReplicatedMergeTreeBlockOutputStream
AddingDefaultBlockOutputStream
  output = PushingToViewsBlockOutputStream
CountingBlockOutputStream
  stream = AddingDefaultBlockOutputStream
returns the CountingBlockOutputStream
// The final OutputStream:
out = CountingBlockOutputStream --> AddingDefaultBlockOutputStream --> PushingToViewsBlockOutputStream
  .views --> CountingBlockOutputStream --> AddingDefaultBlockOutputStream --> PushingToViewsBlockOutputStream --> .output = ReplicatedMergeTreeBlockOutputStream
  .output --> ReplicatedMergeTreeBlockOutputStream
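The nesting above is a chain of decorators: each stream does one job and forwards the block. The sketch below rebuilds the same Counting, AddingDefault, PushingToViews order, including a nested chain for the view. It simplifies under stated assumptions: blocks are maps of integer columns, AddingDefault fills constant defaults instead of evaluating DEFAULT expressions such as (rand(1) % 2) * did, and the view's SELECT is omitted; all class names and buildChain are hypothetical stand-ins.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplified block: column name -> column values.
using Block = std::map<std::string, std::vector<long long>>;

inline std::size_t rowCount(const Block & block) {
    return block.empty() ? 0 : block.begin()->second.size();
}

struct IBlockOutputStream {
    virtual ~IBlockOutputStream() = default;
    virtual void write(const Block & block) = 0;
};
using OutPtr = std::shared_ptr<IBlockOutputStream>;

// Stand-in for ReplicatedMergeTreeBlockOutputStream: records every block it receives.
struct MemorySink : IBlockOutputStream {
    std::vector<Block> written;
    void write(const Block & block) override { written.push_back(block); }
};

// Stand-in for PushingToViewsBlockOutputStream; the view's SELECT is omitted,
// so each view chain receives the block unchanged.
struct PushingToViews : IBlockOutputStream {
    OutPtr output;              // the table's own sink
    std::vector<OutPtr> views;  // one nested chain per attached view
    void write(const Block & block) override {
        if (output)
            output->write(block);
        for (auto & view : views)
            view->write(block);
    }
};

// Stand-in for AddingDefaultBlockOutputStream: adds columns missing from the INSERT.
// Simplification: constant defaults instead of evaluated DEFAULT expressions.
struct AddingDefault : IBlockOutputStream {
    OutPtr next;
    std::map<std::string, long long> defaults;
    AddingDefault(OutPtr next_, std::map<std::string, long long> defaults_)
        : next(std::move(next_)), defaults(std::move(defaults_)) { assert(next); }
    void write(const Block & block) override {
        Block completed = block;
        const std::size_t rows = rowCount(block);
        for (const auto & [name, value] : defaults)
            if (completed.count(name) == 0)
                completed[name] = std::vector<long long>(rows, value);
        next->write(completed);
    }
};

// Stand-in for CountingBlockOutputStream: counts rows, then forwards the block unchanged.
struct Counting : IBlockOutputStream {
    OutPtr next;
    std::size_t written_rows = 0;
    explicit Counting(OutPtr next_) : next(std::move(next_)) { assert(next); }
    void write(const Block & block) override {
        written_rows += rowCount(block);
        next->write(block);
    }
};

// Wraps the streams in the same order as InterpreterInsertQuery::execute:
// Counting --> AddingDefault --> PushingToViews --> {table sink, view chains}.
inline std::shared_ptr<Counting> buildChain(OutPtr table_sink, std::vector<OutPtr> view_chains,
                                            std::map<std::string, long long> defaults) {
    auto pushing = std::make_shared<PushingToViews>();
    pushing->output = std::move(table_sink);
    pushing->views = std::move(view_chains);
    auto adding = std::make_shared<AddingDefault>(pushing, std::move(defaults));
    return std::make_shared<Counting>(adding);
}
```

Calling buildChain once for the inner table, then once for the source table with the first chain passed as its only view, reproduces the "final OutputStream" layout: the outer Counting stream sees every inserted row, and so does the Counting stream inside .views.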
Building the InputStream when inserting into the inner table
MaterializingBlockInputStream    // turns constant columns into full columns
  children = select->execute().getInputStream()    // PipelineExecutingBlockInputStream
SquashingBlockInputStream    // merges small blocks into larger ones
  children = MaterializingBlockInputStream
in = ConvertingBlockInputStream    // converts the columns to the inner table's header, matching them by name
  children = SquashingBlockInputStream
The final InputStream:
in = ConvertingBlockInputStream --> SquashingBlockInputStream --> MaterializingBlockInputStream --> PipelineExecutingBlockInputStream
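Of these layers, SquashingBlockInputStream carries the most interesting logic: it glues the small blocks produced by the view query into larger ones before they are written to the inner table. Below is a rows-only approximation, assuming a single-column toy Block and a hypothetical min_rows threshold; the real class also considers byte size and, if we read the 21.3 code correctly, takes its thresholds from the min_insert_block_size_* settings. VectorSource and SquashingSketch are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical simplified block: one column of values; an empty block means end of stream.
using Block = std::vector<long long>;

struct IBlockInputStream {
    virtual ~IBlockInputStream() = default;
    virtual Block read() = 0;
};
using InPtr = std::shared_ptr<IBlockInputStream>;

// Stand-in for the innermost PipelineExecutingBlockInputStream: replays fixed blocks.
struct VectorSource : IBlockInputStream {
    std::vector<Block> blocks;
    std::size_t pos = 0;
    explicit VectorSource(std::vector<Block> blocks_) : blocks(std::move(blocks_)) {}
    Block read() override { return pos < blocks.size() ? blocks[pos++] : Block{}; }
};

// Rows-only approximation of SquashingBlockInputStream: glue small blocks
// together until at least min_rows rows are buffered.
class SquashingSketch : public IBlockInputStream {
public:
    SquashingSketch(InPtr child_, std::size_t min_rows_)
        : child(std::move(child_)), min_rows(min_rows_) {
        assert(min_rows > 0);
    }

    Block read() override {
        while (!finished) {
            Block block = child->read();
            if (block.empty()) {
                finished = true;  // child exhausted: flush whatever is buffered
                break;
            }
            if (accumulated.empty() && block.size() >= min_rows)
                return block;     // already large enough: pass it through unchanged
            accumulated.insert(accumulated.end(), block.begin(), block.end());
            if (accumulated.size() >= min_rows)
                return std::exchange(accumulated, Block{});
        }
        return std::exchange(accumulated, Block{});  // empty once everything is flushed
    }

private:
    InPtr child;
    std::size_t min_rows;
    Block accumulated;
    bool finished = false;
};
```

With min_rows = 3, the input blocks {1}, {2}, {3}, {4, 5, 6, 7}, {8} come out as {1, 2, 3}, {4, 5, 6, 7} and {8}: small blocks are merged, a block that is already large enough passes through, and the remainder is flushed at the end of the stream.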
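[Copyright notice] This article is original content by a Huawei Cloud community user. When reprinting, you must credit the source (Huawei Cloud community), the article link, the author and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence; once verified, the community will remove the infringing content immediately. Report email: cloudbbs@huaweicloud.com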