GaussDB(DWS)运维 -- 行存存储和列存转换

举报
譡里个檔 发表于 2025/01/17 12:15:42 2025/01/17
298 0 0
【摘要】 提供一种行存表和列存表专项转换的方法

--
-- 使用场景: 行存表转列存表/列存表转行存表 转换自定义函数
-- 1. 可保留注释、索引和约束的前后一致性
-- 2. 建议打开视图重建功能,否则表存在依赖对象时会转换失败
-- 3. 日志会记录详细的执行日志信息,包括转换前后的表定义、权限和索引信息
-- 4. 转换失败后日志会记录详细的失败信息
-- 5. 行存到列存的转换时,如果函数的第四个参数typeReplace位true,会自动把raw/uuid/name转换成text,否则报错
--  
-- 参数说明
-- schemaname: 表的schema
-- tablename: 表名
-- mode: 转化模式, r2c为行转列 c2r为列转行
--
-- 使用约束
-- 1. 转换期间对表加7->8级锁,其他session对表只有读权限
-- 2. 行存转列存时
--    2.1.只保留主键和唯一约束/唯一索引,其它索引不会迁移到列存表上
--    2.2.存在where条件的表达式索引不会迁移到列存表上
--    2.3.非btree索引不会迁移到列存表上
--    2.4.没有迁移的索引会记录在字段skipidxdef中
-- 3. 列存转行存时
--    3.1.psort索引不会迁移到行存表上
--    3.2.没有迁移的索引会记录在字段skipidxdef中
-- 4. 如果当前存储格式和目标格式一致,不做转换,标记转换成功
-- 5. 如果checksum校验或者表行数校验不通过,转换失败
-- 6. 因行列存压缩级别配置不一样,新建对象均为默认压缩级别
-- 7. 默认只处理非临时表(非CREATE TEMP创建的),临时表转换失败,报'object doesnot exist'
-- 8. 单表存在serial内置生成的sequence或者ALTER SEQUEDCE OWNED BY
--    创建的sequence时,支持转换,转换前后sequence取值和权限保持不变

/*
--转换操作日志表
CREATE SCHEMA dfm;
DROP TABLE IF EXISTS dfm.switch_log;
CREATE TABLE IF NOT EXISTS dfm.switch_log(
    node text default pgxc_node_str(),
    pid bigint default split_part(pg_current_sessionid(), '.', 2),
    schemaname text,
    tablename text,
    mode text,
    start timestamptz(3),
    finish timestamptz(3),
    sucess bool,
    abort text,
    detail text,
    skipidxdef text
) DISTRIBUTE BY HASH(tablename, schemaname);
*/

CREATE OR REPLACE FUNCTION dfm.switch_orientation(schemaname text, tablename text, mode text, typeReplace bool default false)
 RETURNS boolean
 LANGUAGE plpgsql
 SECURITY DEFINER NOT FENCED NOT SHIPPABLE
AS $function$
DECLARE
    result bool;
    destori text;
    srcori text;
    optStr text;
    tableDef text;
    srcDef text[];
    lineStr text;
    defStr text;
    fetchAclStr text;
    fetchAclCmd text;
    aclStr text;
    srcAcl text;
    newAcl text;
    idxStr text;
    ownerStr text;
    rebuildStr text;
    skipIdxDef text;
    csum1  numeric;
    csum2  numeric;
    cnt1  numeric;
    cnt2  numeric;
    queryStr text;
    seqOid oid;
    seqName text;
    seqCol  text;
    seqColType text;
    seqOwner text;
    seqExpr text;
    seqTmp text;
    myCur refcursor;
    fetchSeqStr text;
    seqCtr int;
    seqSrcAcl text;
    seqNewAcl text;
    errMsg text;
    errCxt text;
    errDetail text;
    errHint text;
    start timestamptz;
    finish timestamptz;
    abort text;
    detail text;
    reloid oid;
    insertLogStr text;
    colSeqNum int;
    srcTypeArray text[];
    destTypeArray text[];
