Skew Optimization for Distributed Computing in GaussDB(DWS)

By 譡里个檔, published 2021/04/01 15:52:29
[Abstract] A brief introduction to the data-skew optimization mechanism of GaussDB(DWS)

Data skew is a major challenge for distributed architectures. It breaks the requirement that the nodes of an MPP system be peers: a single node (the skewed node) ends up storing or computing far more data than the others, which causes the following problems:

  • Storage skew severely limits system capacity: even while overall capacity is far from saturated, the skewed node fills up first and prevents the system's usable capacity from growing any further.
  • Computational skew severely degrades performance: the skewed node has to process far more data than the others, so its computation takes much longer and drags down the whole job.
  • Data skew also undermines the scalability of an MPP architecture. Because rows with the same value are usually placed on the same node for storage or computation, the system bottleneck becomes the capacity or performance of the skewed node, and scaling out no longer yields linear or near-linear gains.

GaussDB(DWS) can effectively address storage skew through multi-column distribution keys, as sketched below. This article focuses on GaussDB(DWS)'s solution to computational skew and its actual effect.
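
A minimal sketch of the storage-skew remedy (t3 and its columns are illustrative, not part of the example below): hashing on a composite key usually distributes rows far more evenly than hashing on a single skewed column.

-- Illustrative only: distribute by the combined hash of (a, b),
-- which is typically much more uniform than HASH(b) alone
CREATE TABLE t3(a int, b int) DISTRIBUTE BY HASH(a, b);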


1. Setup

-- Two tables, both hash-distributed on column a
CREATE TABLE t1(a int, b int) DISTRIBUTE BY HASH(a);
CREATE TABLE t2(a int, b int) DISTRIBUTE BY HASH(a);

-- Seed t1 with 1,000 rows, then double it seven times to reach 128,000 rows
INSERT INTO t1 VALUES (generate_series(1, 1000), generate_series(1, 1000));
INSERT INTO t1 SELECT a+1000*2, b+1000*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2, b+1000*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2, b+1000*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2, b+1000*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2, b+1000*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2, b+1000*2*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2*2, b+1000*2*2*2*2*2*2*2 FROM t1;
-- t2 copies t1.a but sets b = 1 in every row, so t2.b is 100% skewed
INSERT INTO t2 SELECT a, 1 FROM t1;

-- Collect statistics so the optimizer can detect the skew
ANALYZE t1;
ANALYZE t2;
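
The skew can be confirmed directly with plain SQL (no DWS-specific features assumed):

-- Every row of t2 carries b = 1, i.e. a single value covers 100% of the column
SELECT b, count(*) AS cnt FROM t2 GROUP BY b ORDER BY cnt DESC;
--  b |  cnt
-- ---+--------
--  1 | 128000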


2. Execution overview

Using the following SQL statement as an example, we analyze the difference between the normal execution mechanism and the skew-optimized one:

SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;



2.1 Execution without skew optimization

GaussDB(DWS) uses the skew-optimized execution mechanism by default. During plan generation, the optimizer checks the statistics to detect whether the join columns are skewed; if they are, it automatically generates a skew-optimized plan. GaussDB(DWS) also provides the configuration parameter skew_option: setting it to off makes the optimizer skip skew-optimized plans.
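
The detection relies on the column statistics collected by ANALYZE, in particular the most-common-values (MCV) list. As a sketch, assuming the standard PostgreSQL pg_stats view is exposed in your GaussDB(DWS) release, the skew on t2.b is plainly visible there:

-- A single most-common value with frequency ~1.0 signals extreme skew
SELECT most_common_vals, most_common_freqs
FROM pg_stats
WHERE tablename = 't2' AND attname = 'b';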

postgres=# SET skew_option TO off;
SET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
                                                                    QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
  id |                 operation                  |       A-time       | A-rows | E-rows | Peak Memory  | E-memory | A-width | E-width | E-costs
 ----+--------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+----------
   1 | ->  Aggregate                              | 371.797            |      1 |      1 | 10KB         |          |         |       8 | 14928.22
   2 |    ->  Streaming (type: GATHER)            | 371.693            |      4 |      4 | 79KB         |          |         |       8 | 14928.22
   3 |       ->  Aggregate                        | [236.825, 350.166] |      4 |      4 | [10KB, 10KB] | 1MB      |         |       8 | 14918.22
   4 |          ->  Hash Join (5,7)               | [236.817, 298.880] | 128000 | 128000 | [4KB, 5KB]   | 1MB      |         |       0 | 14838.21
   5 |             ->  Streaming(type: BROADCAST) | [82.726, 144.617]  | 512000 | 512000 | [53KB, 53KB] | 2MB      |         |       4 | 13168.21
   6 |                ->  Seq Scan on t2          | [15.528, 25.093]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 469.00
   7 |             ->  Hash                       | [35.826, 69.033]   | 128000 | 128000 | [1MB, 1MB]   | 16MB     | [20,20] |       4 | 470.00
   8 |                ->  Seq Scan on t1          | [17.163, 35.748]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 470.00

 Predicate Information (identified by plan id)
 ---------------------------------------------
   4 --Hash Join (5,7)
         Hash Cond: (t2.b = t1.b)

              Memory Information (identified by plan id)
 --------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 1MB
 Datanode:
         Max Query Peak Memory: 4MB
         Min Query Peak Memory: 4MB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 1130kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 1120kB

                      User Define Profiling
 ----------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
  (actual time=[0.759, 15.526], calls=[1, 1])
 Segment Id: 3  Track name: Datanode wait connection
  (actual time=[0.001, 0.638], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
  (actual time=[0.032, 0.032], calls=[1, 1])

                           ====== Query Summary =====
 -------------------------------------------------------------------------------
 Datanode executor start time [dn_6003_6004, dn_6001_6002]: [2.036 ms,16.202 ms]
 Datanode executor end time [dn_6007_6008, dn_6001_6002]: [0.606 ms,1.310 ms]
 System available mem: 3112960KB
 Query Max mem: 3112960KB
 Query estimated mem: 6690KB
 Coordinator executor start time: 0.529 ms
 Coordinator executor run time: 372.044 ms
 Coordinator executor end time: 0.436 ms
 Planner runtime: 1.156 ms
 Query Id: 72339069014639413
 Total runtime: 373.079 ms
(49 rows)

Under the normal mechanism, both t1 and t2 are redistributed by column b, and the join is then executed on each DN. But the data in t2 is special: the value 1 accounts for 100% of column b, so after redistribution the single DN that value 1 hashes to carries essentially the entire join workload. This is an extreme case of what we call computational skew (a concrete demonstration follows the list below). Computational skew generally causes two problems:

1) The heavily loaded DN consumes far more resources than its peers, leaving the cluster unbalanced.

2) The heavily loaded DN takes much longer to finish, and this straggler (short-board) effect lengthens the overall SQL execution time.
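
To see the redistribution skew concretely, one can materialize t2 redistributed by b and count rows per node. This is a hedged sketch: t2_by_b is an illustrative table, and xc_node_id is the PGXC-lineage system column identifying the node a row resides on; verify it is exposed in your release.

-- Simulate the join-time redistribution of t2 by column b
CREATE TABLE t2_by_b(a int, b int) DISTRIBUTE BY HASH(b);
INSERT INTO t2_by_b SELECT * FROM t2;

-- Expected: one DN holds all 128,000 rows, the others hold none
SELECT xc_node_id, count(*) FROM t2_by_b GROUP BY xc_node_id;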

For this scenario, GaussDB(DWS) applies a dedicated skew-optimized execution strategy.


2.2 Execution with skew optimization


postgres=# RESET skew_option;
RESET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
                                                                                 QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  id |                              operation                               |       A-time       | A-rows | E-rows | Peak Memory  | E-memory | A-width | E-width | E-costs
 ----+----------------------------------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+---------
   1 | ->  Aggregate                                                        | 182.837            |      1 |      1 | 10KB         |          |         |       8 | 7309.01
   2 |    ->  Streaming (type: GATHER)                                      | 182.748            |      4 |      4 | 79KB         |          |         |       8 | 7309.01
   3 |       ->  Aggregate                                                  | [158.711, 169.947] |      4 |      4 | [10KB, 10KB] | 1MB      |         |       8 | 7299.01
   4 |          ->  Hash Join (5,7)                                         | [147.187, 156.273] | 128000 | 128000 | [5KB, 5KB]   | 1MB      |         |       0 | 7219.00
   5 |             ->  Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)   | [16.308, 23.164]   | 128000 | 128000 | [53KB, 53KB] | 2MB      |         |       4 | 3189.00
   6 |                ->  Seq Scan on t2                                    | [14.062, 17.578]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 469.00
   7 |             ->  Hash                                                 | [87.374, 92.295]   | 128003 | 128000 | [1MB, 1MB]   | 16MB     | [20,20] |       4 | 3190.00
   8 |                ->  Streaming(type: PART REDISTRIBUTE PART BROADCAST) | [51.194, 56.189]   | 128003 | 128000 | [61KB, 61KB] | 2MB      |         |       4 | 3190.00
   9 |                   ->  Seq Scan on t1                                 | [13.989, 15.170]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 470.00

      Predicate Information (identified by plan id)
 --------------------------------------------------------
   4 --Hash Join (5,7)
         Hash Cond: (t2.b = t1.b)
         Skew Join Optimized by Statistic
   5 --Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)
         Skew Filter: (b = 1)
   8 --Streaming(type: PART REDISTRIBUTE PART BROADCAST)
         Skew Filter: (b = 1)

              Memory Information (identified by plan id)
 --------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 1MB
 Datanode:
         Max Query Peak Memory: 6MB
         Min Query Peak Memory: 6MB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 1131kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 1120kB

                      User Define Profiling
 ----------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
  (actual time=[2.924, 7.101], calls=[1, 1])
 Segment Id: 3  Track name: Datanode wait connection
  (actual time=[0.000, 1.478], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
  (actual time=[0.025, 0.025], calls=[1, 1])

                           ====== Query Summary =====
 ------------------------------------------------------------------------------
 Datanode executor start time [dn_6005_6006, dn_6003_6004]: [3.544 ms,8.513 ms]
 Datanode executor end time [dn_6005_6006, dn_6001_6002]: [0.450 ms,0.576 ms]
 System available mem: 3112960KB
 Query Max mem: 3112960KB
 Query estimated mem: 8802KB
 Coordinator executor start time: 0.665 ms
 Coordinator executor run time: 183.161 ms
 Coordinator executor end time: 0.607 ms
 Planner runtime: 1.517 ms
 Query Id: 72339069014639134
 Total runtime: 184.519 ms

For computational-skew scenarios, the skewed values are treated specially during data redistribution. In this query:

  1. Column b of t2, the outer table of the join, is skewed on the value 1.
  2. For all values other than 1, both sides are distributed and computed under the normal mechanism.
  3. Rows of the outer table (t2) whose redistribution column b equals 1 are distributed ROUNDROBIN (spread evenly across all DNs).
  4. Rows of the inner table (t1) whose redistribution column b equals 1 are distributed BROADCAST (a copy is kept on every DN).

Note: unless t1 is a replicated table, PART ROUNDROBIN and PART BROADCAST always appear as a pair, because the skewed outer rows are scattered to arbitrary DNs and every DN must therefore hold a copy of all matching inner rows.


This distribution scheme spreads the computation for the skewed value b = 1 of the outer table evenly across all DNs, avoiding the computational skew. The measurements above also show a clear performance gain (373.079 ms → 184.519 ms).
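
For comparison, before relying on this optimization (or when skew_option is off), a common manual workaround is to split the skewed value out so the two parts can be planned separately; this is a sketch of the idea, not the optimizer's internal rewrite:

-- Join the non-skewed and skewed parts separately and add the counts;
-- the b <> 1 part redistributes normally, and the b = 1 part can be
-- planned, for example, with a broadcast of one side
SELECT (SELECT count(1) FROM t1 JOIN t2 ON t1.b = t2.b WHERE t2.b <> 1)
     + (SELECT count(1) FROM t1 JOIN t2 ON t1.b = t2.b WHERE t2.b = 1) AS count;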


To learn more about GaussDB(DWS), search WeChat for "GaussDB DWS" and follow the official account for the latest PB-scale data warehouse technology and learning materials.
