GaussDB(DWS)运维 -- 行存存储和列存转换
【摘要】 提供一种行存表和列存表专项转换的方法
--
-- 使用场景: 行存表转列存表/列存表转行存表 转换自定义函数
-- 1. 可保留注释、索引和约束的前后一致性
-- 2. 建议打开视图重建功能,否则表存在依赖对象时会转换失败
-- 3. 日志会记录详细的执行日志信息,包括转换前后的表定义、权限和索引信息
-- 4. 转换失败后日志会记录详细的失败信息
-- 5. 行存到列存的转换时,如果函数的第四个参数typeReplace为true,会自动把raw/uuid/name转换成text,否则报错
--
-- 参数说明
-- schemaname: 表的schema
-- tablename: 表名
-- mode: 转化模式, r2c为行转列 c2r为列转行
-- typeReplace: 可选,默认false; 仅在r2c时生效,为true时把raw/uuid/name类型的列转换成text
--
-- 使用约束
-- 1. 转换期间对表加7->8级锁,其他session对表只有读权限
-- 2. 行存转列存时
-- 2.1.只保留主键和唯一约束/唯一索引,其它索引不会迁移到列存表上
-- 2.2.存在where条件的表达式索引不会迁移到列存表上
-- 2.3.非btree索引不会迁移到列存表上
-- 2.4.没有迁移的索引会记录在字段skipidxdef中
-- 3. 列存转行存时
-- 3.1.psort索引不会迁移到行存表上
-- 3.2.没有迁移的索引会记录在字段skipidxdef中
-- 4. 如果当前存储格式和目标格式一致,不做转换,标记转换成功
-- 5. 如果checksum校验或者表行数校验不通过,转换失败
-- 6. 因行列存压缩级别配置不一样,新建对象均为默认压缩级别
-- 7. 默认只处理非临时表(非CREATE TEMP创建的),临时表转换失败,报'object doesnot exist'
-- 8. 单表存在serial内置生成的sequence或者ALTER SEQUENCE OWNED BY
-- 创建的sequence时,支持转换,转换前后sequence取值和权限保持不变
/*
--转换操作日志表
CREATE SCHEMA dfm;
DROP TABLE IF EXISTS dfm.switch_log;
CREATE TABLE IF NOT EXISTS dfm.switch_log(
node text default pgxc_node_str(),
pid bigint default split_part(pg_current_sessionid(), '.', 2),
schemaname text,
tablename text,
mode text,
start timestamptz(3),
finish timestamptz(3),
sucess bool,
abort text,
detail text,
skipidxdef text
) DISTRIBUTE BY HASH(tablename, schemaname);
*/
-- dfm.switch_orientation
--
-- Rebuilds a table in the other storage orientation (row <-> column) by
-- creating a copy in schema dfm, loading and verifying the data, dropping the
-- source table and moving the copy back into the source schema.
--
-- Parameters
--   schemaname  : schema of the table to convert
--   tablename   : name of the table to convert
--   mode        : 'r2c' (row -> column) or 'c2r' (column -> row)
--   typeReplace : only consulted when mode = 'r2c'; when true, column
--                 definition lines of type name/raw/uuid are rewritten to text
--
-- Returns
--   true  when the conversion finished, or the table already has the target
--         orientation;
--   false on any validation failure or caught error.
--   Every return path first inserts one row into dfm.switch_log.
--
-- NOTE(review): several accumulators (detail, defStr, idxStr, skipIdxDef,
-- queryStr) start out NULL and are built with ||. This presumably relies on
-- the database treating NULL || text as text -- confirm the compatibility
-- mode of the target database.
-- NOTE(review): the generated DDL contains a plain "SET search_path = dfm;";
-- whether that setting outlives this function call should be verified.
CREATE OR REPLACE FUNCTION dfm.switch_orientation(schemaname text, tablename text, mode text, typeReplace bool default false)
RETURNS boolean
LANGUAGE plpgsql
SECURITY DEFINER NOT FENCED NOT SHIPPABLE
AS $function$
DECLARE
    destori text;            -- target orientation: 'column' or 'row'
    srcori text;             -- current orientation read from reloptions
    optStr text;             -- regenerated WITH(...) clause for the copy
    tableDef text;           -- output of pg_get_tabledef for the source table
    srcDef text[];           -- tableDef split into lines
    lineStr text;
    defStr text;             -- DDL executed before the data load
    fetchAclStr text;        -- query returning a printable ACL summary for an oid
    fetchAclCmd text;        -- query generating GRANT statements for the copy
    aclStr text;
    srcAcl text;             -- ACL summary of the source table
    newAcl text;             -- ACL summary of the final table
    idxStr text;             -- index/constraint DDL executed after the data load
    ownerStr text;
    rebuildStr text;         -- ALTER VIEW ... REBUILD statements
    skipIdxDef text;         -- index definitions that were not migrated
    csum1 numeric;
    csum2 numeric;
    cnt1 numeric;
    cnt2 numeric;
    queryStr text;
    seqOid oid;
    seqName text;
    seqCol text;
    seqColType text;
    seqOwner text;
    seqExpr text;
    seqTmp text;             -- name of the helper table that parks owned sequences
    myCur refcursor;
    fetchSeqStr text;        -- query listing sequences owned by columns of a table
    seqSrcAcl text;          -- sequence ACL summary before the switch
    seqNewAcl text;          -- sequence ACL summary after the switch
    errMsg text;
    errCxt text;
    errDetail text;
    errHint text;
    start timestamptz;
    abort text;
    detail text;
    reloid oid;
    insertLogStr text;
    colSeqNum int;           -- 0 outside the column list, else 1-based line index inside it
    srcTypeArray text[];
    destTypeArray text[];
