GaussDB(DWS) 集群通信系列二:stream线程池设计

举报
半岛里有个小铁盒 发表于 2024/02/06 09:58:07 2024/02/06
【摘要】 GaussDB(DWS)分布式架构的Stream算子作为SQL join操作时频繁发生的执行算子,共存在三种模式:Gather、Redistribute、Broadcast,分别负责CN节点GATHER数据,DN节点REDISTRIBUTE和BROACAST数据。大集群高并发场景下,Stream算子过多可能会导致通信的性能瓶颈,引起性能劣化(2000个stream同时启动,进程初始化耗时从ms级劣

GaussDB(DWS) 集群通信系列二:stream线程池设计

1.前言

适用版本:【8.1.0(及以上)】

GaussDB(DWS)分布式架构的Stream算子作为SQL join操作时频繁发生的执行算子,共存在三种模式:Gather、Redistribute、Broadcast,分别负责CN节点GATHER数据,DN节点REDISTRIBUTE和BROACAST数据。大集群高并发场景下,Stream算子过多可能会导致通信的性能瓶颈,引起性能劣化(2000个stream同时启动,进程初始化耗时从ms级劣化到s级),因此需要尽可能减少Stream算子。但是在某些现场环境下,存在数据倾斜、join查询不包含必要分布键等客观情况,Stream算子无法有效减少,为多表join场景下的查询时延保障带来挑战。因此GaussDB(DWS)对于线程初始化->线程任务执行->线程退出执行的流程方面做了stream线程池优化,减少了线程初始化与线程退出所带来的开销。

2.实现原理

stream线程是临时线程,随query启动和退出,负责stream算子的执行,stream线程初始化和退出都会争抢锁等进程级资源,在stream线程个数无法进一步优化的场景下,需要设计有效方案以减少stream线程初始化和退出的时间代价,将进程初始化耗时稳定在ms级,保障数据库的确定性时延查询。Stream线程池的核心思想是等stream线程执行完计划任务,保留必要且可复用的线程信息,将线程放入线程池中

img

线程池中的线程执行过程如上图所示,其具体步骤为:

  • 步骤一:线程信息初始化
  • 步骤二:线程待唤醒后轻量级初始化(query级初始化)
  • 步骤三:线程任务执行
  • 步骤四:线程清理
  • 返回步骤二:继续等待下条query执行

在返回步骤二时,当线程等待超时、超出线程池容量(最大stream线程个数)、异常时线程已不可用,需要销毁。

其中步骤一中在线程初始化时,需要执行的操作有:线程创建、创建相关内存上下文、信号处理函数注册、内存追踪信息初始化、初始化GUC选项等操作;

步骤二中在线程轻量级/查询级初始化时,需要执行的操作有恢复GUC参数、初始化BackendParams、重置GUC参数等操作。

img

stream线程池为了高效管理线程的出/入池操作,采用无锁队列实现。定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer、线程唤醒所需的锁和条件变量。

当线程还未被创建时,初始化一定数量的ThreadSlot数量以预留stream线程,这些ThreadSlot被保存在数组threadSlots中。当stream线程执行完毕,需要将stream线程放置到表征可复用线程的无锁队列,称之为idleRing;当线程因为超时、异常等原因不再复用,需要退出时,将stream线程对应的ThreadSlot放置到表征未创建线程的无锁队列,称之为emptyRing。

idleRing的作用是为了快速获取并复用线程池中的线程,emptyRing的作用是快速获取一个未被使用的ThreadSlot结构,以创建一个新的stream线程。由于stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配。对于设计线程池而言,每一个database都应该对应一个idleRing。

综上所述,基于无锁队列的stream线程池设计如下所示:

img

从上图可以看出,一个线程池包含预留stream线程结构的threadSlots、一个表征未创建线程的无锁队列emptyRing和表征可复用线程的无锁队列idleRing,由于每个database对应一个idleRing,因此多个idleRing被组织为链表结构。

3.具体实现机制

3.1 数据结构设计

  • 定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer,StreamProducer是父线程向子线程传递的唯一结构、线程唤醒所需的锁和条件变量。

    typedef struct
    {
        int status;
        uint32 idx;
        ThreadId tid;
        Oid dbOid;
        StreamProducer* streamObj;
        pthread_mutex_t m_mutex;
        pthread_cond_t m_cond;
    } ThreadSlot;
    
  • 定义结构体StreamThreadPool表征线程池,其中size表示线程池中拟预留的ThreadSlot个数,ThreadSlot被保存在threadSlots数组中;无锁队列emptyRing用来保存未创建线程的ThreadSlot,对应地,idleRing用来保存空闲的已创建stream线程的ThreadSlot。结构如下所示:

    class StreamThreadPool: public BaseObject
    {
    public:
        StreamThreadPool();
        void Init(int num);                            				     // streamThreadPool init   
        int Call(StreamProducer* obj);                   			     // 获取idle线程 或 create 新线程
        bool Wait();                                                      // idle线程等待唤醒或者超时退出
        ThreadSlot* GetLocalSlot();                                       // get streamThreadSlot
        void SetLocalSlot(int slotIdx);                                   // set streamThreadSlot
        StreamPool* GetLocalPool();                                       // 获取streamDBPool 或 新建一个
        ThreadSlot* PopSlot();                                            // 从idleRing/emptyRing获取一slot
        void PushToEmpty(ThreadSlot* slot);                               // 将slot直接放入emptyRing
        void PushToIdle(StreamPool* pool, ThreadSlot* slot);              // 将slot直接放入idleRing
        void LocalPushToIdle();                                           // 根据状态,将slot放入idleRing
        void LocalPushToEmpty();                                          // 根据状态,将slot放入emptyRing
        int CleanStreamPool(const char *dbName, cleanOption cleanMode);   // 根据db信息清线程
        void CleanInAllStreamPool(int desNum);                            // 调整线程池中stream线程个数
        int GetStreamNum();                                               // 获取线程池中stream线程个数
        bool Release();                                                   // 判断超时线程是否需要清理
        bool TimeoutClean();                                              // 清理超时idle线程
    
    private:
        int size;
        ThreadSlot* threadSlots;
        ArrayLockFreeQueue emptyRing;
        StreamPool* PoolListHead;
    }
    
  • 定义结构体StreamPool,由于stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配,所以一个emptyRing和一个database相匹配,保存在链表PoolListHead中。

    typedef struct StreamPool
    {
        Oid dbOid;
        ArrayLockFreeQueue idleRing;
        struct StreamPool* next;
    } StreamPool;
    

综上,我们可以得到各结构间组织的直观图,如下所示:

img

上图中threadSlots可以放在idleRing(蓝色)、emptyRing(绿色)和运行空间(黄色)中。

3.2 stream线程状态转移DFA设计

每一个记录线程信息的结构ThreadSlot中都保存了线程当前的状态status,记录线程状态的目的是为了保障线程执行过程的有序控制,也可以通过状态的互斥避免threadSlot不会被两个线程同时使用。

stream线程状态转移用确定性有限状态机(DFA,definite automata)表征,共包含4个状态:

STREAM_SLOT_EXIT、STREAM_SLOT_IDLE、STREAM_SLOT_HOLD和STREAM_SLOT_RUN状态。其物理含义如下:

  • STREAM_SLOT_EXIT:线程退出状态,表示线程未被创建或线程已退出;
  • STREAM_SLOT_IDLE:线程可复用状态,表示线程在idleRing中,可以被复用;
  • STREAM_SLOT_HOLD:线程临时独占状态,表示线程在做进入下一个状态的准备工作;
  • STREAM_SLOT_RUN:线程运行状态,表示线程正在执行任务。

状态间转移条件如下所示,图中粗箭头表示状态机主循环部分:

img

与状态对应的,是slot所处的位置,slot所处的位置有三处,分别是idleRing、emptyRing和运行空间,slot从无锁队列中拿出,运行时所处的位置,我们称之为运行空间。各状态所处的位置情况如下所示:

  • STREAM_SLOT_EXIT:idleRing(idle线程超时)、emptyRing(初始化或者FATAL);
  • STREAM_SLOT_IDLE:idleRing
  • STREAM_SLOT_HOLD:运行空间(从无锁队列中取出)、idleRing(idle线程超时或中断);
  • STREAM_SLOT_RUN:运行空间。

Slot的位置变化和状态转移的关系如下,图中粗箭头表示状态机主循环部分:

img

根据各状态所处的位置情况,从idleRing中取出的slot可能有三种状态:EXIT、IDLE、HOLD。当取出IDLE状态的slot,说明线程可复用;当取出EXIT状态的slot,说明线程已退出,此时需要将slot转存到emptyRing;当取出HOLD状态,说明线程正在被使用,此时需要放回idleRing。

EmptyRing中slot的状态只能是EXIT,运行空间中slot的状态要么是HOLD(刚取出还未运行),要么是RUN(正在运行),不再赘述

3.3 单个stream线程执行流程

Stream线程池中stream线程整体执行流程如下图所示:

img

stream线程初始化仅初始化一次,执行完query之后,便将连接归还到连接池里,循环执行上图中黄色部分的语句,如果有异常则线程退出,连接销毁,slot归还至emptyRing;如果正常执行结束,将连接中内容清理,避免下个连接误用,并将slot归还至idleRing等待下个连接复用。

那么stream线程复用时如何保持参数的一致性呢,对应上图中的set GUC params阶段。父线程保存自己的guc_variables在syncGucVariables中,syncGucVariables是需要传递给stream的结构用以保证父子线程guc参数的一致。然后父线程在初始化streamProducer时将syncGucVariables保存在该结构中传递。Stream线程根据streamProducer初始化自己的syncGucVariables变量,首先reset所有的guc变量,然后根据syncGucVariables修正自己的variables。

4.外部接口

4.1 GUC参数

  • max_stream_pool:设置stream线程池能够容纳stream线程的最大个数。该参数8.1.2及以上版本支持。默认值为65535。设置为-1表示不开启stream线程池。该参数支持reload更新,更新规则:设置max_stream_pool小于当前可用线程个数,支持线程个数实时减少;当设置max_stream_pool大于当前idle线程个数,将由业务驱动线程个数的增加

4.2 视图

  • pg_thread_wait_status:展示了集群所有CN/DN进程内的所有线程的实时 等待状态,是定位集群通信问题最重要的视图

    其中对于wait_status列状态说明如下:

    • wait stream task:空闲的stream线程;

    • wait node:等待其他DN的数据,需要关注对端状态;

    • flush data:发送数据给其他DN时因为对端buffer满而阻塞;

    • wait cmd:DN上空闲的postgres线程,等待CN的下一个query;

    • none:未定义状态,极有可能是阻塞原因;

    • synchronize quit:同步退出状态,自身任务已完成,在等待同一个query的其他线程一起退出;

5.通过表象看stream线程池逻辑

【场景一】集群基础行为场景——建立多数据库场景

  1. Create database ***;(建立多库)
  2. 分别执行带stream算子的查询;

例:create table test_01(c1 int, c2 int)with(orientation=column) distribute by hash(c1);
insert into test_01 select generate_series(1,100), generate_series(1,100);analyze test_01;
select * from test_01 a, test_01 b, test_01 c, test_01 d, test_01 e, test_01 f where a.c2 =b.c2 and c.c2 = d.c2 and e.c2=f.c2 limit 100;

  1. 查询结束,查pgxc_thread_wait_status看DN节点:预期stream线程状态为wait thread cond。且多database之间stream线程不复用。

【场景二】集群基础行为场景——建立多用户场景

  1. Create user ***;(建立多用户)
  2. 分别执行带stream算子的查询;(参考场景一示例)
  3. 查询结束,查pgxc_thread_wait_status看DN节点:预期stream线程状态为wait thread cond。且多user之间stream线程可以复用。

例:用户一执行完查询,视图中显示共有四个stream线程在线程池,用户二执行同样查询返回正确结果,视图中的stream线程个数不变,且线程号也是一致的,则说明复用。

【场景三】集群基础行为场景——线程清理场景

  1. 调整guc参数max_stream_pool的值,观测是否生效;预期:当设置max_stream_pool小于当前idle线程个数,支持线程个数实时减少;当设置max_stream_pool大于当前idle线程个数,将由业务驱动线程个数的增加,但是不会超过max_stream_pool。
  2. 执行clean connection(ALL force),查看stream线程是否被清理;预期:该database的stream线程被完全清理。
  3. 执行drop database命令,查看stream线程是否被清理;预期:该database的stream线程被完全清理。

6.总结

本文详细介绍了stream连接池及其原理,让我们更好的理解GaussDB(DWS)集群通信中数据交互的具体逻辑,对于GaussDB通信运维也具备一定的参考意义。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。