GaussDB(DWS) SQL进阶之SQL操作 - UPSERT

举报
acydy 发表于 2021/07/25 13:28:02 2021/07/25
【摘要】 【摘要】 GaussDB(DWS)支持的UPSERT功能,可以实时进行数据导入,在出现主键或者唯一索引冲突时执行更新或者忽略操作。对于实时数仓是一项非常重要的技术。

当前GaussDB(DWS)支持UPSERT功能。本片文章介绍UPSERT功能与基本用法。

1. 前言

前一篇博文 中介绍了一种高效数据导入方法MERGE INTO。MERGE INTO中涉及两张表,并且需要关联操作。这意味着不能一条数据做一次关联,需要等待一批数据,这样才能更加高效。其中还涉及分布式JOIN。如果对数据导入实时性要求较高,那一定不要错过GaussDB(DWS)的UPSERT功能。

2. UPSERT 概念

UPSERT : UPDATE or INSERT。可以原子性的执行如下操作:插入数据时判断是否有主键或者唯一索引冲突,如果无冲突,就进行插入,否则基于已有数据进行更新。常用于数据实时入库场景。

GaussDB(DWS)有两种功能:冲突更新或者冲突忽略:

冲突更新使用场景:业务中数据可能有重复,期望在导入数据时对数据进行实时更新。比如对同一件商某些状态状态如价格、库存等持续不断进行更新。

冲突忽略使用场景:有时可能由于网络不稳定等问题,上层业务可能会出现重复的导入数据。如果在数据库已经插入数据但是业务层以为网络问题收到报错时,业务层需要额外逻辑判定数据是否已经导入。这样降低数据导入性能,并且需要业务层参与。引入冲突忽略功能后,业务层可以无感知的进行数据导入。

3. UPSERT 语法

Command:     INSERT
Description: create new rows in a table
Syntax:
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT [ IGNORE | OVERWRITE ] INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    { DEFAULT VALUES | VALUES {( { expression | DEFAULT } [, ...] ) }[, ...] | query }
    [ ON DUPLICATE KEY duplicate_action | ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING {* | {output_expression [ [ AS ] output_name ] }[, ...]} ];

where duplicate_action can be:

    UPDATE { column_name = { expression | DEFAULT } |
             ( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] )
           } [, ...]

and conflict_target can be one of:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] )
                  } [, ...]
              WHERE [condition]

GaussDB(DWS)支持两种风格UPSERT语法。

冲突更新 冲突忽略
不指定索引 INSERT INTO ON DUPLICATE KEY UPDATE INSERT IGNORE
可以指定列名或者约束名用于推断唯一约束 INSERT INTO ON CONFLICT(…) DO UPDATE SET INSERT INTO ON CONFLICT(…) DO NOTHING

执行UPSERT前置条件:插入的目标表必须有主键或者唯一约束。
因为使用UPSERT UPDATE功能实际是期望对重复数据进行更新。如果没有主键或者唯一约束,执行INSERT即可。

3.1 不指定索引

会在所有主键或唯一索引上检查冲突,有冲突就会忽略或者更新。
语法介绍:

  • IGNORE

    用于主键或者唯一约束冲突时忽略数据。

  • ON DUPLICATE KEY

    用于主键或者唯一约束冲突时更新冲突的数据。

    duplicate_action指定更新列和更新的数据。

3.2 指定索引

