GaussDB(DWS) SQL进阶之SQL操作 - UPSERT
当前GaussDB(DWS)支持UPSERT功能。本片文章介绍UPSERT功能与基本用法。
1. 前言
在前一篇博文 中介绍了一种高效数据导入方法MERGE INTO。MERGE INTO中涉及两张表,并且需要关联操作。这意味着不能一条数据做一次关联,需要等待一批数据,这样才能更加高效。其中还涉及分布式JOIN。如果对数据导入实时性要求较高,那一定不要错过GaussDB(DWS)的UPSERT功能。
2. UPSERT 概念
UPSERT : UPDATE or INSERT。可以原子性的执行如下操作:插入数据时判断是否有主键或者唯一索引冲突,如果无冲突,就进行插入,否则基于已有数据进行更新。常用于数据实时入库场景。
GaussDB(DWS)有两种功能:冲突更新或者冲突忽略:
冲突更新使用场景:业务中数据可能有重复,期望在导入数据时对数据进行实时更新。比如对同一件商某些状态状态如价格、库存等持续不断进行更新。
冲突忽略使用场景:有时可能由于网络不稳定等问题,上层业务可能会出现重复的导入数据。如果在数据库已经插入数据但是业务层以为网络问题收到报错时,业务层需要额外逻辑判定数据是否已经导入。这样降低数据导入性能,并且需要业务层参与。引入冲突忽略功能后,业务层可以无感知的进行数据导入。
3. UPSERT 语法
Command: INSERT
Description: create new rows in a table
Syntax:
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT [ IGNORE | OVERWRITE ] INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
{ DEFAULT VALUES | VALUES {( { expression | DEFAULT } [, ...] ) }[, ...] | query }
[ ON DUPLICATE KEY duplicate_action | ON CONFLICT [ conflict_target ] conflict_action ]
[ RETURNING {* | {output_expression [ [ AS ] output_name ] }[, ...]} ];
where duplicate_action can be:
UPDATE { column_name = { expression | DEFAULT } |
( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] )
} [, ...]
and conflict_target can be one of:
( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
ON CONSTRAINT constraint_name
and conflict_action is one of:
DO NOTHING
DO UPDATE SET { column_name = { expression | DEFAULT } |
( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] )
} [, ...]
WHERE [condition]
GaussDB(DWS)支持两种风格UPSERT语法。
冲突更新 | 冲突忽略 | |
---|---|---|
不指定索引 | INSERT INTO ON DUPLICATE KEY UPDATE | INSERT IGNORE |
可以指定列名或者约束名用于推断唯一约束 | INSERT INTO ON CONFLICT(…) DO UPDATE SET | INSERT INTO ON CONFLICT(…) DO NOTHING |
执行UPSERT前置条件:插入的目标表必须有主键或者唯一约束。
因为使用UPSERT UPDATE功能实际是期望对重复数据进行更新。如果没有主键或者唯一约束,执行INSERT即可。
3.1 不指定索引
会在所有主键或唯一索引上检查冲突,有冲突就会忽略或者更新。
语法介绍:
-
IGNORE
用于主键或者唯一约束冲突时忽略数据。
-
ON DUPLICATE KEY
用于主键或者唯一约束冲突时更新冲突的数据。
duplicate_action指定更新列和更新的数据。
3.2 指定索引
会从ON CONFLICT子句中指定列名、包含列名的表达式或者约束名上推断主键或者唯一索引。
-
ON CONFLICT
用于主键或者唯一约束冲突时忽略或者更新冲突的数据。
-
conflict_target用于指定列名index_column_name 、包含多个列名的表达式index_expression 或者约束名字constraint_name。
-
ON CONFLICT ON CONSTRAINT 后面指定主键或者唯一约束名。
-
ON CONFLICT(…) 括号里支持列名,用于推断在该列上的主键或者唯一约束。
作用是通过指定列名或者约束名推断主键或者唯一索引。其中index_column_name和index_expression遵循CREATE INDEX的索引列格式。列名可以是单一或者多个列名,或者由列名组成的表达式。
所有恰好包含指定列/表达式的主键或者唯一索引(不考虑顺序)都会被推断为满足要求的索引。
-
此种语法的冲突忽略策略可以不指定conflict_target。即 INSERT INTO ON CONFLICT DO NOTHING。
-
-
conflict_action
指定主键或者唯一约束冲突时执行的策略。有两种:
- DO NOTHING 冲突忽略。
- DO UPDATE SET冲突更新。 后面指定更新列和更新的数据。
3.3 UPDATE子句
-
VALUES(colname)
VALUES(…) :引用插入的values colname列的值。里面要求是列名、复合类型的某一列。
UPDATE c = VALUES©,含义:如果插入数据与表中已有数据有冲突,将c列更新成INSERT INTO VALUES指定的c列值。
-
EXCLUDED.colname
表示因冲突而本该排除的数据行,与VALUES(colname) 含义相同。
UPDATE子句中VALUES(colname)用法不支持外层嵌套函数,即不支持类似sqrt(VALUES(colname))用法。如需支持,使用EXCLUDED.colname语法。
-
WHERE子句
用于在数据冲突时,判断是否满足指定条件。如果满足,则更新冲突数据。否则忽略。只有INSERT INTO ON CONFLICT(…) DO UPDATE SET 支持WHERE语法。
4. 实战应用
先准备好表和数据。表dst主键是a列。
gaussdb=# CREATE TABLE dst(a int primary key, b int, c int, constraint primary_dst primary key(a)) DISTRIBUTE BY HASH(a);
gaussdb=# INSERT INTO dst VALUES(1,1,1);
gaussdb=# CREATE TABLE src(a int, b int, c int) DISTRIBUTE BY HASH(a);
gaussdb=# INSERT INTO src VALUES(1,1,1),(2,2,2),(3,3,3);
4.1 不指定索引
4.1.1 冲突更新
插入两条数据,一条与表中数据有主键冲突,另一条没有。
从插入结果可以看到数据(1,1,1)被更新成(1,1,3)。无冲突数据(2,3,4)插入。下面例子含义是(1,2,3)有冲突,把冲突那行的c列值改成3。
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON DUPLICATE KEY UPDATE c = VALUES(c);
INSERT 0 2
gaussdb=# SELECT * FROM dst ORDER BY 1;
a | b | c
---+---+---
1 | 1 | 3
2 | 3 | 4
(2 rows)
4.1.2 冲突忽略
接着执行冲突忽略语句。 (1,2,3)由于冲突没有插入,忽略了。(3,4,5)没有冲突,进行插入。
gaussdb=# INSERT IGNORE INTO dst VALUES(1,2,3),(3,4,5);
INSERT 0 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
a | b | c
---+---+---
1 | 1 | 3
2 | 3 | 4
3 | 4 | 5
(3 rows)
4.2 指定索引
4.2.1 指定列名,冲突更新
一条数据冲突进行更新,一条数据无冲突进行插入。
gaussdb=# TRUNCATE dst;
gaussdb=# INSERT INTO dst VALUES(1,1,1);
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b;
INSERT 0 2
gaussdb=# SELECT * FROM dst;
a | b | c
---+---+---
1 | 2 | 1
2 | 3 | 4
(2 rows)
4.2.2 指定约束名
两条数据在约束primary_dst上都有冲突,更新b列。
gaussdb=# INSERT INTO dst VALUES(1,3,3),(2,4,4) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;
INSERT 0 2
gaussdb=# SELECT * FROM dst;
a | b | c
---+---+---
1 | 3 | 1
2 | 4 | 4
(2 rows)
4.2.3 唯一索引推断
指定列名为c,但是在c列上无主键或者唯一约束,报错。
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(c) DO UPDATE SET b = EXCLUDED.b;
ERROR: There is no unique constraint matching the ON CONFLICT specification
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(a,c) DO UPDATE SET b = EXCLUDED.b;
ERROR: There is no unique constraint matching the ON CONFLICT specification
4.2.3 WHERE 条件
WHERE C > 3 只有在原表c列的值大于3时才进行更新。(2,4,4)满足条件被更新。
gaussdb=# INSERT INTO dst VALUES(1,10,3),(2,11,2) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b WHERE c > 3;
INSERT 0 1
gaussdb=# SELECT * FROM dst;
a | b | c
---+----+---
1 | 3 | 1
2 | 11 | 4
(2 rows)
4.3 INSERT INTO SELECT
支持从源表中导入数据。
两条冲突数据更新b、c列,一条无冲突数据进行插入。
gaussdb=# SELECT * FROM src;
a | b | c
---+---+---
3 | 3 | 3
1 | 1 | 1
2 | 2 | 2
(3 rows)
gaussdb=# INSERT INTO dst SELECT * FROM src ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 4, c = EXCLUDED.c + 1;
INSERT 0 3
gaussdb=# SELECT * FROM dst ORDER BY 1;
a | b | c
---+---+---
1 | 5 | 2 -- 更新b、c列
2 | 6 | 3 -- 更新b、c列
3 | 3 | 3 -- 插入
(3 rows)
4.4 别名引用
建立一个名为EXCLUDED表,与UPDATE后面引用插入列方式EXCLUDED相冲突。可以使用 AS 语法指定别名避免冲突。
gaussdb=# CREATE TABLE EXCLUDED(d int unique, e int, f int);
gaussdb=# INSERT INTO EXCLUDED VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e;
ERROR: table reference "excluded" is ambiguous
LINE 1: ...ED VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e...
^
CONTEXT: referenced column: e
gaussdb=# INSERT INTO EXCLUDED AS tt VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e;
4.5 查看计划
显示推断主键或者唯一索引名称:
gaussdb=# EXPLAIN VERBOSE INSERT INTO dst VALUES(1,10,3),(2,11,2) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;
QUERY PLAN
-----------------------------------------------------------------------------------------------------
id | operation | E-rows | E-distinct | E-memory | E-width | E-costs
----+-----------------------------------------+--------+------------+----------+---------+---------
1 | -> Streaming (type: GATHER) | 1 | | | 12 | 2.21
2 | -> Insert on public.dst | 4 | | | 12 | 2.11
3 | -> Streaming(type: REDISTRIBUTE) | 4 | | 2MB | 12 | 0.11
4 | -> Values Scan on "*VALUES*" | 4 | | 1MB | 12 | 0.03
Predicate Information (identified by plan id)
---------------------------------------------
2 --Insert on public.dst
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: primary_dst
EXPLAIN PERFORMANCE显示实际插入(Tuple Inserted) 与冲突更新(Tuple Conflicted)的数据条数。
gaussdb=# SELECT * FROM dst;
a | b | c
---+---+---
1 | 3 | 1
2 | 4 | 2
3 | 3 | 3
(3 rows)
gaussdb=# EXPLAIN PERFORMANCE INSERT INTO dst VALUES(1,10,3),(2,11,2),(5,1,1) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------
id | operation | A-time | A-rows | E-rows | E-distinct | Peak Memory | E-memory | A-width | E-width | E-costs
----+-----------------------------------------+-----------------+--------+--------+------------+----------------+----------+---------+---------+---------
1 | -> Streaming (type: GATHER) | 49.832 | 0 | 1 | | 68KB | | | 12 | 2.26
2 | -> Insert on public.dst | [5.391, 43.084] | 3 | 6 | | [175KB, 176KB] | | | 12 | 2.17
3 | -> Streaming(type: REDISTRIBUTE) | [5.387, 42.842] | 3 | 6 | | [27KB, 28KB] | 2MB | | 12 | 0.16
4 | -> Values Scan on "*VALUES*" | [0.013, 0.013] | 3 | 6 | | [6KB, 6KB] | 1MB | | 12 | 0.04
Predicate Information (identified by plan id)
----------------------------------------------
2 --Insert on public.dst
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: primary_dst
Tuple Inserted: 1
Tuple Conflicted: 2
5. 并发更新
在UPSERT概念中UPSERT可以原子性的执行如下:插入数据时判断是否有主键或者唯一索引冲突,如果无冲突,就进行插入,否则基于已有数据进行更新。在实现中可以保证UPSERT支持并发更新。有两个基本场景:
-
两个线程同时执行UPSERT,且插入的数据在目标表中不存在并且互相冲突。
两个线程各自都判断没有冲突,即将进行插入。但是只有一个线程会先拿到锁并插入。另一个线程在插入索引时发现索引冲突,那本次INSERT数据不能提交,需要在已有的数据上进行更新。
-
两个线程同时执行UPSERT,且插入的数据在目标表已存在并且互相冲突。
两个线程各自都判断有冲突,即将进行更新。一个线程拿到锁更新后,另一个需要在新数据上再次更新。但是拿到的老数据。所以需要再次扫描拿到最新的数据。
上面两个场景的简单描述相信大家可以理解UPSERT并发实现的原理了。
6. UPSERT约束
这里汇总下UPSERT的使用约束,部分已经在上文描述。
语法
- 不支持与WITH子句同时使用。
- 不支持与INSERT OVERWRITE同时使用。
- UPDATE子句中VALUES(colname)用法不支持外层嵌套函数,即不支持类似sqrt(VALUES(colname))用法。如需支持,使用EXCLUDED.colname语法。
- INSERT INTO ON CONFLICT(…) DO UPDATE必须有conflict_target。即必须指定列或者约束名。
功能
-
不能更新分布列
-
不支持对包含触发器(触发事件为INSERT或UPDATE)的目标表执行UPSERT语句。
-
执行INSERT UPDATE语句时目标表必须有主键或者唯一索引。
-
UPDATE子句、UPDATE WHERE过滤条件、唯一索引推断处的条件表达式不支持不下推函数或表达式,否则报错。
-
UPDATE子句、UPDATE WHERE过滤条件不支持子查询,否则报错。
-
当在列存表上执行UPSERT时,建议开启DELTA表,若DELTA表关闭会导致并发性能较差和空间膨胀的问题。
-
列存复制表不支持通过UPSERT语句执行冲突更新操作。
-
通过INSERT INTO SELECT语句执行UPSERT的更新操作时,需要注意SELECT语句的查询结果顺序。在分布式环境中未使用ORDER BY语句时每次执行相同的SELECT语句返回结果顺序可能不一样,这会导致UPSERT语句的执行结果不符合预期。
总结
GaussDB(DWS)提供的数据导入的功能UPSERT,对于实时数仓是一项非常关键的功能。在主键或者唯一索引冲突时能够执行更新或者忽略操作。简化业务层导入数据的逻辑。
想了解GaussDB(DWS)更多信息,欢迎微信搜索“GaussDB DWS”关注微信公众号,和您分享最新最全的PB级数仓黑科技,后台还可获取众多学习资料哦~
- 点赞
- 收藏
- 关注作者
评论(0)