GaussDB(DWS)的stream执行机制
GaussDB(DWS)架构简介
近期刚刚接触GaussDB(DWS),它是一个shared nothing的分布式架构,下图是一个简化的GaussDB(DWS)架构图。
其中各组件的功能如下:
CN(Coordinator Node)为对外服务和协调节点,负责接受客户端连接以及对用户SQL命令解析下发,并收集DN节点执行的结果进行汇总,将结果展现给客户端。
DN(Datanode)为内部数据节点,承载内部数据存储及计算单元的功能。
GTM(Global Transaction Manager)负责集群全局事务控制,其与Datanode均有本地主备双机功能。
分布式执行框架
对应于分布式架构,GaussDB(DWS)提供了分布式执行框架,这也是GaussDB(DWS)中最核心的技术,旨在充分利用DN的资源,尽量将计算下推到DN进行,避免CN成为瓶颈,以提升查询效率和系统扩展性。
分布式执行框架的技术特点如下:
CN负责查询请求的解析、基于代价进行优化以及向DN进行任务下发,并收集DN节点执行的结果进行汇总
DN上运行执行计划进程,基于本节点存储的数据执行任务
执行过程中每个算子都是接收下级算子的数据输入,并向上级算子输出数据。是一个生产者—消费者的流水线工作模型
针对分布式框架,GaussDB(DWS)增加了支撑分布式计划的Stream算子。Stream算子有三种类型:
Gather Stream(N:1):每个源节点都将其数据发送给目标节点
Redistribute Stream(N:N):每个源节点将其数据根据连接条件计算Hash值,根据重新计算的Hash值进行分布,发给对应的目标节点
Broadcast Stream(1:N):有一个源节点将其数据发给N个目标节点
下面通过tpch中的Q1和Q2为例,使用explain查看这两个SQL的执行计划来认识一下这几个算子。
查看执行计划
查看与Gather Stream和Redistribute Stream算子相关的执行计划
以tpch的Q1为例说明,Q1的SQL语句如下
SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看 EXPLAIN SELECT l_returnflag, l_linestatus, SUM(l_quantity) as sum_qty, SUM (l_extendedprice) as sum_base_price, SUM (l_extendedprice * (1 - l_discount)) as sum_disc_price, SUM (l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, AVG(l_quantity) as avg_qty, AVG (l_extendedprice) as avg_price, AVG (l_discount) as avg_disc, COUNT(*) as count_order FROM row_engine.lineitem WHERE l_shipdate <= DATE '1998-12-01' - INTERVAL '3 day' GROUP BY l_returnflag, l_linestatus ORDER BY l_returnflag, l_linestatus;
执行结果如下:
id | operation | E-rows | E-memory | E-width | E-costs ----+--------------------------------------------+---------+----------+---------+----------- 1 | -> Streaming (type: GATHER) | 5 | | 257 | 146811.70 2 | -> Sort | 5 | 16MB | 257 | 146811.20 3 | -> HashAggregate | 6 | 16MB | 257 | 146811.19 4 | -> Streaming(type: REDISTRIBUTE) | 18 | 2MB | 257 | 146810.87 5 | -> HashAggregate | 18 | 16MB | 257 | 146809.83 6 | -> Seq Scan on lineitem | 6001622 | 1MB | 25 | 66788.09 (6 rows) Predicate Information (identified by plan id) ------------------------------------------------------------------------------------ 3 --HashAggregate Skew Agg Optimized by Statistic 6 --Seq Scan on lineitem Filter: (l_shipdate <= '1998-11-28 00:00:00'::timestamp without time zone) (4 rows) ====== Query Summary ===== --------------------------------- System available mem: 9252864KB Query Max mem: 9473228KB Query estimated mem: 3123KB (3 rows)
为了便于说明,我在执行计划的每一行的行首加了一个序号。下面对这个计划进行一下说明:
从第2行的Sort算子开始直到第6行,是CN下发到在DN上执行的部分;
第1行是在CN上执行
执行顺序
首先从id为6的行开始,对lineitem表执行seqscan
id为5的行,对下层扫描得到的数据根据GROUP BY分组键执行hashaggregate操作
id为4的行,是一个Redistribute Stream算子,它将在DN上的local数据执行agg后结果,重分布给其他DN
id为3的行,各DN收到其他DN上的数据,重新做agg
id为2的行,对下层的agg结果进行排序
id为1的行,是一个Gather Stream算子,CN节点收到DN返回的结果,汇集后将最终结果展示给客户端
在这个执行计划中,我们看到了两个和Stream算子相关的节点:
Gather Stream,用于CN收集DN的结果
Redistribute Stream,用于将DN上的数据重分布给其他DN做HashAgg。
这里有两个问题:
Q1查询是一个单表查询,为什么需要在DN之间重分布数据呢?
tpch的Q1是对lineitem表的单表查询,进行分组聚集计算。因为lineitem表是按照l_orderkey列作为分布列,将数据分布到各DN的。而聚集操作分组键是l_returnflag和l_linestatus,而不 是分布键l_orderkey,所以,属于同一个分组的数据分布在不同的DN上,这就需要各DN重新按照分组键作为分布键,将数据分发给其他DN,使得各DN上拥有相同分组键的数据完成聚集操作。
从以上的计划中,可以看到在DN上先做了hashagg,再又重分发给其他DN,再次做hashagg,如下图中的右侧。为什么不是先把数据重分布发给其他DN后,做一次hashagg即可,如下图的左侧
这两个计划的区别在于:如果scan后数据量非常大,而聚集后数据量比较小的情况,就适合用第二个计划,可以减少DN之间的数据流动,降低网络开销。从上面Q1的执行计划中,可以明显看到,Seq Scan on lineitem对应的E-rows为6001622,即seqscan后的数据估计有6001622行,重分布之前HashAggregate的的结果数据估计只有18行,数据量大大减少。根据代价估算,重分布前做一次hashagg的代价,比直接重分布scan的结果的代价要小很多,故选择了重分布前先做一次hashagg。
查看与Gather Stream和Broadcast Stream算子相关的执行计划
以tpch的Q2为例说明,Q2的SQL语句如下
SET explain_perf_mode=pretty; --以Oracle-like风格打印执行计划,便于查看 EXPLAIN SELECT s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment FROM row_engine.part, row_engine.supplier, row_engine.partsupp, row_engine.nation, row_engine.region WHERE p_partkey = ps_partkey AND s_suppkey = ps_suppkey AND p_size = 15 AND p_type LIKE 'SMALL%' AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'EUROPE ' AND ps_supplycost = ( SELECT MIN(ps_supplycost) FROM row_engine.partsupp, row_engine.supplier, row_engine.nation, row_engine.region WHERE p_partkey = ps_partkey AND s_suppkey = ps_suppkey AND s_nationkey = n_nationkey AND n_regionkey = r_regionkey AND r_name = 'EUROPE ' ) ORDER BY s_acctbal DESC, n_name, s_name, p_partkey LIMIT 100;
执行结果如下:
id | operation | E-rows | E-memory | E-width | E-costs ----+--------------------------------------------------------------------------------------------------+--------+----------+---------+---------- 1 | -> Limit | 3 | | 192 | 23979.08 2 | -> Streaming (type: GATHER) | 3 | | 192 | 23979.08 3 | -> Limit | 3 | 1MB | 192 | 23978.89 4 | -> Sort | 3 | 16MB | 192 | 23978.89 5 | -> Nested Loop (6,10) | 3 | 1MB | 192 | 23978.88 6 | -> Nested Loop (7,9) | 5 | 1MB | 30 | 2.48 7 | -> Streaming(type: BROADCAST) | 3 | 2MB | 4 | 1.30 8 | -> Seq Scan on region | 1 | 1MB | 4 | 1.02 9 | -> Seq Scan on nation | 25 | 1MB | 34 | 1.08 10 | -> Materialize | 3 | 16MB | 170 | 23976.35 11 | -> Streaming(type: REDISTRIBUTE) | 3 | 2MB | 170 | 23976.34 12 | -> Nested Loop (13,14) | 3 | 1MB | 170 | 23976.14 13 | -> Seq Scan on supplier | 10000 | 1MB | 144 | 108.33 14 | -> Materialize | 3 | 16MB | 34 | 23767.83 15 | -> Streaming(type: REDISTRIBUTE) | 3 | 2MB | 34 | 23767.82 16 | -> Hash Join (17,21) | 3 | 1MB | 34 | 23767.71 17 | -> Hash Join (18,19) | 2538 | 1MB | 44 | 11916.85 18 | -> Seq Scan on partsupp | 800000 | 1MB | 14 | 8532.67 19 | -> Hash | 651 | 16MB | 30 | 2373.01 20 | -> Seq Scan on part | 651 | 1MB | 30 | 2373.01 21 | -> Hash | 507 | 16MB | 36 | 11841.97 22 | -> Subquery Scan on subquery | 507 | 1MB | 36 | 11841.97 23 | -> HashAggregate | 507 | 16MB | 42 | 11840.28 24 | -> Streaming(type: REDISTRIBUTE) | 507 | 2MB | 10 | 11837.75 25 | -> Hash Join (26,31) | 507 | 1MB | 10 | 11833.54 26 | -> Streaming(type: REDISTRIBUTE) | 2538 | 2MB | 14 | 11687.60 27 | -> Hash Semi Join (28, 29) | 2538 | 1MB | 14 | 11617.80 28 | -> Seq Scan on partsupp | 800000 | 1MB | 14 | 8532.67 29 | -> Hash | 651 | 16MB | 4 | 2373.01 30 | -> Seq Scan on part | 651 | 1MB | 4 | 2373.01 31 | -> Hash | 2001 | 16MB | 4 | 126.89 32 | -> Hash Join (33,34) | 2000 | 1MB | 4 | 126.89 33 | -> Seq Scan on supplier | 10000 | 1MB | 8 | 108.33 34 | -> Hash | 15 | 16MB | 4 | 2.97 35 | -> Streaming(type: BROADCAST) | 15 | 2MB | 4 | 2.97 36 | -> Hash Join (37,38) | 5 | 1MB | 4 | 2.43 37 | -> Seq Scan on nation | 25 | 1MB | 8 | 1.08 38 | -> Hash | 3 | 16MB | 4 | 1.30 39 | -> Streaming(type: BROADCAST) | 3 | 2MB | 4 | 1.30 40 | -> Seq Scan on region | 1 | 1MB | 4 | 1.02 (40 rows)
在这个计划中我们看到对region表的扫描结果使用Broadcast方式广播给其他DN节点,然后在各DN上和nation表做Nestloop连接。
那么什么时候用Broadcast Stream,什么时候用Redistribute Stream呢?
一般来说,Broadcast Stream比较适用于表要广播的表数据量比较小的情况,从上面的计划可以看到Streaming(type: BROADCAST)对应的E-rows 为3,即要广播的数据仅有3条。但是对于小表作为外连接的NonNullable-side时,是不适合用Broadcast Stream的。因为小表全表广播到所有DN后,和Nullable-side的表的数据进行外连接,因为Nullable-side的表在DN上存储的只是集群中该表的部分数据,而NonNullable-side表是所有数据,导致外连接的结果中Nullable-side的NULL行数增加。DN的结果再发给CN进行汇总后,会使整个结果的数据中结果增多。
- 点赞
- 收藏
- 关注作者
评论(0)