BEGIN
    SET LOCAL client_min_messages = 'WARNING';
    SET LOCAL max_query_retry_times = 0;
    skipIdxDef := NULL;

    insertLogStr := 'INSERT INTO dfm.switch_log(schemaname, tablename, mode, start, finish, sucess, abort, detail, skipidxdef) VALUES(:1, :2, :3, :4, :5, :6, :7, :8, :9)';

    -- detail := E'-- version 5.2(2023-2-23 19:00)\n';
    -- 校验转换模式
    start := clock_timestamp()::timestamptz(3);
    detail := detail || start || ': check switch mode' || E'\n';
    IF mode = 'r2c' THEN -- 行转列
        detail := detail || start || ': try to switch table ' || schemaname || '.' || tablename  || ' from row to column' || E'\n';
        destori := 'column';
    ELSIF mode = 'c2r' THEN -- 列转行
        detail := detail || start || ': try to switch table ' || schemaname || '.' || tablename  || ' from column to row' || E'\n';
        destori := 'row';
    ELSE -- 校验失败
        abort := 'invalid mode: ' || mode;
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        RETURN false;
    END IF;

    -- 确认表对象存在
    detail := detail || clock_timestamp()::timestamptz(3) || ': try to find table in database ' || E'\n';
    PERFORM c.oid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename
    AND relpersistence <> 't'
    AND c.relkind = 'r' AND c.reloptions::text NOT LIKE '%internal_mask%';

    IF NOT FOUND THEN
        abort := 'object doesnot exist';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        RETURN false;
    END IF;

    -- 锁表:7级锁,表只读,防止并发的数据修改动作
    queryStr :=  'LOCK TABLE ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || ' IN EXCLUSIVE MODE;';
    detail := detail || clock_timestamp()::timestamptz(3) || E': try to lock table --\n    ' || queryStr || E'\n';
    EXECUTE IMMEDIATE  queryStr;

    -- 获取表的oid
    SELECT c.oid INTO reloid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename
    AND relpersistence <> 't'
    AND c.relkind = 'r' AND c.reloptions::text NOT LIKE '%internal_mask%';

   -- 校验表是否存在依赖对象;如果存在&视图解耦开关没有打开,则报错
   PERFORM 1 FROM pg_settings WHERE name = 'view_independent' AND setting = 'on';
   IF NOT FOUND THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': view_independent = off' || E'\n';
        PERFORM 1 FROM gs_view_dependency_basic WHERE refobjschema = schemaname AND refobjname = tablename AND refobjectid = reloid;
        IF FOUND THEN
            abort := 'some object depend on current table';
            EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
            RETURN false;
        ELSE
            detail := detail || clock_timestamp()::timestamptz(3) || ': no dependent objects found' || E'\n';
        END IF;
   ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': view_independent = on' || E'\n';
   END IF;

    -- 获取表的当前存储格式
    SELECT
        t1.option_value INTO srcori
    FROM (
        SELECT
            (pg_catalog.pg_options_to_table(c.reloptions)).option_name AS option_name,
            (pg_catalog.pg_options_to_table(c.reloptions)).option_value AS option_value
        FROM pg_class c
        WHERE oid = reloid
    ) t1
    WHERE t1.option_name IN ('orientation');

    -- 当前存储格式已经是预期格式
    IF srcori = destori THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': it is already '|| srcori || E'\n';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), true, abort, detail, skipIdxDef;
        RETURN true;
    END IF;

    -- 生成预期的option(注意中间几个只有行存或者列存支持的option被抛弃掉了)
    detail := detail || clock_timestamp()::timestamptz(3) || ': gen reloptions ' || E'\n';
    WITH src AS (
        SELECT
            (pg_catalog.pg_options_to_table(c.reloptions)).option_name AS option_name,
            (pg_catalog.pg_options_to_table(c.reloptions)).option_value AS option_value
        FROM pg_class c
        WHERE oid = reloid
    ),
    t AS(
        SELECT
            option_name, option_value
        FROM src t1
        WHERE t1.option_name NOT IN ('orientation', 'compression', 'enable_hstore', 'colversion', 'enable_delta', 'fillfactor')

        UNION ALL
        SELECT 'orientation', destori
    )
    SELECT
        'WITH(' || string_agg(option_name || '=' || option_value, ',') || ')' INTO optStr
    FROM t;

    -- 获取原表的 name raw uuid 字段类型信息
    SELECT array_agg(c)::text[] INTO srcTypeArray FROM(
        SELECT
            CASE WHEN atttypid = 19   THEN '	' || a.attname || ' name' -- name
                 WHEN atttypid = 86   THEN '	' || a.attname || ' raw'  -- raw
                 WHEN atttypid = 2950 THEN '	' || a.attname || ' uuid' --uuid
                 ELSE NULL
            END AS c
        FROM pg_attribute a
        INNER JOIN pg_type t ON t.oid = a.atttypid
        WHERE attrelid = reloid AND attnum > 0 AND attisdropped = false
        ORDER BY attnum
    );
    detail := detail || clock_timestamp()::timestamptz(3) || E': source attnum+type array --\n    ' || srcTypeArray::text || E'\n';

    -- 获取name raw uuid字段类型要转换的目标类型信息
    SELECT array_agg(c)::text[] INTO destTypeArray FROM(
        SELECT
            CASE WHEN atttypid = 19   THEN '	' || a.attname || ' text' -- name -> text
                 WHEN atttypid = 86   THEN '	' || a.attname || ' text' -- raw -> text
                 WHEN atttypid = 2950 THEN '	' || a.attname || ' text' --uuid -> text
                 ELSE NULL
            END AS c
        FROM pg_attribute a
        INNER JOIN pg_type t ON t.oid = a.atttypid
        WHERE attrelid = reloid AND attnum > 0 AND attisdropped = false
        ORDER BY attnum
    );
    detail := detail || clock_timestamp()::timestamptz(3) || E': target attnum+type array --\n    ' || destTypeArray::text || E'\n';

    -- 获取表定义
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to get source table def ' || E'\n';
    SELECT pg_get_tabledef(reloid) INTO tableDef FROM DUAL;
    detail := detail || clock_timestamp()::timestamptz(3) || E': source table def --\n' || tableDef || E'\n';

    SELECT regexp_split_to_array(tableDef, '\n') INTO srcDef;

    detail := detail || clock_timestamp()::timestamptz(3) || ': start to gen temp table def ' || E'\n';
    colSeqNum := 0;

    --逐行解析表定义
    FOREACH lineStr IN ARRAY srcDef
    LOOP
        IF lineStr LIKE 'CREATE  TABLE %(' THEN
            -- 后续是列信息
            colSeqNum := 1;
        ELSIF lineStr LIKE 'WITH (%orientation=%)' THEN
            -- 列信息处理完毕
            lineStr := optStr;
            colSeqNum := 0;
        ELSIF colSeqNum > 0 AND typeReplace = true AND mode = 'r2c' THEN
            --- 处理列数据类型转换
            IF srcTypeArray[colSeqNum] IS NOT NULL THEN
                detail := detail || clock_timestamp()::timestamptz(3) || ': change column type --  FROM "' || srcTypeArray[colSeqNum] || '" TO ""' || destTypeArray[colSeqNum] || E'"\n';
                lineStr := replace(lineStr, srcTypeArray[colSeqNum], destTypeArray[colSeqNum]);
            END IF;
            colSeqNum := colSeqNum + 1;
        ELSIF lineStr LIKE 'SET search_path = %;' THEN
            -- 对象切换到schema dfm下
            lineStr := 'SET search_path = dfm;'; 
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX % ON % USING % WHERE %' OR lineStr LIKE 'CREATE INDEX % ON % USING % WHERE %' THEN
            -- 忽略表达式索引 
            skipIdxDef := skipIdxDef || '    ' ||  lineStr || E'\n';
            lineStr := NULL;
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX%ON%USING cbtree%' THEN
            lineStr := replace(lineStr, ' USING cbtree ', ' USING btree '); -- 把索引类型cbtree替换为btree
            lineStr := replace(lineStr, ' '|| schemaname || '.', ' ');  -- 去除索引定义里面表名前的schema名
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX%ON%USING btree%' THEN
            lineStr := replace(lineStr, ' '|| schemaname || '.', ' ');  -- 去除索引定义里面表名前的schema名
        ELSIF lineStr LIKE 'CREATE INDEX%ON%USING%' THEN
            IF destori = 'column' THEN
                -- 行存转列存场景:忽略非主键&非唯一索引
                skipIdxDef := skipIdxDef || '    ' ||  lineStr || E'\n';
                lineStr := NULL;
            ELSE
                IF lineStr LIKE 'CREATE INDEX%USING psort%' THEN
                    -- 列存转行存场景:忽略psort索引
                    skipIdxDef := skipIdxDef || '    ' ||  lineStr || E'\n';
                    lineStr := NULL;
                ELSIF lineStr LIKE 'CREATE INDEX%USING cbtree%' THEN
                    lineStr := replace(lineStr, ' USING cbtree ', ' USING btree '); -- 把索引类型cbtree替换为btree
                    lineStr := replace(lineStr, ' '|| schemaname || '.', ' ');  -- 去除索引定义里面表名前的schema名
                ELSIF lineStr LIKE 'CREATE INDEX%USING btree%' THEN
                    lineStr := replace(lineStr, ' '|| schemaname || '.', ' ');  -- 去除索引定义里面表名前的schema名
                ELSE
                    abort := 'UNSUPPORT INDEX: ' || lineStr; -- 其他类型索引为不支持的索引类型
                    EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
                    RETURN false;
                END IF;
            END IF;
        END IF;

        IF lineStr IS NULL THEN
            CONTINUE;
        ELSIF lineStr LIKE 'CREATE %INDEX % ON % USING %' OR lineStr LIKE 'ALTER TABLE % ADD CONSTRAINT %' THEN
            -- 单独提起索引定义,在数据导入后创建,加快迁移速度
            idxStr := idxStr || E'\n    ' || lineStr;
        ELSE
            defStr := defStr || E'\n    ' || lineStr;
        END IF;
    END LOOP;

    detail := detail || clock_timestamp()::timestamptz(3) || ': temp table def(defStr) -- '|| defStr || E'\n';
    IF idxStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': index info(idxStr) -- '|| idxStr || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': index info(idxStr) IS NULL ' || E'\n';
    END IF;

    -- 获取原始表的权限信息
    fetchAclStr := '
    WITH t AS(
        SELECT c.relname as tablename, '' '' AS attname , relacl::text AS acl, 0 AS attnum
        FROM pg_class c
        WHERE oid = :1 AND relacl IS NOT NULL
        UNION ALL
        SELECT c.relname as tablename, attname, attacl::text, attnum
        FROM pg_attribute a INNER JOIN pg_class c ON c.oid = a.attrelid AND c.oid = :1
        AND attnum > 0 AND attacl IS NOT NULL
    )
    SELECT string_agg(tablename || E''\t\t\t|'' || attname || E''\t\t\t|'' || acl, E''\n    '' ORDER BY attnum) FROM t;';
    EXECUTE IMMEDIATE fetchAclStr INTO srcAcl  USING IN reloid;
    IF srcAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E':  original table''s acl info --\n    ' || srcAcl || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || E':  original table''s acl info IS NULL';
    END IF;

    -- 构建权限信息
    fetchAclCmd := '
    WITH sub_obj AS(
        SELECT NULL::text AS objtype, proowner AS owneroid, pronamespace AS nspoid, proname as objectname, NULL AS subobjectname, proacl AS objacl
        FROM pg_proc WHERE false

        UNION ALL

        SELECT ''attribute'' AS objtype, relowner AS owneroid, c.relnamespace AS nspoid, c.relname as objectname, attname AS subobjectname, attacl
        FROM pg_attribute a INNER JOIN pg_class c ON c.oid = a.attrelid AND c.oid = :1

        UNION ALL

        SELECT CASE relkind WHEN ''r'' THEN ''table'' WHEN ''v'' THEN ''view'' WHEN ''S'' THEN ''sequence'' END AS objtype, relowner AS owneroid,
            relnamespace AS nspoid, relname as objectname, NULL AS subobjectname, relacl
        FROM pg_class WHERE relkind IN (''v'', ''r'', ''S'') AND oid = :1
    ),
    obj AS(
        SELECT objtype, owneroid, n.nspname as objectschema, objectname, subobjectname, objacl
        FROM sub_obj o
        LEFT JOIN pg_namespace n ON n.oid = o.nspoid
        WHERE objacl IS NOT NULL AND (o.nspoid IS NULL OR o.nspoid NOT IN (99,100,3987,3966,11))
        AND (objectschema IS NULL OR objectschema NOT IN (''information_schema'',''dbms_job'',''dbms_output'',''dbms_random'',''utl_raw'',''dbms_lob'', ''dbms_sql'', ''sys'', ''utl_file''))
    ),
    split_acl AS(
        SELECT
            objtype, owneroid, objectschema, objectname, subobjectname,
            (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).grantor AS grantor,
            (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).grantee AS grantee,
            (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).privilege_type AS privilege,
            (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).is_grantable AS grantable
        FROM obj
    ),
    filter_acl AS(
        SELECT
            objtype, owneroid, objectschema, objectname, subobjectname,
            grantor, grantee, privilege, grantable
        FROM split_acl

        UNION ALL

        SELECT
            ''role'' AS objtype, NULL AS owneroid, NULL AS objectschema, a1.rolname as objectname, NULL AS subobjectname,
            d.grantor AS grantor, d.member AS grantee, NULl AS privilege, false AS grantable
        FROM pg_auth_members d
        INNER JOIN pg_authid a1 ON a1.oid = d.roleid
    ),
    grantee_info AS(
        SELECT pg_authid.oid, pg_authid.rolname AS rolname FROM pg_authid
        UNION ALL
        SELECT 0::oid AS oid, ''public''::name AS rolname
    ),
    info AS(
        SELECT
            objtype,
            o.rolname AS objowner,
            objectschema AS objschema,
            objectname AS objname,
            subobjectname AS subobjname,
            g1.rolname AS grantor,
            g2.rolname AS grantee,
            privilege,
            grantable
        FROM filter_acl a
        LEFT JOIN pg_authid o ON o.oid = a.owneroid
        LEFT JOIN pg_authid g1 ON g1.oid = a.grantor
        LEFT JOIN grantee_info g2 ON g2.oid = a.grantee
        WHERE a.grantor <> a.grantee
    ),
    t AS(
        SELECT
            CASE WHEN objtype = ''attribute'' THEN
                ''GRANT '' || privilege || ''('' || quote_ident(subobjname) || '')'' || '' ON TABLE '' || ''dfm'' || ''.'' || quote_ident(objname) || '' TO '' || quote_ident(grantee)
            WHEN objtype = ''table'' THEN
                ''GRANT '' || privilege || '' ON TABLE '' || ''dfm'' || ''.'' || quote_ident(objname) || '' TO '' || quote_ident(grantee)
            END ||
            CASE WHEN grantable THEN '' WITH GRANT OPTION '' ELSE '' '' END || '';'' AS acl
        FROM info
        WHERE objtype <> ''role''
    )
    SELECT E''\n    '' || string_agg(acl, E''\n    '')  FROM t';
    IF srcAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': try to gen acl info(aclStr)' || E'\n';
        EXECUTE IMMEDIATE fetchAclCmd INTO aclStr USING IN reloid;
        defStr := defStr || aclStr;
        detail := detail || clock_timestamp()::timestamptz(3) || ': acl info(aclStr) --' || aclStr || E'\n';
    END IF;

    -- 修改中间表的owner为原表的owner
    SELECT E'\n    ALTER TABLE dfm.' || quote_ident(tablename) || ' OWNER TO ' || quote_ident(pg_get_userbyid(relowner)) || ';' INTO ownerStr FROM pg_class WHERE oid = reloid;
    detail := detail || clock_timestamp()::timestamptz(3) || ': owner info(ownerStr) -- ' || ownerStr || E'\n';
    defStr := defStr || ownerStr;

    -- 创建中间表
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to create temp table ( defStr + aclStr + ownerStr) -- ' || defStr || E'\n';
    EXECUTE IMMEDIATE  defStr;

    -- 往中间表导入数据
    queryStr := 'INSERT INTO dfm.' || quote_ident(tablename) || ' SELECT * FROM ' || quote_ident(schemaname) || '.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to import data to dfm.' || quote_ident(tablename) || E' -- \n    ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- 计算原表行数和checksum值
    queryStr := 'SELECT count(1)::numeric, checksum(' || quote_ident(tablename) || '::text) FROM ' || quote_ident(schemaname) || '.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to calculate source tables''s the verification information --\n    ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr  INTO cnt1, csum1;
    detail := detail || clock_timestamp()::timestamptz(3) || ': source table('|| quote_ident(schemaname) || '.' || quote_ident(tablename)||') verification info -- count = '|| cnt1 || ' checksum = ' || csum1 || E'\n';

    -- 计算中间表行数和checksum值
    queryStr := 'SELECT count(1)::numeric, checksum(' || quote_ident(tablename) || '::text) FROM dfm.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to calculate temp tables''s the verification information -- \n    ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr  INTO cnt2, csum2;
    detail := detail || clock_timestamp()::timestamptz(3) || ': temp table(dfm.' || quote_ident(tablename) || ') verification info -- count = '|| cnt2 || ' checksum = ' || csum2 || E'\n';

    -- 校验checksum,如果行数不一致或者checksum不一致,则报错
    IF cnt2 <> cnt1 OR csum1 <> csum2 THEN
        abort := 'data verification failed. ';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        EXECUTE IMMEDIATE 'DROP TABLE dfm.'|| quote_ident(tablename);
        RETURN false;
    END IF;

    -- 重建索引
    IF idxStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': start to create index on temp table(dfm.' || quote_ident(tablename) || ') --'|| idxStr || E'\n';
        EXECUTE IMMEDIATE idxStr;
    END IF;

    -- 构建视图重建语句
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to gen rebuild sql' || E'\n';
    SELECT
        string_agg('ALTER VIEW ONLY ' || quote_ident(n1.nspname) || '.' || quote_ident(c1.relname) || ' REBUILD;', E'\n    ') INTO rebuildStr
    FROM pg_catalog.pg_clASs c
    INNER JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid AND n.nspname NOT IN ('cstore', 'pmk'))
    INNER JOIN (SELECT DISTINCT objid,refobjid FROM pg_catalog.pg_depend WHERE classid = 2618 AND refclassid = 1259 AND objid > 16384 AND refobjid > 16384) d ON (c.oid = d.refobjid)
    INNER JOIN pg_catalog.pg_rewrite r ON d.objid = r.oid
    INNER JOIN pg_catalog.pg_clASs c1 ON (c1.oid = r.ev_clASs AND c1.relkind = 'v' AND c1.oid <> c.oid)
    INNER JOIN pg_catalog.pg_namespace n1 ON(c1.relnamespace = n1.oid AND n1.nspname NOT IN ('cstore', 'pmk'))
    WHERE (c.oid >= 16384 AND c.relkind IN ('r', 'v'))
    AND (c1.oid >= 16384 AND c1.relkind = 'v') AND c.oid = reloid;
    IF rebuildStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': rebuild view sql info(rebuildStr) --\n    ' || rebuildStr || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': rebuild view sql(rebuildStr) IS NULL' || E'\n';
    END IF;

    -- 判断表是否有私有sequence信息
    fetchSeqStr := '
    WITH t AS(
        SELECT
            n.nspname as tableschema,
            c.relname as tablename,
            a.attname,
            format_type(a.atttypid, a.atttypmod) AS atttype,
            c1.oid as sequenceoid,
            n1.nspname AS sequenceschema,
            c1.relname AS sequencename,
            quote_ident(pg_get_userbyid(c.relowner)) AS owner,
            pg_get_expr(f.adbin, 0) as defstr
        FROM pg_depend d
        INNER JOIN pg_class c ON c.oid = d.refobjid
        INNER JOIN pg_namespace n ON n.oid = c.relnamespace
        INNER JOIN pg_class c1 ON c1.oid = d.objid
        INNER JOIN pg_namespace n1 ON n1.oid = c1.relnamespace
        LEFT JOIN pg_attrdef f ON f.adrelid = c.oid AND (f.adnum = d.refobjsubid)
        INNER JOIN pg_attribute a ON a.attnum = d.refobjsubid AND a.attrelid = c.oid
        WHERE refclassid = 1259
        AND classid = 1259
        AND refobjid = :1
        AND deptype = ''a''
        AND refobjsubid > 0
        AND objsubid = 0
        AND c1.relkind = ''S'' AND c.relkind = ''r''
        AND (defstr IS NULL OR defstr like ''nextval(%)%'')
        AND c.relowner = c1.relowner
        AND c.relnamespace = c1.relnamespace
    )
    SELECT /*+ set global(enable_hashjoin off) set global(enable_mergejoin off) */
        attname, sequencename, atttype, owner, defstr, sequenceoid
    FROM t
    ORDER BY attname, sequencename';

    seqTmp := NULL;
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to migrate sequence to temp table.' || E'\n';
    OPEN myCur FOR fetchSeqStr USING reloid;
    FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
    WHILE myCur%FOUND LOOP
        EXECUTE IMMEDIATE fetchAclStr INTO srcAcl USING IN seqoid;

        -- 创建跟原表schema和owner相同的中间表,供sequence的bind信息和 defalut表达式信息迁移使用
        IF seqTmp IS NULL THEN
            seqTmp := pg_current_sessionid();
            queryStr := 'CREATE TABLE ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || '(LIKE ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || E' EXCLUDING ALL EXCLUDING DEFAULTS INCLUDING RELOPTIONS);\n    ';
            queryStr := queryStr || 'ALTER TABLE ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ' OWNER TO ' ||  quote_ident(seqOwner) || E';\n    ' ;
            detail := detail || clock_timestamp()::timestamptz(3) || ': create temp table(' || quote_ident(seqTmp) || ') for migrate sequence('|| quote_ident(seqName) || E') -- \n    ' || queryStr;
            EXECUTE IMMEDIATE queryStr;
        END IF;

        IF srcAcl IS NOT NULL THEN
            detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info --\n    ' || srcAcl || E'\n';
        ELSE
            detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info IS NULL\n';
        END IF;

     -- 把 sequence 的 bind 信息和 defalut 表达式信息迁移到中间表上
        queryStr := NULL;
        IF trim(seqExpr) IS NOT NULL THEN
            queryStr := 'ALTER TABLE '|| quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ' ALTER COLUMN ' || quote_ident(seqCol) || ' SET DEFAULT ' || seqExpr ||  E';\n    ' ;
        END IF;
        queryStr := queryStr || 'ALTER SEQUENCE '|| quote_ident(schemaname) || '.' || quote_ident(seqName) || ' OWNED BY ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || '.' || quote_ident(seqCol) || ';' ;
        detail := detail || clock_timestamp()::timestamptz(3) || ': gen info for migrate sequence('|| quote_ident(seqName) || ') to temp table('|| quote_ident(seqTmp) || E') -- \n    ' || queryStr || E'\n';
        EXECUTE IMMEDIATE queryStr;

        FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
    END LOOP;
    CLOSE myCur;

    -- 删除原表
    queryStr := 'DROP TABLE '|| quote_ident(schemaname) || '.' || quote_ident(tablename) || ';';
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to drop source table -- \n    ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;
    -- 把中间表的schema修改为原表schema
    queryStr := 'ALTER TABLE dfm.' || quote_ident(tablename) || ' SET SCHEMA ' || quote_ident(schemaname) || ';';
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to rename temp table schema --\n    '|| queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- 把中间表的oid
    IF seqTmp IS NOT NULL THEN
        -- reloid 刷新为sequence中间表的oid
        SELECT c.oid INTO reloid
        FROM pg_class c
        LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = schemaname AND c.relname = seqTmp;

        -- 把sequence迁移到目标表
        OPEN myCur FOR fetchSeqStr USING reloid;
        FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
        WHILE myCur%FOUND LOOP
             -- 把 sequence 的 bind 信息迁移到中间表上
            queryStr := 'ALTER SEQUENCE '|| quote_ident(schemaname) || '.' || quote_ident(seqName) || ' OWNED BY ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || '.' || quote_ident(seqCol);
            detail := detail || clock_timestamp()::timestamptz(3) || ': start to migrate sequence('|| quote_ident(seqName) || E') to target table -- \n    ' || queryStr || E'\n';
            EXECUTE IMMEDIATE queryStr;

             -- 保存sequence的权限信息,供后续校验
            EXECUTE IMMEDIATE fetchAclStr INTO srcAcl USING IN seqoid;
            IF srcAcl IS NOT NULL THEN
                detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info --\n    ' || srcAcl || E'\n';
            ELSE
                detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info IS NULL\n';
            END IF;

            FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
        END LOOP;

        -- 删除转移sequence信息的中间表
        queryStr := 'DROP TABLE '|| quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ';' ;
        detail := detail || clock_timestamp()::timestamptz(3) || ': start to drop temp table(' || quote_ident(schemaname) || '.' || pg_current_sessionid() || E') -- \n    ' || queryStr || E'\n';
        EXECUTE IMMEDIATE queryStr;
        CLOSE myCur;
    END IF;

    -- 视图重建
    IF rebuildStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': start to rebuild view \n    ' || rebuildStr || E'\n';
        EXECUTE IMMEDIATE rebuildStr;
    END IF;

    -- reloid 刷新为最终目标表的oid
    SELECT c.oid INTO reloid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename;

    -- 获取最终目标表的权限信息
    EXECUTE IMMEDIATE fetchAclStr INTO newAcl  USING IN reloid;
    IF newAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E':  original table''s acl info --\n    ' || srcAcl || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || E':  original table''s acl info IS NULL\n    ';
    END IF;

    detail := detail || clock_timestamp()::timestamptz(3) || E': finish switch\n';
    EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), true, abort, detail, skipIdxDef;
    RETURN true;

    EXCEPTION WHEN OTHERS THEN
        get stacked diagnostics errMsg=MESSAGE_TEXT, errCxt=PG_EXCEPTION_CONTEXT, errDetail=PG_EXCEPTION_DETAIL;
        abort := 'ERROR: ' || errMsg;
        IF errDetail IS NOT NULL THEN
            abort := abort || E'\nDETAIL: ' || errDetail;
        END IF;

        IF errHint IS NOT NULL THEN
            abort := abort || E'\nHINT: ' || errHint;
        END IF;

        IF errCxt IS NOT NULL THEN
            abort := abort || E'\nCONTEXT: ' || errCxt;
        END IF;

        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;

        RETURN false;
END
$function$
;

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。