Why is groupBitmap faster than uniqExact?
When comparing the execution speed of groupBitmap and uniqExact in ClickHouse, the comparison should be done with distributed queries. Distributed queries cover 99% of our use cases, and if you run groupBitmap and uniqExact against a local table the gap may not be obvious: both are aggregate functions, and single-node aggregation takes roughly the same time for either.
Latency comparison for distributed queries
Test environment: 2 shards × 2 nodes, c3.2xlarge.4 (8 vCPUs | 32 GB | 1000 GB high-IO disk), ClickHouse 21.3.4.25
As before, the test uses the SSB lineorder table; we observe the SQL execution time in a high-cardinality scenario at the scale of one billion rows:
Memory consumption:
In distributed queries, uniqExact is not only slower than groupBitmap, it also consumes far more memory.
Analyzing the time cost with trace logs
By adding --send_logs_level trace on the client we can capture the execution logs of both shards into a single file. Comparing the logs of the two aggregate functions points to the time-consuming function: Aggregator::mergeBlocks
[ClickHouseeSjN0001] 2021.11.23 20:14:28.854812 [ 505746 ] {4fcfa0e6-19ce-4f88-a32a-e210400d7bd7} <Trace> Aggregator: Merging partially aggregated blocks (bucket = -1).
[ClickHouseeSjN0001] 2021.11.23 20:14:31.455889 [ 505746 ] {4fcfa0e6-19ce-4f88-a32a-e210400d7bd7} <Debug> MemoryTracker: Current memory usage (for query): 5.01 GiB.
[ClickHouseeSjN0001] 2021.11.23 20:14:34.111041 [ 505746 ] {4fcfa0e6-19ce-4f88-a32a-e210400d7bd7} <Debug> MemoryTracker: Current memory usage (for query): 6.01 GiB.
[ClickHouseeSjN0001] 2021.11.23 20:14:50.403995 [ 505746 ] {4fcfa0e6-19ce-4f88-a32a-e210400d7bd7} <Trace> Aggregator: Merged partially aggregated blocks. 1 rows, 8.00 B. in 21.549123775 sec. (0.0464055991529521 rows/sec., 0.37 B/sec.)
[ClickHouseeSjN0001] 2021.11.23 20:14:50.404062 [ 505746 ] {4fcfa0e6-19ce-4f88-a32a-e210400d7bd7} <Trace> Aggregator: Destroying aggregate states
Adding more logging and walking through the code shows the following:
uniqExact merges the data blocks coming from the other servers with HashSetTable::merge, while groupBitmap uses RoaringBitmapWithSmallSet::merge. The two code paths are as follows:
HashSetTable

void merge(const Self & rhs)
{
    if (!this->hasZero() && rhs.hasZero())
    {
        this->setHasZero();
        ++this->m_size;
    }

    /// Re-insert every non-empty cell of rhs's buffer into this table, one hash probe per value.
    for (size_t i = 0; i < rhs.grower.bufSize(); ++i)
        if (!rhs.buf[i].isZero(*this))
            this->insert(rhs.buf[i].getValue());
}
RoaringBitmapWithSmallSet

void merge(const RoaringBitmapWithSmallSet & r1)
{
    if (r1.isLarge())
    {
        if (isSmall())
            toLarge();

        /// High-cardinality case: a single bitwise OR of two roaring bitmaps.
        *rb |= *r1.rb;
    }
    else
    {
        for (const auto & x : r1.small)
            add(x.getValue());
    }
}
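To make the cost gap analysed below concrete, here is a self-contained toy comparison in plain C++ (standard library only, not ClickHouse code): merging two exact sets means one hash-table insertion per distinct value, whereas merging two bitmaps is one 64-bit OR per word, regardless of how many values each word holds.

#include <bitset>
#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

/// Toy uniqExact-style merge: every element of rhs is hashed and re-inserted.
void mergeHashSet(std::unordered_set<uint32_t> & lhs, const std::unordered_set<uint32_t> & rhs)
{
    for (uint32_t v : rhs)
        lhs.insert(v);
}

/// Toy groupBitmap-style merge: one pass of word-wise ORs over the bitmap.
void mergeBitmap(std::vector<uint64_t> & lhs, const std::vector<uint64_t> & rhs)
{
    if (lhs.size() < rhs.size())
        lhs.resize(rhs.size(), 0);
    for (size_t i = 0; i < rhs.size(); ++i)
        lhs[i] |= rhs[i];
}

int main()
{
    std::unordered_set<uint32_t> s1 = {1, 2, 3}, s2 = {3, 4, 5};
    mergeHashSet(s1, s2);

    std::vector<uint64_t> b1(16), b2(16);
    b2[0] = (1ull << 3) | (1ull << 4) | (1ull << 5);
    mergeBitmap(b1, b2);

    std::cout << s1.size() << " distinct values in the set, "
              << std::bitset<64>(b1[0]).count() << " bits set in the bitmap\n";
}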
uniqExact merges the aggregated data with a hash table, relying on the hash table's uniqueness guarantee for exact deduplication; at the billion-row scale that for loop is very expensive (gdb showed rhs.grower.bufSize() at about 2 billion, and the merge took roughly 2 minutes). RoaringBitmapWithSmallSet::merge, by contrast, does a single OR in the high-cardinality case, so its cost is essentially negligible.
As mentioned earlier, running against a local table does not show an obvious gap between the two aggregate functions, because local execution has no mergeBlocks step at all. Whether a query behaves as local or distributed is decided when the execution plan is generated: InterpreterSelectQuery::executeImpl takes different if branches depending on the state of the expression analysis. That state can be summarized as follows:
The two stages of expression processing
struct ExpressionAnalysisResult
{
    /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
    bool first_stage = false;

    /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
    bool second_stage = false;
};
Only when second_stage is set, expressions.need_aggregate is true, and first_stage was not executed locally (i.e. the partial aggregation happened on remote servers) is InterpreterSelectQuery::executeMergeAggregated called to add a MergingAggregatedStep.
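A heavily simplified sketch of that branching, for illustration only (buildPlan and the printed step names are invented here; the real InterpreterSelectQuery::executeImpl does much more): on a local table both stages run in the same process, so no merge step is needed, while on the distributed initiator only the second stage runs and the merge step is added.

#include <iostream>

/// Illustrative only - mirrors the two flags quoted above.
struct ExpressionAnalysisResult
{
    bool first_stage = false;    /// partial aggregation runs here (remote servers, or a plain local query)
    bool second_stage = false;   /// final stage runs here (the initiator of a distributed query, or a plain local query)
    bool need_aggregate = false; /// the query contains aggregation
};

/// Hypothetical plan builder following the description above.
void buildPlan(const char * what, const ExpressionAnalysisResult & expressions)
{
    std::cout << what << ":\n";

    if (expressions.first_stage && expressions.need_aggregate)
        std::cout << "  add AggregatingStep (partial aggregation over local data)\n";

    /// The merge step is only needed when the partial aggregation happened elsewhere.
    if (expressions.second_stage && expressions.need_aggregate && !expressions.first_stage)
        std::cout << "  add MergingAggregatedStep (merge partial states from remote servers)\n";
}

int main()
{
    buildPlan("local table",           {true,  true, true});   /// both stages locally -> no merge step
    buildPlan("distributed initiator", {false, true, true});   /// only second stage   -> merge step added
}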
The flow of distributed aggregation
Compare the physical execution plan of the local query with that of the distributed query:
-- Physical plan of the local query (groupBitmap produces the same plan):
EXPLAIN PIPELINE
SELECT
countDistinct(LO_ORDERKEY),
groupBitmap(LO_ORDERKEY)
FROM DTDB.lineorder
SETTINGS count_distinct_implementation = 'uniqExact'
Query id: 5ba11745-80f6-47a2-9599-387b082a793d
┌─explain───────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ Resize 8 → 1 │
│ AggregatingTransform × 8 │
│ StrictResize 8 → 8 │
│ (Expression) │
│ ExpressionTransform × 8 │
│ (SettingQuotaAndLimits) │
│ (ReadFromStorage) │
│ MergeTreeThread × 8 0 → 1 │
└───────────────────────────────────────┘
-- Physical plan of the distributed query (groupBitmap produces the same plan):
EXPLAIN PIPELINE
SELECT
countDistinct(LO_ORDERKEY),
groupBitmap(LO_ORDERKEY)
FROM DTDB.lineorder_all
SETTINGS count_distinct_implementation = 'uniqExact'
Query id: 6109ee90-a73d-4ca5-ae3e-14309d3852dc
┌─explain─────────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (MergingAggregated) │
│ SortingAggregatedTransform 8 → 1 │
│ MergingAggregatedBucketTransform × 8 │
│ Resize 1 → 8 │
│ GroupingAggregatedTransform 2 → 1 │
│ (SettingQuotaAndLimits) │
│ (Union) │
│ (Expression) │
│ ExpressionTransform │
│ (Aggregating) │
│ Resize 8 → 1 │
│ AggregatingTransform × 8 │
│ StrictResize 8 → 8 │
│ (Expression) │
│ ExpressionTransform × 8 │
│ (SettingQuotaAndLimits) │
│ (ReadFromStorage) │
│ MergeTreeThread × 8 0 → 1 │
│ (ReadFromPreparedSource) │
│ Remote 0 → 1 │
└─────────────────────────────────────────────────────┘
Compared with the local query, the distributed query has three extra processors: GroupingAggregatedTransform, MergingAggregatedBucketTransform and SortingAggregatedTransform, and mergeBlocks happens inside MergingAggregatedBucketTransform.
A gdb backtrace of HashSetTable::merge helps confirm this:
#0 HashSetTable<unsigned int, HashTableCell<unsigned int, HashCRC32<unsigned int>, HashTableNoState>, HashCRC32<unsigned int>, HashTableGrower<4ul>, AllocatorWithStackMemory<Allocator<true, true>, 64ul, 1ul> >::merge
#1 DB::AggregateFunctionUniq<unsigned int, DB::AggregateFunctionUniqExactData<unsigned int> >::merge
#2 DB::Aggregator::mergeWithoutKeyStreamsImpl
#3 DB::Aggregator::mergeBlocks
#4 DB::MergingAggregatedBucketTransform::transform
#5 DB::ISimpleTransform::transform
#6 DB::ISimpleTransform::work
#7 DB::executeJob
#8 DB::PipelineExecutor::addJob
#9 DB::PipelineExecutor::executeStepImpl
#10 DB::PipelineExecutor::executeSingleThread
#11 DB::PipelineExecutor::executeImpl
#12 ThreadPoolImpl<std::__1::thread>::worker
#13 ThreadPoolImpl<std::__1::thread>::scheduleImpl
GroupingAggregatedTransform, MergingAggregatedBucketTransform and SortingAggregatedTransform are all defined in MergingAggregatedMemoryEfficientTransform.h. An excerpt from the comment there:
* 2. Merge aggregation results for distributed query processing.
* Partially aggregated data arrives from different servers, which can be split down or not, into 256 buckets,
* and these buckets are passed to us by the network from each server in sequence, one by one.
* You should also read and merge by the buckets.
In short, in a distributed query the initiating node waits for partially aggregated data from the different servers. For memory efficiency these data chunks are usually split into multiple buckets before being sent to the initiator (my understanding is that this is exactly what two-level refers to), and the initiator then merges the partially aggregated data with mergeBlocks to produce the final result.
GroupingAggregatedTransform is the processor that runs before MergingAggregatedBucketTransform. It uses std::map<Int32, Chunks> chunks_map; as intermediate storage, reading data from its input and grouping it into chunks_map. GroupingAggregatedTransform::pushData then combines the chunks of each bucket into a new chunk and pushes it to the output, where MergingAggregatedBucketTransform does the actual computation.
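A toy sketch of that grouping step (the Chunk type and the per-bucket bookkeeping here are simplified stand-ins for illustration, not the real GroupingAggregatedTransform):

#include <cstdint>
#include <map>
#include <vector>

/// Simplified stand-in for a ClickHouse Chunk: a bucket number plus some rows.
struct Chunk
{
    int32_t bucket_num = -1;
    std::vector<uint64_t> rows;
};

using Chunks = std::vector<Chunk>;

/// Toy version of the grouping idea: chunks arriving from different servers are
/// routed into a per-bucket list; when a bucket is complete its chunks are glued
/// into a single chunk and handed to the next processor.
struct GroupingSketch
{
    std::map<int32_t, Chunks> chunks_map;   /// bucket number -> chunks collected so far

    void addChunk(Chunk chunk)
    {
        chunks_map[chunk.bucket_num].push_back(std::move(chunk));
    }

    /// Combine all chunks collected for one bucket into a single output chunk.
    Chunk pushBucket(int32_t bucket)
    {
        Chunk combined;
        combined.bucket_num = bucket;
        for (auto & part : chunks_map[bucket])
            combined.rows.insert(combined.rows.end(), part.rows.begin(), part.rows.end());
        chunks_map.erase(bucket);
        return combined;
    }
};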
Question: what do a transform's prepare and work do, respectively?
Take GroupingAggregatedTransform as an example: prepare handles the cheap, roughly O(1) bookkeeping, reading data from the input, doing simple grouping and buffering, and pushing finished results to the output, while work performs the actual computation. work never touches the input or output ports; it only processes the data that prepare has staged. When prepare calls GroupingAggregatedTransform::addChunk, it decides from the bucket number whether has_two_level should be set to true.
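A minimal toy processor showing this division of labour (this is not the real IProcessor interface, only an illustration of the prepare/work contract):

#include <optional>
#include <queue>

/// Toy processor: prepare only moves data across the (simulated) ports,
/// work only crunches the data that prepare staged.
struct ToyTransform
{
    std::queue<int> input_port;    /// simulated input port
    std::queue<int> output_port;   /// simulated output port
    std::optional<int> staged;     /// data pulled by prepare, waiting for work
    std::optional<int> result;     /// data produced by work, waiting to be pushed

    /// prepare: cheap O(1) port handling - push one finished result, pull one new value.
    /// Returns true if work() should be called next.
    bool prepare()
    {
        if (result)
        {
            output_port.push(*result);   /// hand finished data to the output port
            result.reset();
        }
        if (!staged && !input_port.empty())
        {
            staged = input_port.front(); /// pull new data from the input port
            input_port.pop();
        }
        return staged.has_value();       /// "need work"
    }

    /// work: the CPU-heavy part, never touches the ports.
    void work()
    {
        result = *staged * *staged;      /// stand-in for the real computation
        staged.reset();
    }
};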
In its transform function, MergingAggregatedBucketTransform takes the multiple chunks carried in the ChunkInfo of the chunk emitted by GroupingAggregatedTransform, converts them into blocks, merges them into a single block, converts that block back into one output chunk, and sends it on to SortingAggregatedTransform for sorting.
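A toy version of that merge step, with std::map standing in for a block of partially aggregated states (the real code converts the chunks into ClickHouse Blocks and calls Aggregator::mergeBlocks on them):

#include <cstdint>
#include <map>
#include <vector>

/// Stand-in for a block of partially aggregated data: key -> partial count.
using PartialBlock = std::map<uint64_t, uint64_t>;

/// Toy MergingAggregatedBucketTransform::transform: take the several partial
/// blocks that belong to one bucket, merge them into a single block, and emit
/// that as the single output "chunk" of this bucket.
PartialBlock mergeBucket(const std::vector<PartialBlock> & blocks_of_one_bucket)
{
    PartialBlock merged;
    for (const auto & block : blocks_of_one_bucket)
        for (const auto & [key, partial_count] : block)
            merged[key] += partial_count;   /// combine the partial states per key
    return merged;
}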
Reading ClickHouse's pipeline code feels a bit awkward at first: transform processors are connected by input and output ports, chunks are the unit of data transfer, and inside a single transform the chunks are converted into blocks for computation.
two-level Aggregation
While analysing the single-node aggregation process we ran into the concept of two-level aggregation, which was not entirely clear, so let's dig into it below.
-- Running the following SQL, we can see the two-level log line
$CLICKHOUSE_HOME/clickhouse client --host 192.168.0.135 --send_logs_level trace -q "select countDistinct(LO_ORDERKEY) from DTDB.lineorder group by LO_ORDERKEY limit 1"
[ecs-c00520436] 2021.12.01 21:02:54.585819 [ 12653 ] {23e8acb2-f7ed-4e57-8760-654f17f5e9fd} <Trace> AggregatingTransform: Aggregating
[ecs-c00520436] 2021.12.01 21:02:54.585850 [ 12653 ] {23e8acb2-f7ed-4e57-8760-654f17f5e9fd} <Trace> Aggregator: Aggregation method: key32
[ecs-c00520436] 2021.12.01 21:02:54.614482 [ 12653 ] {23e8acb2-f7ed-4e57-8760-654f17f5e9fd} <Trace> Aggregator: Converting aggregation data to two-level.
[ecs-c00520436] 2021.12.01 21:02:55.450613 [ 12653 ] {23e8acb2-f7ed-4e57-8760-654f17f5e9fd} <Trace> AggregatingTransform: Aggregated. 8631556 to 2158308 rows (from 32.93 MiB) in 0.865920869 sec. (9968065.569280095 rows/sec., 38.03 MiB/sec.)
-- If the GROUP BY column is changed to LO_SUPPKEY or another key32 column, two-level shows up.
-- If the GROUP BY column is changed to LO_LINENUMBER, LO_DISCOUNT or other non-key32 columns, two-level does not show up.
-- The exact decision logic still needs to be confirmed.
Single-node aggregation call chain
// AggregatingTransform::work
// AggregatingTransform::consume
// Aggregator::executeOnBlock
Before the aggregation starts, Aggregator::chooseAggregationMethod selects the aggregation method
/** Returns ordinary (not two-level) methods, because we start from them.
* Later, during aggregation process, data may be converted (partitioned) to two-level structure, if cardinality is high.
*/
chooseAggregationMethod never picks a two-level method directly; it picks an ordinary method such as key32 or key8, and during the aggregation (in executeOnBlock below) the aggregated data may be converted to a two-level structure.
The main aggregation logic lives in Aggregator::executeOnBlock:
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
    if (isCancelled())
        return true;

    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(method_chosen);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
    }

    /// We select one of the aggregation methods and call it.
    /// For the case when there are no keys (all aggregate into one row).
    if (result.type == AggregatedDataVariants::Type::without_key) // No GROUP BY key: aggregate with the without_key method
    {
        executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
    }
    else // Otherwise pick key32 / key16 / key8, etc. according to the GROUP BY key
    {
        /// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;

#define M(NAME, IS_TWO_LEVEL) \
        else if (result.type == AggregatedDataVariants::Type::NAME) \
            executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
                no_more_keys, overflow_row_ptr);

        if (false) {} // NOLINT
        APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
    }

    /* Decide whether it is worth converting to a two-level structure: if the size of the aggregation
       result already exceeds the configured thresholds, set worth_convert_to_two_level = true. */
    size_t result_size = result.sizeWithoutOverflowRow();
    Int64 current_memory_usage = 0;
    if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto * memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();

    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level) // The result is converted to two levels here - isn't this exactly the so-called two-level aggregation?
        result.convertToTwoLevel();

    return true;
}
// isConvertibleToTwoLevel restricts which key types qualify, so when the GROUP BY method is key8 or key16 it is normal not to see "Converting aggregation data to two-level".
two-level in one sentence
During single-node aggregation, an aggregation method is chosen according to the GROUP BY key (key32, key16 and so on, each backed by a different hash table). As results accumulate, the aggregated data may be converted into a two-level hash table, depending on the method and on how much memory the result occupies. In a distributed query, each server's two-level data is split into multiple buckets and transferred to the initiating node, which groups the partially aggregated data from the different servers, merges it bucket by bucket, and completes the aggregation. For two-level data this bucket-wise merge improves memory efficiency and lowers the memory usage on the initiating node.
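To make "two-level" concrete, here is a toy two-level set in which the top byte of the hash picks one of 256 sub-tables (matching the 256 buckets mentioned in the comment quoted earlier). It only illustrates the idea; the real implementation is ClickHouse's TwoLevelHashTable.

#include <array>
#include <cstdint>
#include <unordered_set>

/// Toy two-level set: 256 independent sub-tables, selected by the top byte of the hash.
/// Each bucket can be serialized, sent and merged on its own, which is what makes the
/// memory-efficient, bucket-by-bucket distributed merge possible.
struct TwoLevelSetSketch
{
    static constexpr size_t NUM_BUCKETS = 256;
    std::array<std::unordered_set<uint64_t>, NUM_BUCKETS> buckets;

    static size_t bucketOf(uint64_t key)
    {
        uint64_t h = std::hash<uint64_t>{}(key);
        return (h >> 56) & 0xFF;   /// the high byte of the hash chooses the bucket
    }

    void insert(uint64_t key) { buckets[bucketOf(key)].insert(key); }

    /// Merging two two-level sets touches matching buckets only, so it can be
    /// done bucket by bucket (and in parallel) instead of in one huge pass.
    void merge(const TwoLevelSetSketch & rhs)
    {
        for (size_t i = 0; i < NUM_BUCKETS; ++i)
            for (uint64_t v : rhs.buckets[i])
                buckets[i].insert(v);
    }
};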
(End)