用Carbondata做CDC
Carbon CDC的使用方式
Carbon CDC是通过其Merge api来实现的,语法如下所示,target表示需要被更新的目标表,source表示一个批次的更新数据
targetDS.merge(sourceDS, <condition>) .whenMatched(<condition>) .updateExpr(updateMap) .insertExpr(insertMap_u) .whenNotMatched(<condition>) .insertExpr(insertMap) .whenNotMatchedAndExistsOnlyOnTarget(<condition>) .delete() .execute()
MERGE API操作语义
以下是merge
API操作的详细说明。
merge
将根据条件合并数据集。whenMatched
当源行根据匹配条件与目标表行匹配时,将执行子句。这些子句具有以下语义。whenMatched
子句最多可以具有一个updateExpr和一个delete操作。updateExpr
合并中的操作仅更新匹配的目标行的指定列。该delete
操作将删除匹配的行。如果有两个
whenMatched
子句,则按指定的顺序对其进行求值。第一个子句必须具有子句条件(否则第二个子句将永远不会执行)。如果两个
whenMatched
子句都具有条件,并且对于匹配的源-目标行对都不满足,那么匹配的目标行将保持不变。whenNotMatched
当源行根据匹配条件与任何目标行都不匹配时,将执行子句。whenNotMatched
子句只能有insertExpr
动作。根据指定的列和相应的表达式生成新行。用户不需要指定目标表中的所有列。对于未指定的目标列,将插入NULL。whenNotMatchedAndExistsOnlyOnTarget
当row与源不匹配并且仅存在于target中时,执行子句。该子句只能具有删除操作。注意:尚不支持用于合并的SQL语法。
举一个最简单的例子
Carbon CDC的原理
CDC是通过Merge实现的,而Merge操作是通过CarbonMergeDataSetCommand来实现的,其内部通过Join的方式获取delta数据,source表全体参与Join,而Target表只有tupleId和参与匹配的列参与Join,这样可以避免大量数据IO, 除此外还增加了用于标记判断IUD状态的动态生成列;
join所生成的datafram迭代做两个操作,一是根据条件生成的状态来判断一条数据是删除还是更新或者是插入,如果是更新,则本质是先删除旧的数据再增加一条新的数据;
删除的数据称为deltaRDD,是将update的旧数据和delete的旧数据的tupleId提取出来,走carbon的delete流程来标记block/pageid/id等信息写入delta文件的方式表征删除;
insert的数据或update所新增的数据取自source join出来的结果,该RDD会通过carbon的insert流程写入到新的carbondata文件中,insert流程会产生新的segment;
在进行merge操作的时候,结束时会调用horizontal compaction,进行delta文件合并;
CDC过程中由于产生新的segment,所以可以通过carbon的minor compaction或major compaction进行segment数据合并,以更好的进行数据查询。
- 点赞
- 收藏
- 关注作者
评论(0)