会从ON CONFLICT子句中指定列名、包含列名的表达式或者约束名上推断主键或者唯一索引。

  • ON CONFLICT

    用于主键或者唯一约束冲突时忽略或者更新冲突的数据。

    • conflict_target用于指定列名index_column_name 、包含多个列名的表达式index_expression 或者约束名字constraint_name。

    • ON CONFLICT ON CONSTRAINT 后面指定主键或者唯一约束名。

    • ON CONFLICT(…) 括号里支持列名,用于推断在该列上的主键或者唯一约束。

      作用是通过指定列名或者约束名推断主键或者唯一索引。其中index_column_name和index_expression遵循CREATE INDEX的索引列格式。列名可以是单一或者多个列名,或者由列名组成的表达式。

      所有恰好包含指定列/表达式的主键或者唯一索引(不考虑顺序)都会被推断为满足要求的索引。

    • 此种语法的冲突忽略策略可以不指定conflict_target。即 INSERT INTO ON CONFLICT DO NOTHING。

  • conflict_action

    指定主键或者唯一约束冲突时执行的策略。有两种:

    • DO NOTHING 冲突忽略。
    • DO UPDATE SET冲突更新。 后面指定更新列和更新的数据。

3.3 UPDATE子句

  • VALUES(colname)

    VALUES(…) :引用插入的values colname列的值。里面要求是列名、复合类型的某一列。

    UPDATE c = VALUES©,含义:如果插入数据与表中已有数据有冲突,将c列更新成INSERT INTO VALUES指定的c列值。

  • EXCLUDED.colname

    表示因冲突而本该排除的数据行,与VALUES(colname) 含义相同。

    UPDATE子句中VALUES(colname)用法不支持外层嵌套函数,即不支持类似sqrt(VALUES(colname))用法。如需支持,使用EXCLUDED.colname语法。

  • WHERE子句

    用于在数据冲突时,判断是否满足指定条件。如果满足,则更新冲突数据。否则忽略。只有INSERT INTO ON CONFLICT(…) DO UPDATE SET 支持WHERE语法。

4. 实战应用

先准备好表和数据。表dst主键是a列。

gaussdb=# CREATE TABLE dst(a int primary key, b int, c int, constraint primary_dst primary key(a)) DISTRIBUTE BY HASH(a);
gaussdb=# INSERT INTO dst VALUES(1,1,1);
gaussdb=# CREATE TABLE src(a int, b int, c int) DISTRIBUTE BY HASH(a);
gaussdb=# INSERT INTO src VALUES(1,1,1),(2,2,2),(3,3,3);

4.1 不指定索引

4.1.1 冲突更新

插入两条数据,一条与表中数据有主键冲突,另一条没有。

从插入结果可以看到数据(1,1,1)被更新成(1,1,3)。无冲突数据(2,3,4)插入。下面例子含义是(1,2,3)有冲突,把冲突那行的c列值改成3。

gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON DUPLICATE KEY UPDATE c = VALUES(c);
INSERT 0 2
gaussdb=# SELECT * FROM dst ORDER BY 1;
 a | b | c
---+---+---
 1 | 1 | 3
 2 | 3 | 4
(2 rows)

4.1.2 冲突忽略

接着执行冲突忽略语句。 (1,2,3)由于冲突没有插入,忽略了。(3,4,5)没有冲突,进行插入。

gaussdb=# INSERT IGNORE INTO dst VALUES(1,2,3),(3,4,5);
INSERT 0 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
 a | b | c
---+---+---
 1 | 1 | 3
 2 | 3 | 4
 3 | 4 | 5
(3 rows)

4.2 指定索引

4.2.1 指定列名,冲突更新

一条数据冲突进行更新,一条数据无冲突进行插入。

gaussdb=# TRUNCATE dst;
gaussdb=# INSERT INTO dst VALUES(1,1,1);
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b;
INSERT 0 2
gaussdb=# SELECT * FROM dst;
 a | b | c
---+---+---
 1 | 2 | 1
 2 | 3 | 4
(2 rows)

4.2.2 指定约束名

两条数据在约束primary_dst上都有冲突,更新b列。

gaussdb=# INSERT INTO dst VALUES(1,3,3),(2,4,4) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;
INSERT 0 2
gaussdb=# SELECT * FROM dst;
 a | b | c
---+---+---
 1 | 3 | 1
 2 | 4 | 4
(2 rows)

4.2.3 唯一索引推断

指定列名为c,但是在c列上无主键或者唯一约束,报错。

gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(c) DO UPDATE SET b = EXCLUDED.b;
ERROR:  There is no unique constraint matching the ON CONFLICT specification
gaussdb=# INSERT INTO dst VALUES(1,2,3),(2,3,4) ON CONFLICT(a,c) DO UPDATE SET b = EXCLUDED.b;
ERROR:  There is no unique constraint matching the ON CONFLICT specification

4.2.3 WHERE 条件

WHERE C > 3 只有在原表c列的值大于3时才进行更新。(2,4,4)满足条件被更新。

gaussdb=# INSERT INTO dst VALUES(1,10,3),(2,11,2) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b WHERE c > 3;
INSERT 0 1
gaussdb=# SELECT * FROM dst;
 a | b  | c
---+----+---
 1 |  3 | 1
 2 | 11 | 4
(2 rows)

4.3 INSERT INTO SELECT

支持从源表中导入数据。

两条冲突数据更新b、c列,一条无冲突数据进行插入。

gaussdb=# SELECT * FROM src;
 a | b | c
---+---+---
 3 | 3 | 3
 1 | 1 | 1
 2 | 2 | 2
(3 rows)
gaussdb=# INSERT INTO dst SELECT * FROM src ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 4, c = EXCLUDED.c + 1;
INSERT 0 3
gaussdb=# SELECT * FROM dst ORDER BY 1;
 a | b | c
---+---+---
 1 | 5 | 2  -- 更新b、c列
 2 | 6 | 3  -- 更新b、c列
 3 | 3 | 3  -- 插入
(3 rows)

4.4 别名引用

建立一个名为EXCLUDED表,与UPDATE后面引用插入列方式EXCLUDED相冲突。可以使用 AS 语法指定别名避免冲突。

gaussdb=# CREATE TABLE EXCLUDED(d int unique, e int, f int);
gaussdb=# INSERT INTO EXCLUDED VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e;
ERROR:  table reference "excluded" is ambiguous
LINE 1: ...ED VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e...
                                                             ^
CONTEXT:  referenced column: e

gaussdb=# INSERT INTO EXCLUDED AS tt VALUES(1,2,3) ON CONFLICT(d) DO UPDATE SET e = EXCLUDED.e;

4.5 查看计划

显示推断主键或者唯一索引名称:

gaussdb=# EXPLAIN VERBOSE INSERT INTO dst VALUES(1,10,3),(2,11,2) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;
                                             QUERY PLAN
-----------------------------------------------------------------------------------------------------
  id |                operation                | E-rows | E-distinct | E-memory | E-width | E-costs
 ----+-----------------------------------------+--------+------------+----------+---------+---------
   1 | ->  Streaming (type: GATHER)            |      1 |            |          |      12 | 2.21
   2 |    ->  Insert on public.dst             |      4 |            |          |      12 | 2.11
   3 |       ->  Streaming(type: REDISTRIBUTE) |      4 |            | 2MB      |      12 | 0.11
   4 |          ->  Values Scan on "*VALUES*"  |      4 |            | 1MB      |      12 | 0.03

 Predicate Information (identified by plan id)
 ---------------------------------------------
   2 --Insert on public.dst
         Conflict Resolution: UPDATE
         Conflict Arbiter Indexes: primary_dst

EXPLAIN PERFORMANCE显示实际插入(Tuple Inserted) 与冲突更新(Tuple Conflicted)的数据条数。

gaussdb=# SELECT * FROM dst;
 a | b | c
---+---+---
 1 | 3 | 1
 2 | 4 | 2
 3 | 3 | 3
(3 rows)