BEGIN
    SET LOCAL client_min_messages = 'WARNING';
    SET LOCAL max_query_retry_times = 0;
    skipIdxDef := NULL;
    insertLogStr := 'INSERT INTO dfm.switch_log(schemaname, tablename, mode, start, finish, sucess, abort, detail, skipidxdef) VALUES(:1, :2, :3, :4, :5, :6, :7, :8, :9)';
    -- detail := E'-- version 5.2(2023-2-23 19:00)\n';

    -- Validate the conversion mode
    start := clock_timestamp()::timestamptz(3);
    detail := detail || start || ': check switch mode' || E'\n';
    IF mode = 'r2c' THEN -- row -> column
        detail := detail || start || ': try to switch table ' || schemaname || '.' || tablename || ' from row to column' || E'\n';
        destori := 'column';
    ELSIF mode = 'c2r' THEN -- column -> row
        detail := detail || start || ': try to switch table ' || schemaname || '.' || tablename || ' from column to row' || E'\n';
        destori := 'row';
    ELSE -- validation failed
        abort := 'invalid mode: ' || mode;
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        RETURN false;
    END IF;

    -- Make sure the table exists (relpersistence 't' is excluded)
    detail := detail || clock_timestamp()::timestamptz(3) || ': try to find table in database ' || E'\n';
    PERFORM c.oid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename
      AND relpersistence <> 't'
      AND c.relkind = 'r' AND c.reloptions::text NOT LIKE '%internal_mask%';
    IF NOT FOUND THEN
        abort := 'object doesnot exist';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        RETURN false;
    END IF;

    -- Lock the table in EXCLUSIVE mode to block concurrent data modification
    queryStr := 'LOCK TABLE ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || ' IN EXCLUSIVE MODE;';
    detail := detail || clock_timestamp()::timestamptz(3) || E': try to lock table --\n ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- Fetch the table oid
    SELECT c.oid INTO reloid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename
      AND relpersistence <> 't'
      AND c.relkind = 'r' AND c.reloptions::text NOT LIKE '%internal_mask%';

    -- If view_independent is not 'on', refuse to convert a table that has
    -- dependent objects
    PERFORM 1 FROM pg_settings WHERE name = 'view_independent' AND setting = 'on';
    IF NOT FOUND THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': view_independent = off' || E'\n';
        PERFORM 1 FROM gs_view_dependency_basic WHERE refobjschema = schemaname AND refobjname = tablename AND refobjectid = reloid;
        IF FOUND THEN
            abort := 'some object depend on current table';
            EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
            RETURN false;
        ELSE
            detail := detail || clock_timestamp()::timestamptz(3) || ': no dependent objects found' || E'\n';
        END IF;
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': view_independent = on' || E'\n';
    END IF;

    -- Read the current storage orientation of the table
    SELECT
        t1.option_value INTO srcori
    FROM (
        SELECT
            (pg_catalog.pg_options_to_table(c.reloptions)).option_name AS option_name,
            (pg_catalog.pg_options_to_table(c.reloptions)).option_value AS option_value
        FROM pg_class c
        WHERE oid = reloid
    ) t1
    WHERE t1.option_name IN ('orientation');

    -- Nothing to do when the table already has the requested orientation
    IF srcori = destori THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': it is already '|| srcori || E'\n';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), true, abort, detail, skipIdxDef;
        RETURN true;
    END IF;

    -- Build the target reloptions; the options listed in NOT IN are dropped
    -- and orientation is replaced by the target value
    detail := detail || clock_timestamp()::timestamptz(3) || ': gen reloptions ' || E'\n';
    WITH src AS (
        SELECT
            (pg_catalog.pg_options_to_table(c.reloptions)).option_name AS option_name,
            (pg_catalog.pg_options_to_table(c.reloptions)).option_value AS option_value
        FROM pg_class c
        WHERE oid = reloid
    ),
    t AS(
        SELECT
            option_name, option_value
        FROM src t1
        WHERE t1.option_name NOT IN ('orientation', 'compression', 'enable_hstore', 'colversion', 'enable_delta', 'fillfactor')
        UNION ALL
        SELECT 'orientation', destori
    )
    SELECT
        'WITH(' || string_agg(option_name || '=' || option_value, ',') || ')' INTO optStr
    FROM t;

    -- Per column (in attnum order): the " <column> <type>" text for name/raw/uuid
    -- columns, NULL for every other column
    SELECT array_agg(c)::text[] INTO srcTypeArray FROM(
        SELECT
            CASE WHEN atttypid = 19 THEN ' ' || a.attname || ' name' -- name
                 WHEN atttypid = 86 THEN ' ' || a.attname || ' raw' -- raw
                 WHEN atttypid = 2950 THEN ' ' || a.attname || ' uuid' -- uuid
                 ELSE NULL
            END AS c
        FROM pg_attribute a
        INNER JOIN pg_type t ON t.oid = a.atttypid
        WHERE attrelid = reloid AND attnum > 0 AND attisdropped = false
        ORDER BY attnum
    );
    detail := detail || clock_timestamp()::timestamptz(3) || E': source attnum+type array --\n ' || srcTypeArray::text || E'\n';

    -- Same layout as srcTypeArray, holding the replacement " <column> text"
    SELECT array_agg(c)::text[] INTO destTypeArray FROM(
        SELECT
            CASE WHEN atttypid = 19 THEN ' ' || a.attname || ' text' -- name -> text
                 WHEN atttypid = 86 THEN ' ' || a.attname || ' text' -- raw -> text
                 WHEN atttypid = 2950 THEN ' ' || a.attname || ' text' -- uuid -> text
                 ELSE NULL
            END AS c
        FROM pg_attribute a
        INNER JOIN pg_type t ON t.oid = a.atttypid
        WHERE attrelid = reloid AND attnum > 0 AND attisdropped = false
        ORDER BY attnum
    );
    detail := detail || clock_timestamp()::timestamptz(3) || E': target attnum+type array --\n ' || destTypeArray::text || E'\n';

    -- Fetch the table definition
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to get source table def ' || E'\n';
    SELECT pg_get_tabledef(reloid) INTO tableDef FROM DUAL;
    detail := detail || clock_timestamp()::timestamptz(3) || E': source table def --\n' || tableDef || E'\n';
    SELECT regexp_split_to_array(tableDef, '\n') INTO srcDef;
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to gen temp table def ' || E'\n';
    colSeqNum := 0;

    -- Walk the table definition line by line
    FOREACH lineStr IN ARRAY srcDef
    LOOP
        IF lineStr LIKE 'CREATE TABLE %(' THEN
            -- Column definitions follow
            colSeqNum := 1;
        ELSIF lineStr LIKE 'WITH (%orientation=%)' THEN
            -- Column list is finished; substitute the regenerated options
            lineStr := optStr;
            colSeqNum := 0;
        ELSIF colSeqNum > 0 AND typeReplace = true AND mode = 'r2c' THEN
            -- Rewrite name/raw/uuid column types to text
            IF srcTypeArray[colSeqNum] IS NOT NULL THEN
                detail := detail || clock_timestamp()::timestamptz(3) || ': change column type -- FROM "' || srcTypeArray[colSeqNum] || '" TO "' || destTypeArray[colSeqNum] || E'"\n';
                lineStr := replace(lineStr, srcTypeArray[colSeqNum], destTypeArray[colSeqNum]);
            END IF;
            colSeqNum := colSeqNum + 1;
        ELSIF lineStr LIKE 'SET search_path = %;' THEN
            -- Create the copy under schema dfm
            lineStr := 'SET search_path = dfm;';
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX % ON % USING % WHERE %' OR lineStr LIKE 'CREATE INDEX % ON % USING % WHERE %' THEN
            -- Skip indexes that carry a WHERE clause
            skipIdxDef := skipIdxDef || ' ' || lineStr || E'\n';
            lineStr := NULL;
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX%ON%USING cbtree%' THEN
            lineStr := replace(lineStr, ' USING cbtree ', ' USING btree '); -- cbtree -> btree
            lineStr := replace(lineStr, ' '|| schemaname || '.', ' '); -- strip the schema prefix of the table name
        ELSIF lineStr LIKE 'CREATE UNIQUE INDEX%ON%USING btree%' THEN
            lineStr := replace(lineStr, ' '|| schemaname || '.', ' '); -- strip the schema prefix of the table name
        ELSIF lineStr LIKE 'CREATE INDEX%ON%USING%' THEN
            IF destori = 'column' THEN
                -- row -> column: non-unique indexes are not migrated
                skipIdxDef := skipIdxDef || ' ' || lineStr || E'\n';
                lineStr := NULL;
            ELSE
                IF lineStr LIKE 'CREATE INDEX%USING psort%' THEN
                    -- column -> row: psort indexes are not migrated
                    skipIdxDef := skipIdxDef || ' ' || lineStr || E'\n';
                    lineStr := NULL;
                ELSIF lineStr LIKE 'CREATE INDEX%USING cbtree%' THEN
                    lineStr := replace(lineStr, ' USING cbtree ', ' USING btree '); -- cbtree -> btree
                    lineStr := replace(lineStr, ' '|| schemaname || '.', ' '); -- strip the schema prefix of the table name
                ELSIF lineStr LIKE 'CREATE INDEX%USING btree%' THEN
                    lineStr := replace(lineStr, ' '|| schemaname || '.', ' '); -- strip the schema prefix of the table name
                ELSE
                    abort := 'UNSUPPORT INDEX: ' || lineStr; -- any other index type is unsupported
                    EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
                    RETURN false;
                END IF;
            END IF;
        END IF;

        IF lineStr IS NULL THEN
            CONTINUE;
        ELSIF lineStr LIKE 'CREATE %INDEX % ON % USING %' OR lineStr LIKE 'ALTER TABLE % ADD CONSTRAINT %' THEN
            -- Index/constraint DDL is collected separately and run after the
            -- data load to speed up the migration
            idxStr := idxStr || E'\n ' || lineStr;
        ELSE
            defStr := defStr || E'\n ' || lineStr;
        END IF;
    END LOOP;

    detail := detail || clock_timestamp()::timestamptz(3) || ': temp table def(defStr) -- '|| defStr || E'\n';
    IF idxStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': index info(idxStr) -- '|| idxStr || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': index info(idxStr) IS NULL ' || E'\n';
    END IF;

    -- Printable summary of the table-level and column-level ACLs of an oid
    fetchAclStr := '
        WITH t AS(
            SELECT c.relname as tablename, '' '' AS attname , relacl::text AS acl, 0 AS attnum
            FROM pg_class c
            WHERE oid = :1 AND relacl IS NOT NULL
            UNION ALL
            SELECT c.relname as tablename, attname, attacl::text, attnum
            FROM pg_attribute a INNER JOIN pg_class c ON c.oid = a.attrelid AND c.oid = :1
            AND attnum > 0 AND attacl IS NOT NULL
        )
        SELECT string_agg(tablename || E''\t\t\t|'' || attname || E''\t\t\t|'' || acl, E''\n '' ORDER BY attnum) FROM t;';
    EXECUTE IMMEDIATE fetchAclStr INTO srcAcl USING IN reloid;
    IF srcAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': original table''s acl info --\n ' || srcAcl || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || E': original table''s acl info IS NULL\n';
    END IF;

    -- Generate GRANT statements that replay the source ACLs on dfm.<tablename>
    fetchAclCmd := '
        WITH sub_obj AS(
            SELECT NULL::text AS objtype, proowner AS owneroid, pronamespace AS nspoid, proname as objectname, NULL AS subobjectname, proacl AS objacl
            FROM pg_proc WHERE false
            UNION ALL
            SELECT ''attribute'' AS objtype, relowner AS owneroid, c.relnamespace AS nspoid, c.relname as objectname, attname AS subobjectname, attacl
            FROM pg_attribute a INNER JOIN pg_class c ON c.oid = a.attrelid AND c.oid = :1
            UNION ALL
            SELECT CASE relkind WHEN ''r'' THEN ''table'' WHEN ''v'' THEN ''view'' WHEN ''S'' THEN ''sequence'' END AS objtype, relowner AS owneroid,
            relnamespace AS nspoid, relname as objectname, NULL AS subobjectname, relacl
            FROM pg_class WHERE relkind IN (''v'', ''r'', ''S'') AND oid = :1
        ),
        obj AS(
            SELECT objtype, owneroid, n.nspname as objectschema, objectname, subobjectname, objacl
            FROM sub_obj o
            LEFT JOIN pg_namespace n ON n.oid = o.nspoid
            WHERE objacl IS NOT NULL AND (o.nspoid IS NULL OR o.nspoid NOT IN (99,100,3987,3966,11))
            AND (objectschema IS NULL OR objectschema NOT IN (''information_schema'',''dbms_job'',''dbms_output'',''dbms_random'',''utl_raw'',''dbms_lob'', ''dbms_sql'', ''sys'', ''utl_file''))
        ),
        split_acl AS(
            SELECT
                objtype, owneroid, objectschema, objectname, subobjectname,
                (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).grantor AS grantor,
                (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).grantee AS grantee,
                (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).privilege_type AS privilege,
                (aclexplode(COALESCE(objacl, acldefault(''r''::"char", owneroid)))).is_grantable AS grantable
            FROM obj
        ),
        filter_acl AS(
            SELECT
                objtype, owneroid, objectschema, objectname, subobjectname,
                grantor, grantee, privilege, grantable
            FROM split_acl
            UNION ALL
            SELECT
                ''role'' AS objtype, NULL AS owneroid, NULL AS objectschema, a1.rolname as objectname, NULL AS subobjectname,
                d.grantor AS grantor, d.member AS grantee, NULl AS privilege, false AS grantable
            FROM pg_auth_members d
            INNER JOIN pg_authid a1 ON a1.oid = d.roleid
        ),
        grantee_info AS(
            SELECT pg_authid.oid, pg_authid.rolname AS rolname FROM pg_authid
            UNION ALL
            SELECT 0::oid AS oid, ''public''::name AS rolname
        ),
        info AS(
            SELECT
                objtype,
                o.rolname AS objowner,
                objectschema AS objschema,
                objectname AS objname,
                subobjectname AS subobjname,
                g1.rolname AS grantor,
                g2.rolname AS grantee,
                privilege,
                grantable
            FROM filter_acl a
            LEFT JOIN pg_authid o ON o.oid = a.owneroid
            LEFT JOIN pg_authid g1 ON g1.oid = a.grantor
            LEFT JOIN grantee_info g2 ON g2.oid = a.grantee
            WHERE a.grantor <> a.grantee
        ),
        t AS(
            SELECT
                CASE WHEN objtype = ''attribute'' THEN
                    ''GRANT '' || privilege || ''('' || quote_ident(subobjname) || '')'' || '' ON TABLE '' || ''dfm'' || ''.'' || quote_ident(objname) || '' TO '' || quote_ident(grantee)
                WHEN objtype = ''table'' THEN
                    ''GRANT '' || privilege || '' ON TABLE '' || ''dfm'' || ''.'' || quote_ident(objname) || '' TO '' || quote_ident(grantee)
                END ||
                CASE WHEN grantable THEN '' WITH GRANT OPTION '' ELSE '' '' END || '';'' AS acl
            FROM info
            WHERE objtype <> ''role''
        )
        SELECT E''\n '' || string_agg(acl, E''\n '') FROM t';
    IF srcAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': try to gen acl info(aclStr)' || E'\n';
        EXECUTE IMMEDIATE fetchAclCmd INTO aclStr USING IN reloid;
        defStr := defStr || aclStr;
        detail := detail || clock_timestamp()::timestamptz(3) || ': acl info(aclStr) --' || aclStr || E'\n';
    END IF;

    -- Give the copy the same owner as the source table
    SELECT E'\n ALTER TABLE dfm.' || quote_ident(tablename) || ' OWNER TO ' || quote_ident(pg_get_userbyid(relowner)) || ';' INTO ownerStr FROM pg_class WHERE oid = reloid;
    detail := detail || clock_timestamp()::timestamptz(3) || ': owner info(ownerStr) -- ' || ownerStr || E'\n';
    defStr := defStr || ownerStr;

    -- Create the copy
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to create temp table ( defStr + aclStr + ownerStr) -- ' || defStr || E'\n';
    EXECUTE IMMEDIATE defStr;

    -- Load the data into the copy
    queryStr := 'INSERT INTO dfm.' || quote_ident(tablename) || ' SELECT * FROM ' || quote_ident(schemaname) || '.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to import data to dfm.' || quote_ident(tablename) || E' -- \n ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- Row count and checksum of the source table
    queryStr := 'SELECT count(1)::numeric, checksum(' || quote_ident(tablename) || '::text) FROM ' || quote_ident(schemaname) || '.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to calculate source tables''s the verification information --\n ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr INTO cnt1, csum1;
    detail := detail || clock_timestamp()::timestamptz(3) || ': source table('|| quote_ident(schemaname) || '.' || quote_ident(tablename)||') verification info -- count = '|| cnt1 || ' checksum = ' || csum1 || E'\n';

    -- Row count and checksum of the copy
    queryStr := 'SELECT count(1)::numeric, checksum(' || quote_ident(tablename) || '::text) FROM dfm.' || quote_ident(tablename);
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to calculate temp tables''s the verification information -- \n ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr INTO cnt2, csum2;
    detail := detail || clock_timestamp()::timestamptz(3) || ': temp table(dfm.' || quote_ident(tablename) || ') verification info -- count = '|| cnt2 || ' checksum = ' || csum2 || E'\n';

    -- Abort when row count or checksum differ
    IF cnt2 <> cnt1 OR csum1 <> csum2 THEN
        abort := 'data verification failed. ';
        EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
        EXECUTE IMMEDIATE 'DROP TABLE dfm.'|| quote_ident(tablename);
        RETURN false;
    END IF;

    -- Recreate indexes and constraints on the copy
    IF idxStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || ': start to create index on temp table(dfm.' || quote_ident(tablename) || ') --'|| idxStr || E'\n';
        EXECUTE IMMEDIATE idxStr;
    END IF;

    -- Build the REBUILD statements for views that depend on the source table
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to gen rebuild sql' || E'\n';
    SELECT
        string_agg('ALTER VIEW ONLY ' || quote_ident(n1.nspname) || '.' || quote_ident(c1.relname) || ' REBUILD;', E'\n ') INTO rebuildStr
    FROM pg_catalog.pg_clASs c
    INNER JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid AND n.nspname NOT IN ('cstore', 'pmk'))
    INNER JOIN (SELECT DISTINCT objid,refobjid FROM pg_catalog.pg_depend WHERE classid = 2618 AND refclassid = 1259 AND objid > 16384 AND refobjid > 16384) d ON (c.oid = d.refobjid)
    INNER JOIN pg_catalog.pg_rewrite r ON d.objid = r.oid
    INNER JOIN pg_catalog.pg_clASs c1 ON (c1.oid = r.ev_clASs AND c1.relkind = 'v' AND c1.oid <> c.oid)
    INNER JOIN pg_catalog.pg_namespace n1 ON(c1.relnamespace = n1.oid AND n1.nspname NOT IN ('cstore', 'pmk'))
    WHERE (c.oid >= 16384 AND c.relkind IN ('r', 'v'))
      AND (c1.oid >= 16384 AND c1.relkind = 'v') AND c.oid = reloid;
    IF rebuildStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': rebuild view sql info(rebuildStr) --\n ' || rebuildStr || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || ': rebuild view sql(rebuildStr) IS NULL' || E'\n';
    END IF;

    -- Sequences owned by columns of the given table (same owner and schema as
    -- the table), with the column default expression when one exists
    fetchSeqStr := '
        WITH t AS(
            SELECT
                n.nspname as tableschema,
                c.relname as tablename,
                a.attname,
                format_type(a.atttypid, a.atttypmod) AS atttype,
                c1.oid as sequenceoid,
                n1.nspname AS sequenceschema,
                c1.relname AS sequencename,
                quote_ident(pg_get_userbyid(c.relowner)) AS owner,
                pg_get_expr(f.adbin, 0) as defstr
            FROM pg_depend d
            INNER JOIN pg_class c ON c.oid = d.refobjid
            INNER JOIN pg_namespace n ON n.oid = c.relnamespace
            INNER JOIN pg_class c1 ON c1.oid = d.objid
            INNER JOIN pg_namespace n1 ON n1.oid = c1.relnamespace
            LEFT JOIN pg_attrdef f ON f.adrelid = c.oid AND (f.adnum = d.refobjsubid)
            INNER JOIN pg_attribute a ON a.attnum = d.refobjsubid AND a.attrelid = c.oid
            WHERE refclassid = 1259
            AND classid = 1259
            AND refobjid = :1
            AND deptype = ''a''
            AND refobjsubid > 0
            AND objsubid = 0
            AND c1.relkind = ''S'' AND c.relkind = ''r''
            AND (defstr IS NULL OR defstr like ''nextval(%)%'')
            AND c.relowner = c1.relowner
            AND c.relnamespace = c1.relnamespace
        )
        SELECT /*+ set global(enable_hashjoin off) set global(enable_mergejoin off) */
            attname, sequencename, atttype, owner, defstr, sequenceoid
        FROM t
        ORDER BY attname, sequencename';
    seqTmp := NULL;
    detail := detail || clock_timestamp()::timestamptz(3) || ': start to migrate sequence to temp table.' || E'\n';
    OPEN myCur FOR fetchSeqStr USING reloid;
    FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
    WHILE myCur%FOUND LOOP
        -- Kept in its own variable so the table ACL summary in srcAcl survives
        EXECUTE IMMEDIATE fetchAclStr INTO seqSrcAcl USING IN seqoid;
        -- Create (once) a helper table in the source schema, owned by the
        -- source owner, to hold the sequence ownership and default expressions
        -- while the source table is dropped
        IF seqTmp IS NULL THEN
            seqTmp := pg_current_sessionid();
            queryStr := 'CREATE TABLE ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || '(LIKE ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || E' EXCLUDING ALL EXCLUDING DEFAULTS INCLUDING RELOPTIONS);\n ';
            queryStr := queryStr || 'ALTER TABLE ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ' OWNER TO ' || quote_ident(seqOwner) || E';\n ' ;
            detail := detail || clock_timestamp()::timestamptz(3) || ': create temp table(' || quote_ident(seqTmp) || ') for migrate sequence('|| quote_ident(seqName) || E') -- \n ' || queryStr;
            EXECUTE IMMEDIATE queryStr;
        END IF;
        IF seqSrcAcl IS NOT NULL THEN
            detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info --\n ' || seqSrcAcl || E'\n';
        ELSE
            detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info IS NULL\n';
        END IF;
        -- Move the sequence ownership and the default expression to the helper table
        queryStr := NULL;
        IF trim(seqExpr) IS NOT NULL THEN
            queryStr := 'ALTER TABLE '|| quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ' ALTER COLUMN ' || quote_ident(seqCol) || ' SET DEFAULT ' || seqExpr || E';\n ' ;
        END IF;
        queryStr := queryStr || 'ALTER SEQUENCE '|| quote_ident(schemaname) || '.' || quote_ident(seqName) || ' OWNED BY ' || quote_ident(schemaname) || '.' || quote_ident(seqTmp) || '.' || quote_ident(seqCol) || ';' ;
        detail := detail || clock_timestamp()::timestamptz(3) || ': gen info for migrate sequence('|| quote_ident(seqName) || ') to temp table('|| quote_ident(seqTmp) || E') -- \n ' || queryStr || E'\n';
        EXECUTE IMMEDIATE queryStr;
        FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
    END LOOP;
    CLOSE myCur;

    -- Drop the source table
    queryStr := 'DROP TABLE '|| quote_ident(schemaname) || '.' || quote_ident(tablename) || ';';
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to drop source table -- \n ' || queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- Move the copy from dfm into the source schema
    queryStr := 'ALTER TABLE dfm.' || quote_ident(tablename) || ' SET SCHEMA ' || quote_ident(schemaname) || ';';
    detail := detail || clock_timestamp()::timestamptz(3) || E': start to rename temp table schema --\n '|| queryStr || E'\n';
    EXECUTE IMMEDIATE queryStr;

    -- Re-attach parked sequences to the final table
    IF seqTmp IS NOT NULL THEN
        -- reloid now points at the sequence helper table
        SELECT c.oid INTO reloid
        FROM pg_class c
        LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = schemaname AND c.relname = seqTmp;
        -- Move every sequence from the helper table to the final table
        OPEN myCur FOR fetchSeqStr USING reloid;
        FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
        WHILE myCur%FOUND LOOP
            -- Transfer sequence ownership to the final table's column
            queryStr := 'ALTER SEQUENCE '|| quote_ident(schemaname) || '.' || quote_ident(seqName) || ' OWNED BY ' || quote_ident(schemaname) || '.' || quote_ident(tablename) || '.' || quote_ident(seqCol);
            detail := detail || clock_timestamp()::timestamptz(3) || ': start to migrate sequence('|| quote_ident(seqName) || E') to target table -- \n ' || queryStr || E'\n';
            EXECUTE IMMEDIATE queryStr;
            -- Record the sequence ACL after the move in the log
            EXECUTE IMMEDIATE fetchAclStr INTO seqNewAcl USING IN seqoid;
            IF seqNewAcl IS NOT NULL THEN
                detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info --\n ' || seqNewAcl || E'\n';
            ELSE
                detail := detail || clock_timestamp()::timestamptz(3) || ': private sequence(' || quote_ident(seqName) || E')''s acl info IS NULL\n';
            END IF;
            FETCH myCur INTO seqCol, seqName, seqColType, seqOwner, seqExpr, seqoid;
        END LOOP;
        -- Drop the sequence helper table
        queryStr := 'DROP TABLE '|| quote_ident(schemaname) || '.' || quote_ident(seqTmp) || ';' ;
        detail := detail || clock_timestamp()::timestamptz(3) || ': start to drop temp table(' || quote_ident(schemaname) || '.' || pg_current_sessionid() || E') -- \n ' || queryStr || E'\n';
        EXECUTE IMMEDIATE queryStr;
        CLOSE myCur;
    END IF;

    -- Rebuild dependent views
    IF rebuildStr IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': start to rebuild view \n ' || rebuildStr || E'\n';
        EXECUTE IMMEDIATE rebuildStr;
    END IF;

    -- reloid now points at the final table
    SELECT c.oid INTO reloid
    FROM pg_class c
    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = schemaname AND c.relname = tablename;

    -- Log the ACL summary of the final table
    EXECUTE IMMEDIATE fetchAclStr INTO newAcl USING IN reloid;
    IF newAcl IS NOT NULL THEN
        detail := detail || clock_timestamp()::timestamptz(3) || E': target table''s acl info --\n ' || newAcl || E'\n';
    ELSE
        detail := detail || clock_timestamp()::timestamptz(3) || E': target table''s acl info IS NULL\n';
    END IF;
    detail := detail || clock_timestamp()::timestamptz(3) || E': finish switch\n';
    EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), true, abort, detail, skipIdxDef;
    RETURN true;
EXCEPTION WHEN OTHERS THEN
    -- Record message, detail, hint and context of the caught error in the log
    GET STACKED DIAGNOSTICS errMsg = MESSAGE_TEXT, errCxt = PG_EXCEPTION_CONTEXT, errDetail = PG_EXCEPTION_DETAIL, errHint = PG_EXCEPTION_HINT;
    abort := 'ERROR: ' || errMsg;
    IF errDetail IS NOT NULL THEN
        abort := abort || E'\nDETAIL: ' || errDetail;
    END IF;
    IF errHint IS NOT NULL THEN
        abort := abort || E'\nHINT: ' || errHint;
    END IF;
    IF errCxt IS NOT NULL THEN
        abort := abort || E'\nCONTEXT: ' || errCxt;
    END IF;
    EXECUTE IMMEDIATE insertLogStr USING IN schemaname, tablename, mode, start, clock_timestamp(), false, abort, detail, skipIdxDef;
    RETURN false;
END
$function$
;
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)