乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Spark join種類(>3種)及join選擇依據

       印度阿三17 2021-03-15

      浪院長 浪尖聊大數據

      hashjoin


      join是作為業(yè)務開發(fā)繞不開的SQL話題,無論是傳統(tǒng)的數據庫join,還是大數據里的join。
      做過Spark/flink流處理的應該都用過一種流表和維表的join,維表對于Spark來說可以是driver端獲取后廣播到每個Executor,然后在executor端執(zhí)行流表task的時候join,其實大多數是個hashmap,而很多時候這個維表比較大會存儲于redis/hbase。Flink進行維表join可以用的方式比較多了,比如直接open方法里從外部加載的靜態(tài)hashmap,這種就無法更新,因為Flink不像Spark可以每個批次或者若干批次加載一次維表。也可以使用LRU 異步IO 外部存儲來實現join,這樣就實現了對外部更新的感知。甚至也可以使用Flink的廣播功能實現join操作。
      上面所說的就是比較常見的hashjoin的簡單表達,將維表通過join的條件key構建為一個hashtable,就拿java 8的HashMap來說吧,就是一個數組 鏈表(鏈表過長會變?yōu)榧t黑樹),數組下標就是key,數組存儲的是value的指針。
      Spark join種類(>3種)及join選擇依據
      join的時候主表通過join條件構建key去,hashmap里查找。

      Spark BroadCastHashJoin


      翻過源碼之后你就會發(fā)現,Spark 1.6之前實現BroadCastHashJoin就是利用的Java的HashMap來實現的。大家感興趣可以去Spark 1.6的源碼里搜索BroadCastHashJoin,HashedRelation,探查一下源碼。
      具體實現就是driver端根據表的統(tǒng)計信息,當發(fā)現一張小表達到廣播條件的時候,就會將小表collect到driver端,然后構建一個HashedRelation,然后廣播。
      Spark join種類(>3種)及join選擇依據
      其實,就跟我們在使用Spark Streaming的時候廣播hashmap一樣。
      重點強調里面最大行數限制和最大bytes限制并不是我們設置的自動廣播參數限制,而是內部存儲結構的限制。
      Spark join種類(>3種)及join選擇依據
      還有在Spark后期版本主要就是使用了TaskMemoryManager而不是HashMap進行了背書。

      ShuffledHashJoin


      BroadCastHashJoin適合的是大表和小表的join策略,將整個小表廣播。很多時候,參與join的表本身都不適合廣播,也不適合放入內存,但是按照一定分區(qū)拆開后就可以放入內存構建為HashRelation。這個就是分治思想了,將兩張表按照相同的hash分區(qū)器及分區(qū)數進行,對join條件進行分區(qū),那么需要join的key就會落入相同的分區(qū)里,然后就可以利用本地join的策略來進行join了。
      Spark join種類(>3種)及join選擇依據
      也即是ShuffledHashJoin有兩個重要步驟:

      1. join的兩張表有一張是相對小表,經過拆分后可以實現本地join。

      2. 相同的分區(qū)器及分區(qū)數,按照joinkey進行分區(qū),這樣約束后joinkey范圍就限制在相同的分區(qū)中,不依賴其他分區(qū)完成join。

      3. 對小表分區(qū)構建一個HashRelation。然后就可以完成本地hashedjoin了,參考ShuffleHashJoinExec代碼,這個如下圖:
        Spark join種類(>3種)及join選擇依據

      SortMergeJoin


      上面兩張情況都是小表本身適合放入內存或者中表經過分區(qū)治理后適合放入內存,來完成本地化hashedjoin,小表數據放在內存中,很奢侈的,所以經常會遇到join,就oom。小表,中表都是依據內存說的,你內存無限,那是最好。
      那么,大表和大表join怎么辦?這時候就可以利用SortMergeJoin來完成。
      SortMergeJoin基本過程如下:

      1. 首先采取相同的分區(qū)器及分區(qū)數對兩張表進行重分區(qū)操作,保證兩張表相同的key落到相同的分區(qū)。

      2. 對于單個分區(qū)節(jié)點兩個表的數據,分別進行按照key排序。

      3. 對排好序的兩張分區(qū)表數據執(zhí)行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊。'
        Spark join種類(>3種)及join選擇依據
        Spark 3.1以后的spark版本對sortmergejoin又進一步優(yōu)化了。

      Spark SQL的join方式選擇


      假如用戶使用Spark SQL的適合用了hints,那Spark會先采用Hints提示的join方式。
      broadcastHashJoin,hints寫法如下:

      -- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 來表達 broadcast hint
      SELECT /*  BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

      ShuffledHashJoin,hints的sql寫法如下:

      -- 僅支持 SHUFFLE_HASH 來表達 ShuffledHashJoin hint
      SELECT /*  SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key

      SortMergeJoin,hints的SQL寫法如下:

      -- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 來表達 SortMergeJoin hintSELECT /*  MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key

      假設用戶沒有使用hints,默認順序是:
      1.先判斷,假設join的表統(tǒng)計信息現實,一張表大小大于0,且小于等于用戶配置的自動廣播閾值則,采用廣播。

      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
      參數:spark.sql.autoBroadcastJoinThreshold

      假設兩張表都滿足廣播需求,選最小的。
      2.不滿足廣播就判斷是否滿足ShuffledHashJoin,首先下面參數要設置為false,默認為true。

      spark.sql.join.preferSortMergeJoin=true,

      還有兩個條件,根據統(tǒng)計信息,表的bytes是廣播的閾值*總并行度:

      plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

      并且該表bytes乘以3要小于等于另一張表的bytes:

      a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes

      那么這張表就適合分治之后,作為每個分區(qū)構建本地hashtable的表。
      3.不滿足廣播,也不滿足ShuffledHashJoin,就判斷是否滿足SortMergeJoin。條件很簡單,那就是key要支持可排序。

      def createSortMergeJoin() = {
      if (RowOrdering.isOrderable(leftKeys)) {
          Some(Seq(joins.SortMergeJoinExec(
            leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right))))
        } else {
          None
        }
      }

      這段代碼是在SparkStrageties類,JoinSelection單例類內部。

      createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
        .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
        .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint)))
        .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
        .getOrElse(createJoinWithoutHint())

      當然,這三種join都是等值join,之前的版本Spark僅僅支持等值join但是不支持非等值join,常見的業(yè)務開發(fā)中確實存在非等值join的情況,spark目前支持非等值join的實現有以下兩種,由于實現問題,確實很容易oom。

      Broadcast nested loop joinShuffle-and-replicate nested loop join。

      Spark join種類(>3種)及join選擇依據

        本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發(fā)現有害或侵權內容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多