浪院長 浪尖聊大數據 hashjoinjoin是作為業(yè)務開發(fā)繞不開的SQL話題,無論是傳統(tǒng)的數據庫join,還是大數據里的join。 Spark BroadCastHashJoin翻過源碼之后你就會發(fā)現,Spark 1.6之前實現BroadCastHashJoin就是利用的Java的HashMap來實現的。大家感興趣可以去Spark 1.6的源碼里搜索BroadCastHashJoin,HashedRelation,探查一下源碼。 ShuffledHashJoinBroadCastHashJoin適合的是大表和小表的join策略,將整個小表廣播。很多時候,參與join的表本身都不適合廣播,也不適合放入內存,但是按照一定分區(qū)拆開后就可以放入內存構建為HashRelation。這個就是分治思想了,將兩張表按照相同的hash分區(qū)器及分區(qū)數進行,對join條件進行分區(qū),那么需要join的key就會落入相同的分區(qū)里,然后就可以利用本地join的策略來進行join了。
SortMergeJoin上面兩張情況都是小表本身適合放入內存或者中表經過分區(qū)治理后適合放入內存,來完成本地化hashedjoin,小表數據放在內存中,很奢侈的,所以經常會遇到join,就oom。小表,中表都是依據內存說的,你內存無限,那是最好。
Spark SQL的join方式選擇假如用戶使用Spark SQL的適合用了hints,那Spark會先采用Hints提示的join方式。 -- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 來表達 broadcast hint SELECT /* BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key ShuffledHashJoin,hints的sql寫法如下: -- 僅支持 SHUFFLE_HASH 來表達 ShuffledHashJoin hint SELECT /* SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key SortMergeJoin,hints的SQL寫法如下: -- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 來表達 SortMergeJoin hintSELECT /* MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key 假設用戶沒有使用hints,默認順序是: plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold 參數:spark.sql.autoBroadcastJoinThreshold 假設兩張表都滿足廣播需求,選最小的。 spark.sql.join.preferSortMergeJoin=true, 還有兩個條件,根據統(tǒng)計信息,表的bytes是廣播的閾值*總并行度: plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions 并且該表bytes乘以3要小于等于另一張表的bytes: a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes 那么這張表就適合分治之后,作為每個分區(qū)構建本地hashtable的表。 def createSortMergeJoin() = { if (RowOrdering.isOrderable(leftKeys)) { Some(Seq(joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)))) } else { None } } 這段代碼是在SparkStrageties類,JoinSelection單例類內部。 createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint))) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) 當然,這三種join都是等值join,之前的版本Spark僅僅支持等值join但是不支持非等值join,常見的業(yè)務開發(fā)中確實存在非等值join的情況,spark目前支持非等值join的實現有以下兩種,由于實現問題,確實很容易oom。 Broadcast nested loop joinShuffle-and-replicate nested loop join。 |
|