GaussDB(DWS)的stream执行机制

举报
wangxiaojuan8 发表于 2020/06/19 10:27:47 2020/06/19
【摘要】 GaussDB 的Stream机制

1. 前言

  • 适用版本:【8.1.0(及以上)】

介绍GaussDB(DWS)分布式架构的stream算子三种模式:Gather、Redistribute、Broadcast。

2. GaussDB(DWS)架构简介

  近期刚刚接触GaussDB(DWS),它是一个shared nothing的分布式架构,下图是一个简化的GaussDB(DWS)架构图。

    

  其中各组件的功能如下:

  • CN(Coordinator Node)为对外服务和协调节点,负责接受客户端连接以及对用户SQL命令解析下发,并收集DN节点执行的结果进行汇总,将结果展现给客户端

  • DN(Datanode)为内部数据节点,承载内部数据存储及计算单元的功能。

  • GTM(Global Transaction Manager)负责集群全局事务控制,其与Datanode均有本地主备双机功能。

3. 分布式执行框架

  对应于分布式架构,GaussDB(DWS)提供了分布式执行框架,这也是GaussDB(DWS)中最核心的技术,旨在充分利用DN的资源,尽量将计算下推到DN进行,避免CN成为瓶颈,以提升查询效率和系统扩展性。

  分布式执行框架的技术特点如下:

  • CN负责查询请求的解析、基于代价进行优化以及向DN进行任务下发,并收集DN节点执行的结果进行汇总

  • DN上运行执行计划进程,基于本节点存储的数据执行任务

  • 执行过程中每个算子都是接收下级算子的数据输入,并向上级算子输出数据。是一个生产者—消费者的流水线工作模型

  针对分布式框架,GaussDB(DWS)增加了支撑分布式计划的Stream算子。Stream算子有三种类型:

  • Gather StreamN:1):每个源节点都将其数据发送给目标节点

            


  • Redistribute StreamN:N):每个源节点将其数据根据连接条件计算Hash值,根据重新计算的Hash值进行分布,发给对应的目标节点

    

  • Broadcast Stream1:N):有一个源节点将其数据发给N个目标节点

    

  下面通过tpch中的Q1和Q2为例,使用explain查看这两个SQL的执行计划来认识一下这几个算子。

4. 查看执行计划

4.1 查看与Gather Stream和Redistribute Stream算子相关的执行计划

  以tpch的Q1为例说明,Q1的SQL语句如下

SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看
EXPLAIN
SELECT
         l_returnflag,
         l_linestatus,
         SUM(l_quantity) as sum_qty,
         SUM (l_extendedprice) as sum_base_price,
         SUM (l_extendedprice * (1 - l_discount)) as sum_disc_price,
         SUM (l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
         AVG(l_quantity) as avg_qty,
         AVG (l_extendedprice) as avg_price,
         AVG (l_discount) as avg_disc,
         COUNT(*) as count_order
FROM
         row_engine.lineitem
WHERE
         l_shipdate <= DATE '1998-12-01' - INTERVAL '3 day'
GROUP BY
         l_returnflag,
         l_linestatus
ORDER BY
         l_returnflag,
         l_linestatus;

      执行结果如下:

 id |                 operation                  | E-rows  | E-memory | E-width |  E-costs  
----+--------------------------------------------+---------+----------+---------+-----------
  1 | ->  Streaming (type: GATHER)               |       5 |          |     257 | 146811.70
  2 |    ->  Sort                                |       5 | 16MB     |     257 | 146811.20
  3 |       ->  HashAggregate                    |       6 | 16MB     |     257 | 146811.19
  4 |          ->  Streaming(type: REDISTRIBUTE) |      18 | 2MB      |     257 | 146810.87
  5 |             ->  HashAggregate              |      18 | 16MB     |     257 | 146809.83
  6 |                ->  Seq Scan on lineitem    | 6001622 | 1MB      |      25 | 66788.09
(6 rows)

                   Predicate Information (identified by plan id)                    
------------------------------------------------------------------------------------
   3 --HashAggregate
         Skew Agg Optimized by Statistic
   6 --Seq Scan on lineitem
         Filter: (l_shipdate <= '1998-11-28 00:00:00'::timestamp without time zone)
(4 rows)

   ====== Query Summary =====    
---------------------------------
 System available mem: 9252864KB
 Query Max mem: 9473228KB
 Query estimated mem: 3123KB
(3 rows)

  为了便于说明,我在执行计划的每一行的行首加了一个序号。下面对这个计划进行一下说明:

  1. 从第2行的Sort算子开始直到第6行,是CN下发到在DN上执行的部分;

  2. 1是在CN上执行

  3. 执行顺序

    • 首先从id为6的行开始,对lineitem表执行seqscan

    • id为5的行,对下层扫描得到的数据根据GROUP BY分组键执行hashaggregate操作

    • id为4的行,是一个Redistribute Stream算子,它将在DN上的local数据执行agg后结果,重分布给其他DN

    • id为3的行,各DN收到其他DN上的数据,重新做agg

    • id为2的行,对下层agg结果进行排序

    • id为1的行,是一个Gather Stream算子,CN节点收到DN返回的结果,汇集后将最终结果展示给客户端

   

   在这个执行计划中,我们看到了两个和Stream算子相关的节点:

  1. Gather Stream,用于CN收集DN的结果

  2. Redistribute Stream,用于将DN上的数据重分布给其他DN做HashAgg。

  这里有两个问题:

  1. Q1查询是一个单表查询,为什么需要在DN之间重分布数据呢?

    tpch的Q1是对lineitem表的单表查询,进行分组聚集计算。因为lineitem表是按照l_orderkey列作为分布列,将数据分布到各DN的。而聚集操作分组键是l_returnflagl_linestatus,而不 是分布键l_orderkey,所以,属于同一个分组的数据分布在不同的DN上,这就需要各DN重新按照分组键作为分布键,将数据分发给其他DN,使得各DN上拥有相同分组键的数据完成聚集操作。

  2. 从以上的计划中,可以看到在DN上先做了hashagg,再又重分发给其他DN,再次做hashagg,如下图中的右侧。为什么不是先把数据重分布发给其他DN后,做一次hashagg即可,如下图的左侧

    

   这两个计划的区别在于:如果scan后数据量非常大,而聚集后数据量比较小的情况,就适合用第二个计划,可以减少DN之间的数据流动,降低网络开销。从上面Q1的执行计划中,可以明显看到,Seq Scan on lineitem对应的E-rows为6001622,seqscan后的数据估计有6001622行,重分布之前HashAggregate的的结果数据估计只有18行,数据量大大减少。根据代价估算,重分布前做一次hashagg的代价,比直接重分布scan的结果的代价要小很多,故选择了重分布前先做一次hashagg。


4.2 查看与Gather Stream和Broadcast Stream算子相关的执行计划

    以tpch的Q2为例说明,Q2的SQL语句如下

SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看
EXPLAIN
    SELECT
	s_acctbal,
	s_name,
	n_name,
	p_partkey,
	p_mfgr,
	s_address,
	s_phone,
	s_comment
    FROM
	row_engine.part,
	row_engine.supplier,
	row_engine.partsupp,
	row_engine.nation,
	row_engine.region
    WHERE
	p_partkey = ps_partkey
	AND s_suppkey = ps_suppkey
	AND p_size = 15
	AND p_type LIKE 'SMALL%'
	AND s_nationkey = n_nationkey
	AND n_regionkey = r_regionkey
	AND r_name = 'EUROPE '
	AND ps_supplycost = (
		SELECT
			MIN(ps_supplycost)
		FROM
			row_engine.partsupp,
			row_engine.supplier,
			row_engine.nation,
			row_engine.region
		WHERE
			p_partkey = ps_partkey
			AND s_suppkey = ps_suppkey
			AND s_nationkey = n_nationkey
			AND n_regionkey = r_regionkey
			AND r_name = 'EUROPE '
	)
ORDER BY
	s_acctbal DESC,
	n_name,
	s_name,
	p_partkey
LIMIT 100;

   执行结果如下:

  id |                                            operation                                             | E-rows | E-memory | E-width | E-costs  
   ----+--------------------------------------------------------------------------------------------------+--------+----------+---------+----------
  1 | ->  Limit                                                                                        |      3 |          |     192 | 23979.08
  2 |    ->  Streaming (type: GATHER)                                                                  |      3 |          |     192 | 23979.08
  3 |       ->  Limit                                                                                  |      3 | 1MB      |     192 | 23978.89
  4 |          ->  Sort                                                                                |      3 | 16MB     |     192 | 23978.89
  5 |             ->  Nested Loop (6,10)                                                               |      3 | 1MB      |     192 | 23978.88
  6 |                ->  Nested Loop (7,9)                                                             |      5 | 1MB      |      30 | 2.48
  7 |                   ->  Streaming(type: BROADCAST)                                                 |      3 | 2MB      |       4 | 1.30
  8 |                      ->  Seq Scan on region                                                      |      1 | 1MB      |       4 | 1.02
  9 |                   ->  Seq Scan on nation                                                         |     25 | 1MB      |      34 | 1.08
  10 |                ->  Materialize                                                                   |      3 | 16MB     |     170 | 23976.35
  11 |                   ->  Streaming(type: REDISTRIBUTE)                                              |      3 | 2MB      |     170 | 23976.34
  12 |                      ->  Nested Loop (13,14)                                                     |      3 | 1MB      |     170 | 23976.14
  13 |                         ->  Seq Scan on supplier                                                 |  10000 | 1MB      |     144 | 108.33
  14 |                         ->  Materialize                                                          |      3 | 16MB     |      34 | 23767.83
  15 |                            ->  Streaming(type: REDISTRIBUTE)                                     |      3 | 2MB      |      34 | 23767.82
  16 |                               ->  Hash Join (17,21)                                              |      3 | 1MB      |      34 | 23767.71
  17 |                                  ->  Hash Join (18,19)                                           |   2538 | 1MB      |      44 | 11916.85
  18 |                                     ->  Seq Scan on partsupp                                     | 800000 | 1MB      |      14 | 8532.67
  19 |                                     ->  Hash                                                     |    651 | 16MB     |      30 | 2373.01
  20 |                                        ->  Seq Scan on part                                      |    651 | 1MB      |      30 | 2373.01
  21 |                                  ->  Hash                                                        |    507 | 16MB     |      36 | 11841.97
  22 |                                     ->  Subquery Scan on subquery                                |    507 | 1MB      |      36 | 11841.97
  23 |                                        ->  HashAggregate                                         |    507 | 16MB     |      42 | 11840.28
  24 |                                           ->  Streaming(type: REDISTRIBUTE)                      |    507 | 2MB      |      10 | 11837.75
  25 |                                              ->  Hash Join (26,31)                               |    507 | 1MB      |      10 | 11833.54
  26 |                                                 ->  Streaming(type: REDISTRIBUTE)                |   2538 | 2MB      |      14 | 11687.60
  27 |                                                    ->  Hash Semi Join (28, 29)                   |   2538 | 1MB      |      14 | 11617.80
  28 |                                                       ->  Seq Scan on partsupp                   | 800000 | 1MB      |      14 | 8532.67
  29 |                                                       ->  Hash                                   |    651 | 16MB     |       4 | 2373.01
  30 |                                                          ->  Seq Scan on part                    |    651 | 1MB      |       4 | 2373.01
  31 |                                                 ->  Hash                                         |   2001 | 16MB     |       4 | 126.89
  32 |                                                    ->  Hash Join (33,34)                         |   2000 | 1MB      |       4 | 126.89
  33 |                                                       ->  Seq Scan on supplier                   |  10000 | 1MB      |       8 | 108.33
  34 |                                                       ->  Hash                                   |     15 | 16MB     |       4 | 2.97
  35 |                                                          ->  Streaming(type: BROADCAST)          |     15 | 2MB      |       4 | 2.97
  36 |                                                             ->  Hash Join (37,38)                |      5 | 1MB      |       4 | 2.43
  37 |                                                                ->  Seq Scan on nation            |     25 | 1MB      |       8 | 1.08
  38 |                                                                ->  Hash                          |      3 | 16MB     |       4 | 1.30
  39 |                                                                   ->  Streaming(type: BROADCAST) |      3 | 2MB      |       4 | 1.30
  40 |                                                                      ->  Seq Scan on region      |      1 | 1MB      |       4 | 1.02
(40 rows)


   在这个计划中我们看到对region表的扫描结果使用Broadcast方式广播给其他DN节点,然后在各DN上和nation表做Nestloop连接。

   那么什么时候用Broadcast Stream,什么时候用Redistribute Stream呢?

   一般来说,Broadcast Stream比较适用于表要广播的表数据量比较小的情况,从上面的计划可以看到Streaming(type: BROADCAST)对应的E-rows 为3,即要广播的数据仅有3条。但是对于小表作为外连接的NonNullable-side时,是不适合用Broadcast Stream的。因为小表全表广播到所有DN后,和Nullable-side的表的数据进行外连接,因为Nullable-side的表在DN上存储的只是集群中该表的部分数据,而NonNullable-side表是所有数据,导致外连接的结果中Nullable-side的NULL行数增加。DN的结果再发给CN进行汇总后,会使整个结果的数据中结果增多。

5. 总结

文章对当前DWS的shared nothing分布式架构和执行流程进行了初步的介绍,包括CN、DN、GTM在架构中扮演的角色和功能。
在DWS分布式架构中使用的Stream算子主要分为3种类型:Gather Stream(每个源节点都将其数据发送给目标节点)、Redistribute Stream(每个源节点将其数据根据连接条件计算Hash值,根据重新计算的Hash值进行分布,发给对应的目标节点)和Broadcast Stream(由一个源节点将其数据发给N个目标节点)。
通过查看2个查询的执行计划说明在语句场景下为何要做Redistribute、何时做Redistribute合适,并且对比了Redistribute和Broadcast使用场景。


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。