gaussdb=# EXPLAIN PERFORMANCE INSERT INTO dst VALUES(1,10,3),(2,11,2),(5,1,1) ON CONFLICT ON CONSTRAINT primary_dst DO UPDATE SET b = EXCLUDED.b;

                                                                       QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------
  id |                operation                |     A-time      | A-rows | E-rows | E-distinct |  Peak Memory   | E-memory | A-width | E-width | E-costs
 ----+-----------------------------------------+-----------------+--------+--------+------------+----------------+----------+---------+---------+---------
   1 | ->  Streaming (type: GATHER)            | 49.832          |      0 |      1 |            | 68KB           |          |         |      12 | 2.26
   2 |    ->  Insert on public.dst             | [5.391, 43.084] |      3 |      6 |            | [175KB, 176KB] |          |         |      12 | 2.17
   3 |       ->  Streaming(type: REDISTRIBUTE) | [5.387, 42.842] |      3 |      6 |            | [27KB, 28KB]   | 2MB      |         |      12 | 0.16
   4 |          ->  Values Scan on "*VALUES*"  | [0.013, 0.013]  |      3 |      6 |            | [6KB, 6KB]     | 1MB      |         |      12 | 0.04

 Predicate Information (identified by plan id)
 ----------------------------------------------
   2 --Insert on public.dst
          Conflict Resolution: UPDATE
          Conflict Arbiter Indexes: primary_dst
          Tuple Inserted: 1
          Tuple Conflicted: 2

5. 并发更新

在UPSERT概念中UPSERT可以原子性的执行如下:插入数据时判断是否有主键或者唯一索引冲突,如果无冲突,就进行插入,否则基于已有数据进行更新。在实现中可以保证UPSERT支持并发更新。有两个基本场景:

  • 两个线程同时执行UPSERT,且插入的数据在目标表中不存在并且互相冲突。

    两个线程各自都判断没有冲突,即将进行插入。但是只有一个线程会先拿到锁并插入。另一个线程在插入索引时发现索引冲突,那本次INSERT数据不能提交,需要在已有的数据上进行更新。

  • 两个线程同时执行UPSERT,且插入的数据在目标表已存在并且互相冲突。

    两个线程各自都判断有冲突,即将进行更新。一个线程拿到锁更新后,另一个需要在新数据上再次更新。但是拿到的老数据。所以需要再次扫描拿到最新的数据。

上面两个场景的简单描述相信大家可以理解UPSERT并发实现的原理了。

6. UPSERT约束

这里汇总下UPSERT的使用约束,部分已经在上文描述。

语法

  • 不支持与WITH子句同时使用。
  • 不支持与INSERT OVERWRITE同时使用。
  • UPDATE子句中VALUES(colname)用法不支持外层嵌套函数,即不支持类似sqrt(VALUES(colname))用法。如需支持,使用EXCLUDED.colname语法。
  • INSERT INTO ON CONFLICT(…) DO UPDATE必须有conflict_target。即必须指定列或者约束名。

功能

  • 不能更新分布列

  • 不支持对包含触发器(触发事件为INSERT或UPDATE)的目标表执行UPSERT语句。

  • 执行INSERT UPDATE语句时目标表必须有主键或者唯一索引。

  • UPDATE子句、UPDATE WHERE过滤条件、唯一索引推断处的条件表达式不支持不下推函数或表达式,否则报错。

  • UPDATE子句、UPDATE WHERE过滤条件不支持子查询,否则报错。

  • 当在列存表上执行UPSERT时,建议开启DELTA表,若DELTA表关闭会导致并发性能较差和空间膨胀的问题。

  • 列存复制表不支持通过UPSERT语句执行冲突更新操作。

  • 通过INSERT INTO SELECT语句执行UPSERT的更新操作时,需要注意SELECT语句的查询结果顺序。在分布式环境中未使用ORDER BY语句时每次执行相同的SELECT语句返回结果顺序可能不一样,这会导致UPSERT语句的执行结果不符合预期。

总结

GaussDB(DWS)提供的数据导入的功能UPSERT,对于实时数仓是一项非常关键的功能。在主键或者唯一索引冲突时能够执行更新或者忽略操作。简化业务层导入数据的逻辑。

想了解GaussDB(DWS)更多信息,欢迎微信搜索“GaussDB DWS”关注微信公众号,和您分享最新最全的PB级数仓黑科技,后台还可获取众多学习资料哦~

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。