Spark SQL三种join
Spark SQL三种join
Mysql 的 join怎么实现的?
对于Spark来说有3中Join的实现,每种 Join对应着不同的应用场景:
- Broadcast Hash Join:适合一张较小的表和一张大表进行join
- Shuffle Hash Join :适合一张小表和一张大表进行join,或者是两张小表之间的join
- Sort Merge Join:适合两张较大的表之间进行 join
-
Hash Join:小表会作为Build Table,大表作为Probe Table,依次读取Build Table的数据,对于每一行数据根据join key进行 hash,hash到对应的Bucket,生成 hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存;再依次扫描 Probe Table ( order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件,如果匹配成功就可以将两者 join在一起(为什么 Build Table选择小表?因为构建的 HashTable最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景)。
-
Broadcast Hash Join:当有限维度表和事实表进行Join操作时,为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表中的全部数据,一定程度上牺牲了空间,换取 shuffle操作大量的耗时。
👉(1)broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的 p2p思路;
👉(2) hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探; -
Shuffle Hash Join:利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行 Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗。
数据倾斜的产生和解决办法?
数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。
在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。
避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。
如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
- 点赞
- 收藏
- 关注作者
评论(0)