Skew Optimization for Distributed Computing in GaussDB(DWS)
Data skew is a major challenge for distributed architectures. It breaks the MPP requirement that all nodes be peers: a single node (the skewed node) ends up storing or computing far more data than the others, which causes the following harms:
- Skew in storage severely limits system capacity: even while overall capacity is not saturated, the skewed node's limit prevents total capacity from growing any further.
- Skew in computation severely hurts performance: the skewed node has far more data to process than the others, so it takes much longer to finish and drags down overall job performance.
- Data skew also severely undermines the scalability of an MPP architecture. Because rows with the same value are usually placed on the same node for storage or computation, heavy skew binds the system bottleneck to the skewed node's capacity or performance, and scaling out no longer yields a linear or near-linear gain.
GaussDB(DWS) can effectively address storage skew through multi-column distribution keys. This article focuses on GaussDB(DWS)'s solution for skew in data computation and its practical effect.
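As context, a minimal sketch of the multi-column distribution key approach mentioned above (the table and column names are hypothetical, not part of this article's example):
-- Hash the composite key (region, order_id): a hot value in region alone
-- no longer pins all of its rows to a single DN.
CREATE TABLE t_orders(region int, order_id int, amount numeric)
DISTRIBUTE BY HASH(region, order_id);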
1. Setup
-- Two hash-distributed tables; the join below is on column b, which is not the distribution key.
CREATE TABLE t1(a int, b int) DISTRIBUTE BY HASH(a);
CREATE TABLE t2(a int, b int) DISTRIBUTE BY HASH(a);
-- Seed t1 with 1000 rows, then double it 7 times: 1000 * 2^7 = 128000 rows.
INSERT INTO t1 VALUES (generate_series(1, 1000), generate_series(1, 1000));
INSERT INTO t1 SELECT a+1000*2, b+1000*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2, b+1000*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2, b+1000*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2, b+1000*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2, b+1000*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2, b+1000*2*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2*2, b+1000*2*2*2*2*2*2*2 FROM t1;
-- t2 copies t1.a but sets b = 1 everywhere: t2.b is 100% skewed on a single value.
INSERT INTO t2 SELECT a, 1 FROM t1;
-- Collect statistics so the optimizer can detect the skew.
ANALYZE t1;
ANALYZE t2;
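Two plain-SQL sanity checks (added here for illustration, not part of the original setup) confirm the data shape:
-- Expect 128000 rows: 1000 seed rows doubled 7 times.
SELECT count(*) FROM t1;
-- Expect a single group (b = 1, cnt = 128000): every value in t2.b is 1.
SELECT b, count(*) AS cnt FROM t2 GROUP BY b;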
2. Execution Overview
Taking the following SQL as an example, we compare the normal execution mechanism with the skew-optimized one:
SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
2.1 Execution Without Skew Optimization
GaussDB(DWS) uses the skew-optimized execution mechanism by default. During plan generation, the optimizer uses statistics to detect skew on the join columns; if skew is present, it automatically generates a skew-optimized plan. GaussDB(DWS) also provides the configuration parameter skew_option; after setting it to off, the optimizer skips the skew-optimized plan.
postgres=# SET skew_option TO off;
SET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
id | operation | A-time | A-rows | E-rows | Peak Memory | E-memory | A-width | E-width | E-costs
----+--------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+----------
1 | -> Aggregate | 371.797 | 1 | 1 | 10KB | | | 8 | 14928.22
2 | -> Streaming (type: GATHER) | 371.693 | 4 | 4 | 79KB | | | 8 | 14928.22
3 | -> Aggregate | [236.825, 350.166] | 4 | 4 | [10KB, 10KB] | 1MB | | 8 | 14918.22
4 | -> Hash Join (5,7) | [236.817, 298.880] | 128000 | 128000 | [4KB, 5KB] | 1MB | | 0 | 14838.21
5 | -> Streaming(type: BROADCAST) | [82.726, 144.617] | 512000 | 512000 | [53KB, 53KB] | 2MB | | 4 | 13168.21
6 | -> Seq Scan on t2 | [15.528, 25.093] | 128000 | 128000 | [12KB, 12KB] | 1MB | | 4 | 469.00
7 | -> Hash | [35.826, 69.033] | 128000 | 128000 | [1MB, 1MB] | 16MB | [20,20] | 4 | 470.00
8 | -> Seq Scan on t1 | [17.163, 35.748] | 128000 | 128000 | [12KB, 12KB] | 1MB | | 4 | 470.00
Predicate Information (identified by plan id)
---------------------------------------------
4 --Hash Join (5,7)
Hash Cond: (t2.b = t1.b)
Memory Information (identified by plan id)
--------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 1MB
Datanode:
Max Query Peak Memory: 4MB
Min Query Peak Memory: 4MB
7 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 1130kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 1120kB
User Define Profiling
----------------------------------------------------------------
Segment Id: 3 Track name: Datanode build connection
(actual time=[0.759, 15.526], calls=[1, 1])
Segment Id: 3 Track name: Datanode wait connection
(actual time=[0.001, 0.638], calls=[1, 1])
Plan Node id: 2 Track name: coordinator get datanode connection
(actual time=[0.032, 0.032], calls=[1, 1])
====== Query Summary =====
-------------------------------------------------------------------------------
Datanode executor start time [dn_6003_6004, dn_6001_6002]: [2.036 ms,16.202 ms]
Datanode executor end time [dn_6007_6008, dn_6001_6002]: [0.606 ms,1.310 ms]
System available mem: 3112960KB
Query Max mem: 3112960KB
Query estimated mem: 6690KB
Coordinator executor start time: 0.529 ms
Coordinator executor run time: 372.044 ms
Coordinator executor end time: 0.436 ms
Planner runtime: 1.156 ms
Query Id: 72339069014639413
Total runtime: 373.079 ms
(49 rows)
Under the normal mechanism, both t1 and t2 are redistributed on column b, and the join then runs on each DN. But t2's data is special: the value 1 accounts for 100% of column b, so after redistribution the single DN that value 1 hashes to carries essentially all of the join workload (see the sketch after the list below). This is an extreme case of what we call compute skew, which generally causes two problems:
1) The heavily loaded DN consumes far more resources than the others, unbalancing the cluster.
2) The heavily loaded DN takes much longer to compute, and this weakest-link effect stretches the overall SQL execution time.
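To make this visible, one can materialize t2 as if it had been redistributed on the join column and inspect per-DN row counts. This is a sketch: the table t2_by_b is hypothetical, and the CTAS-with-DISTRIBUTE-BY syntax and the xc_node_id system column are assumptions based on GaussDB(DWS)'s Postgres-XC lineage.
-- Redistribute t2 on b into a scratch table.
CREATE TABLE t2_by_b DISTRIBUTE BY HASH(b) AS SELECT * FROM t2;
-- xc_node_id identifies the DN a row resides on; expect one DN to hold all 128000 rows.
SELECT xc_node_id, count(*) FROM t2_by_b GROUP BY xc_node_id;
DROP TABLE t2_by_b;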
GaussDB(DWS) applies a dedicated skew-execution optimization to this scenario.
2.2 Execution With Skew Optimization
postgres=# RESET skew_option;
RESET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id | operation | A-time | A-rows | E-rows | Peak Memory | E-memory | A-width | E-width | E-costs
----+----------------------------------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+---------
1 | -> Aggregate | 182.837 | 1 | 1 | 10KB | | | 8 | 7309.01
2 | -> Streaming (type: GATHER) | 182.748 | 4 | 4 | 79KB | | | 8 | 7309.01
3 | -> Aggregate | [158.711, 169.947] | 4 | 4 | [10KB, 10KB] | 1MB | | 8 | 7299.01
4 | -> Hash Join (5,7) | [147.187, 156.273] | 128000 | 128000 | [5KB, 5KB] | 1MB | | 0 | 7219.00
5 | -> Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN) | [16.308, 23.164] | 128000 | 128000 | [53KB, 53KB] | 2MB | | 4 | 3189.00
6 | -> Seq Scan on t2 | [14.062, 17.578] | 128000 | 128000 | [12KB, 12KB] | 1MB | | 4 | 469.00
7 | -> Hash | [87.374, 92.295] | 128003 | 128000 | [1MB, 1MB] | 16MB | [20,20] | 4 | 3190.00
8 | -> Streaming(type: PART REDISTRIBUTE PART BROADCAST) | [51.194, 56.189] | 128003 | 128000 | [61KB, 61KB] | 2MB | | 4 | 3190.00
9 | -> Seq Scan on t1 | [13.989, 15.170] | 128000 | 128000 | [12KB, 12KB] | 1MB | | 4 | 470.00
Predicate Information (identified by plan id)
--------------------------------------------------------
4 --Hash Join (5,7)
Hash Cond: (t2.b = t1.b)
Skew Join Optimized by Statistic
5 --Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)
Skew Filter: (b = 1)
8 --Streaming(type: PART REDISTRIBUTE PART BROADCAST)
Skew Filter: (b = 1)
Memory Information (identified by plan id)
--------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 1MB
Datanode:
Max Query Peak Memory: 6MB
Min Query Peak Memory: 6MB
7 --Hash
Max Buckets: 32768 Max Batches: 1 Max Memory Usage: 1131kB
Min Buckets: 32768 Min Batches: 1 Min Memory Usage: 1120kB
User Define Profiling
----------------------------------------------------------------
Segment Id: 3 Track name: Datanode build connection
(actual time=[2.924, 7.101], calls=[1, 1])
Segment Id: 3 Track name: Datanode wait connection
(actual time=[0.000, 1.478], calls=[1, 1])
Plan Node id: 2 Track name: coordinator get datanode connection
(actual time=[0.025, 0.025], calls=[1, 1])
====== Query Summary =====
------------------------------------------------------------------------------
Datanode executor start time [dn_6005_6006, dn_6003_6004]: [3.544 ms,8.513 ms]
Datanode executor end time [dn_6005_6006, dn_6001_6002]: [0.450 ms,0.576 ms]
System available mem: 3112960KB
Query Max mem: 3112960KB
Query estimated mem: 8802KB
Coordinator executor start time: 0.665 ms
Coordinator executor run time: 183.161 ms
Coordinator executor end time: 0.607 ms
Planner runtime: 1.517 ms
Query Id: 72339069014639134
Total runtime: 184.519 ms
For compute-skew scenarios, the skewed values receive special treatment during data redistribution. In this query:
- Column b of t2, the outer table of the join, is skewed on the value 1.
- For values other than 1, both tables are distributed and computed under the normal mechanism.
- Rows of the outer table (t2) whose redistribution column b equals 1 are sent ROUNDROBIN (spread evenly across all DNs).
- Rows of the inner table (t1) whose redistribution column b equals 1 are sent BROADCAST (a copy kept on every DN).
Note: unless t1 is a replicated table, PART ROUNDROBIN and PART BROADCAST always appear as a pair.
This distribution scheme spreads the computation for the outer table's skewed value (b = 1) evenly across all DNs while preserving correctness: because the matching inner-table rows are broadcast to every DN, each round-robin-scattered outer row still finds all of its join partners locally. The resulting speedup is clearly visible above (373.079 ms -> 184.519 ms).
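When statistics are unavailable or stale, the same optimization can reportedly be requested explicitly with a skew hint. The hint below is a sketch based on the documented skew hint form (table, column, skewed value) and should be verified against your GaussDB(DWS) version:
-- Tell the optimizer that t2.b is skewed on 1, so it can build the
-- PART REDISTRIBUTE PART ROUNDROBIN / PART BROADCAST plan without statistics.
SELECT /*+ skew(t2 (b) (1)) */ count(1)
FROM t1 INNER JOIN t2 ON t1.b = t2.b;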
To learn more about GaussDB(DWS), search for "GaussDB DWS" on WeChat and follow the official account, where we share the latest and most complete PB-scale data warehouse technology; messaging the account also gets you plenty of learning materials.