GaussDB(DWS)的stream执行机制
1. 前言
- 适用版本:【8.1.0(及以上)】
介绍GaussDB(DWS)分布式架构的stream算子三种模式:Gather、Redistribute、Broadcast。
2. GaussDB(DWS)架构简介
近期刚刚接触GaussDB(DWS),它是一个shared nothing的分布式架构,下图是一个简化的GaussDB(DWS)架构图。
其中各组件的功能如下:
-
CN(Coordinator Node)为对外服务和协调节点,负责接受客户端连接以及对用户SQL命令解析下发,并收集DN节点执行的结果进行汇总,将结果展现给客户端。
-
DN(Datanode)为内部数据节点,承载内部数据存储及计算单元的功能。
-
GTM(Global Transaction Manager)负责集群全局事务控制,其与Datanode均有本地主备双机功能。
3. 分布式执行框架
对应于分布式架构,GaussDB(DWS)提供了分布式执行框架,这也是GaussDB(DWS)中最核心的技术,旨在充分利用DN的资源,尽量将计算下推到DN进行,避免CN成为瓶颈,以提升查询效率和系统扩展性。
分布式执行框架的技术特点如下:
-
CN负责查询请求的解析、基于代价进行优化以及向DN进行任务下发,并收集DN节点执行的结果进行汇总
-
DN上运行执行计划进程,基于本节点存储的数据执行任务
-
执行过程中每个算子都是接收下级算子的数据输入,并向上级算子输出数据。是一个生产者—消费者的流水线工作模型
针对分布式框架,GaussDB(DWS)增加了支撑分布式计划的Stream算子。Stream算子有三种类型:
-
Gather Stream(N:1):每个源节点都将其数据发送给目标节点
-
Redistribute Stream(N:N):每个源节点将其数据根据连接条件计算Hash值,根据重新计算的Hash值进行分布,发给对应的目标节点
-
Broadcast Stream(1:N):有一个源节点将其数据发给N个目标节点
下面通过tpch中的Q1和Q2为例,使用explain查看这两个SQL的执行计划来认识一下这几个算子。
4. 查看执行计划
4.1 查看与Gather Stream和Redistribute Stream算子相关的执行计划
以tpch的Q1为例说明,Q1的SQL语句如下
SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看
EXPLAIN
SELECT
l_returnflag,
l_linestatus,
SUM(l_quantity) as sum_qty,
SUM (l_extendedprice) as sum_base_price,
SUM (l_extendedprice * (1 - l_discount)) as sum_disc_price,
SUM (l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
AVG(l_quantity) as avg_qty,
AVG (l_extendedprice) as avg_price,
AVG (l_discount) as avg_disc,
COUNT(*) as count_order
FROM
row_engine.lineitem
WHERE
l_shipdate <= DATE '1998-12-01' - INTERVAL '3 day'
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus;
执行结果如下:
id | operation | E-rows | E-memory | E-width | E-costs
----+--------------------------------------------+---------+----------+---------+-----------
1 | -> Streaming (type: GATHER) | 5 | | 257 | 146811.70
2 | -> Sort | 5 | 16MB | 257 | 146811.20
3 | -> HashAggregate | 6 | 16MB | 257 | 146811.19
4 | -> Streaming(type: REDISTRIBUTE) | 18 | 2MB | 257 | 146810.87
5 | -> HashAggregate | 18 | 16MB | 257 | 146809.83
6 | -> Seq Scan on lineitem | 6001622 | 1MB | 25 | 66788.09
(6 rows)
Predicate Information (identified by plan id)
------------------------------------------------------------------------------------
3 --HashAggregate
Skew Agg Optimized by Statistic
6 --Seq Scan on lineitem
Filter: (l_shipdate <= '1998-11-28 00:00:00'::timestamp without time zone)
(4 rows)
====== Query Summary =====
---------------------------------
System available mem: 9252864KB
Query Max mem: 9473228KB
Query estimated mem: 3123KB
(3 rows)
为了便于说明,我在执行计划的每一行的行首加了一个序号。下面对这个计划进行一下说明:
-
从第2行的Sort算子开始直到第6行,是CN下发到在DN上执行的部分;
-
第1行是在CN上执行
-
执行顺序
-
首先从id为6的行开始,对lineitem表执行seqscan
-
id为5的行,对下层扫描得到的数据根据GROUP BY分组键执行hashaggregate操作
-
id为4的行,是一个Redistribute Stream算子,它将在DN上的local数据执行agg后结果,重分布给其他DN
-
id为3的行,各DN收到其他DN上的数据,重新做agg
-
id为2的行,对下层的agg结果进行排序
-
id为1的行,是一个Gather Stream算子,CN节点收到DN返回的结果,汇集后将最终结果展示给客户端
在这个执行计划中,我们看到了两个和Stream算子相关的节点:
-
Gather Stream,用于CN收集DN的结果
-
Redistribute Stream,用于将DN上的数据重分布给其他DN做HashAgg。
这里有两个问题:
-
Q1查询是一个单表查询,为什么需要在DN之间重分布数据呢?
tpch的Q1是对lineitem表的单表查询,进行分组聚集计算。因为lineitem表是按照l_orderkey列作为分布列,将数据分布到各DN的。而聚集操作分组键是l_returnflag和l_linestatus,而不 是分布键l_orderkey,所以,属于同一个分组的数据分布在不同的DN上,这就需要各DN重新按照分组键作为分布键,将数据分发给其他DN,使得各DN上拥有相同分组键的数据完成聚集操作。
-
从以上的计划中,可以看到在DN上先做了hashagg,再又重分发给其他DN,再次做hashagg,如下图中的右侧。为什么不是先把数据重分布发给其他DN后,做一次hashagg即可,如下图的左侧
这两个计划的区别在于:如果scan后数据量非常大,而聚集后数据量比较小的情况,就适合用第二个计划,可以减少DN之间的数据流动,降低网络开销。从上面Q1的执行计划中,可以明显看到,Seq Scan on lineitem对应的E-rows为6001622,即seqscan后的数据估计有6001622行,重分布之前HashAggregate的的结果数据估计只有18行,数据量大大减少。根据代价估算,重分布前做一次hashagg的代价,比直接重分布scan的结果的代价要小很多,故选择了重分布前先做一次hashagg。
4.2 查看与Gather Stream和Broadcast Stream算子相关的执行计划
以tpch的Q2为例说明,Q2的SQL语句如下
SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看
EXPLAIN
SELECT
s_acctbal,
s_name,
n_name,
p_partkey,
p_mfgr,
s_address,
s_phone,
s_comment
FROM
row_engine.part,
row_engine.supplier,
row_engine.partsupp,
row_engine.nation,
row_engine.region
WHERE
p_partkey = ps_partkey
AND s_suppkey = ps_suppkey
AND p_size = 15
AND p_type LIKE 'SMALL%'
AND s_nationkey = n_nationkey
AND n_regionkey = r_regionkey
AND r_name = 'EUROPE '
AND ps_supplycost = (
SELECT
MIN(ps_supplycost)
FROM
row_engine.partsupp,
row_engine.supplier,
row_engine.nation,
row_engine.region
WHERE
p_partkey = ps_partkey
AND s_suppkey = ps_suppkey
AND s_nationkey = n_nationkey
AND n_regionkey = r_regionkey
AND r_name = 'EUROPE '
)
ORDER BY
s_acctbal DESC,
n_name,
s_name,
p_partkey
LIMIT 100;
执行结果如下:
id | operation | E-rows | E-memory | E-width | E-costs
----+--------------------------------------------------------------------------------------------------+--------+----------+---------+----------
1 | -> Limit | 3 | | 192 | 23979.08
2 | -> Streaming (type: GATHER) | 3 | | 192 | 23979.08
3 | -> Limit | 3 | 1MB | 192 | 23978.89
4 | -> Sort | 3 | 16MB | 192 | 23978.89
5 | -> Nested Loop (6,10) | 3 | 1MB | 192 | 23978.88
6 | -> Nested Loop (7,9) | 5 | 1MB | 30 | 2.48
7 | -> Streaming(type: BROADCAST) | 3 | 2MB | 4 | 1.30
8 | -> Seq Scan on region | 1 | 1MB | 4 | 1.02
9 | -> Seq Scan on nation | 25 | 1MB | 34 | 1.08
10 | -> Materialize | 3 | 16MB | 170 | 23976.35
11 | -> Streaming(type: REDISTRIBUTE) | 3 | 2MB | 170 | 23976.34
12 | -> Nested Loop (13,14) | 3 | 1MB | 170 | 23976.14
13 | -> Seq Scan on supplier | 10000 | 1MB | 144 | 108.33
14 | -> Materialize | 3 | 16MB | 34 | 23767.83
15 | -> Streaming(type: REDISTRIBUTE) | 3 | 2MB | 34 | 23767.82
16 | -> Hash Join (17,21) | 3 | 1MB | 34 | 23767.71
17 | -> Hash Join (18,19) | 2538 | 1MB | 44 | 11916.85
18 | -> Seq Scan on partsupp | 800000 | 1MB | 14 | 8532.67
19 | -> Hash | 651 | 16MB | 30 | 2373.01
20 | -> Seq Scan on part | 651 | 1MB | 30 | 2373.01
21 | -> Hash | 507 | 16MB | 36 | 11841.97
22 | -> Subquery Scan on subquery | 507 | 1MB | 36 | 11841.97
23 | -> HashAggregate | 507 | 16MB | 42 | 11840.28
24 | -> Streaming(type: REDISTRIBUTE) | 507 | 2MB | 10 | 11837.75
25 | -> Hash Join (26,31) | 507 | 1MB | 10 | 11833.54
26 | -> Streaming(type: REDISTRIBUTE) | 2538 | 2MB | 14 | 11687.60
27 | -> Hash Semi Join (28, 29) | 2538 | 1MB | 14 | 11617.80
28 | -> Seq Scan on partsupp | 800000 | 1MB | 14 | 8532.67
29 | -> Hash | 651 | 16MB | 4 | 2373.01
30 | -> Seq Scan on part | 651 | 1MB | 4 | 2373.01
31 | -> Hash | 2001 | 16MB | 4 | 126.89
32 | -> Hash Join (33,34) | 2000 | 1MB | 4 | 126.89
33 | -> Seq Scan on supplier | 10000 | 1MB | 8 | 108.33
34 | -> Hash | 15 | 16MB | 4 | 2.97
35 | -> Streaming(type: BROADCAST) | 15 | 2MB | 4 | 2.97
36 | -> Hash Join (37,38) | 5 | 1MB | 4 | 2.43
37 | -> Seq Scan on nation | 25 | 1MB | 8 | 1.08
38 | -> Hash | 3 | 16MB | 4 | 1.30
39 | -> Streaming(type: BROADCAST) | 3 | 2MB | 4 | 1.30
40 | -> Seq Scan on region | 1 | 1MB | 4 | 1.02
(40 rows)
在这个计划中我们看到对region表的扫描结果使用Broadcast方式广播给其他DN节点,然后在各DN上和nation表做Nestloop连接。
那么什么时候用Broadcast Stream,什么时候用Redistribute Stream呢?
一般来说,Broadcast Stream比较适用于表要广播的表数据量比较小的情况,从上面的计划可以看到Streaming(type: BROADCAST)对应的E-rows 为3,即要广播的数据仅有3条。但是对于小表作为外连接的NonNullable-side时,是不适合用Broadcast Stream的。因为小表全表广播到所有DN后,和Nullable-side的表的数据进行外连接,因为Nullable-side的表在DN上存储的只是集群中该表的部分数据,而NonNullable-side表是所有数据,导致外连接的结果中Nullable-side的NULL行数增加。DN的结果再发给CN进行汇总后,会使整个结果的数据中结果增多。
5. 总结
文章对当前DWS的shared nothing分布式架构和执行流程进行了初步的介绍,包括CN、DN、GTM在架构中扮演的角色和功能。
在DWS分布式架构中使用的Stream算子主要分为3种类型:Gather Stream(每个源节点都将其数据发送给目标节点)、Redistribute Stream(每个源节点将其数据根据连接条件计算Hash值,根据重新计算的Hash值进行分布,发给对应的目标节点)和Broadcast Stream(由一个源节点将其数据发给N个目标节点)。
通过查看2个查询的执行计划说明在语句场景下为何要做Redistribute、何时做Redistribute合适,并且对比了Redistribute和Broadcast使用场景。
- 点赞
- 收藏
- 关注作者
评论(0)