Spark Sort-Based Shuffle內(nèi)幕徹底解密1、為什么使用Sort-Based Shuffle內(nèi)幕徹底解密 2、Sort-Based Shuffle實戰(zhàn) 3、Sort-Based Shuffle內(nèi)幕 4、Sort-Based Shuffle的不足 1、為什么使用Sort-Based Shuffle 1)Shuffle一般包含兩階段任務(wù),第一部分,產(chǎn)生Shuffle數(shù)據(jù)的階段(Map階段,額外補充,需要實現(xiàn)ShuffleManager中g(shù)etWriter來寫數(shù)據(jù)(數(shù)據(jù)可以BlockManager寫到Memory、Disk、Tachyon等,例如想非常快的Shuffle,此時可以考慮把數(shù)據(jù)寫在內(nèi)存中,但是內(nèi)存不穩(wěn)定,建議采用MEMORY_AND_DISK方式));第二部分,使用Shuffle數(shù)據(jù)的階段(Reduce階段,額外補充,需要實現(xiàn)ShuffleManager的getReader,Reader會向Driver獲取上一個Stage產(chǎn)生的Shuffle數(shù)據(jù)) 2)Spark的Job會被劃分成很多Stage: 如果只有一個Stage,則這個Job就相當于只有一個Mapper階段,當然不會產(chǎn)生Shuffle,適合于簡單的ETL; 如果不止一個Stage,則最后一個Stage就是最終的Reducer,最左側(cè)的第一個Stage就僅僅是整個Job的Mapper,中間所有的任意一個Stage是其父Stage的Reducer,且是其子Stage的Mapper 3)Spark Shuffle在最開始只支持Hash-based Shuffle:默認Mapper階段會為Reducer階段的每一個Task單獨創(chuàng)建一個文件來保存該Task中要使用的數(shù)據(jù),但是會在一些情況下(例如數(shù)據(jù)量非常大的情況)會造成大量文件(M*R,其中M代表Mapper中的所有的并行任務(wù)數(shù)量,R代表Reducer中所有的并行任務(wù)數(shù)量)的隨機磁盤I/O操作,且會形成大量的Memory消耗(極易造成OOM),這是致命的問題,因為第一不能夠處理大規(guī)模的數(shù)據(jù),第二Spark不能夠運行在大規(guī)模的分布式集群上!后來的改善方案是加入了Shuffle Consolidate機制來將Shuffle時候產(chǎn)生的文件數(shù)量減少到C*R個(C代表在Mapper端同時能夠使用的Cores的數(shù)量,R代表Reducer中所有的并行任務(wù)數(shù)量),但是此時如果Reducer端的并行數(shù)據(jù)分片過多的話,則C*R可能依舊過大,此時依舊沒有逃脫文件打開過多的厄運! Spark在引入Sort-based Shuffle(Spark1.1版本以前)以前比較適用于中小規(guī)模的大數(shù)據(jù)處理! 4)為了讓Spark在更大規(guī)模的集群上更高性能處理更大規(guī)模的數(shù)據(jù),于是就引入了Sort-based Shuffle!從此以后(Spark1.1版本開始),Spark可以勝任任意規(guī)模(包含PB級別及PB以上的級別)的大數(shù)據(jù)的處理,尤其是隨著鎢絲計劃的引入和優(yōu)化,把Spark更快速的在更大規(guī)模的集群處理更海量的數(shù)據(jù)的能力推向了一個新的巔峰! 5)Spark1.6版本支持至少三種類型Shuffle val shortShuffleMgrNames = Map( “hash” -> “org.apache.spark.shuffle.hash.HashShuffleManager”, “sort” -> “org.apache.spark.shuffle.sort.SortShuffleManager”, “tungsten-sort” -> “org.apache.spark.shuffle.sort.SortShuffleManager”) 實現(xiàn)ShuffleManager接口可以根據(jù)自己的業(yè)務(wù)實際需要最優(yōu)化的使用自定義的Shuffle實現(xiàn)。 6)Spark1.6默認采用的就是Sort-based Shuffle的方式 val shuffleMgrName= conf.get(“spark.shuffle.manager”,”sort”) 上述的源碼說明:你可以在Spark的配置文件中配置Spark框架運行時要使用的具體的ShuffleManager的實現(xiàn) 修改conf/spark-default.conf,加入以下內(nèi)容:spark.shuffle.manager SORT 具體ShuffleManager方式(Sort、Hash、鎢絲等) Sort-based Shuffle 不會為每個Reducer中的Task生成一個單獨的文件,相反,Sort-based Shuffle會把Mapper中每個SuffleMapTask所有的輸出數(shù)據(jù)Data只寫到一個文件中,因為每個ShuffleMapTask中的數(shù)據(jù)會被分類,所以Sort-based Shuffle使用index文件存儲具體ShuffleMapTask輸出數(shù)據(jù)在同一個Data文件中是如何分類的信息!!所以說基于Sort-based的Shuffle會在Mapper中的每一個ShuffleMapTask中產(chǎn)生兩個文件:Data文件和Index文件,其中Data文件是存儲當前Task的Shuffle輸出的,Index文件中則存儲了數(shù)據(jù)Data文件中數(shù)據(jù)通過Partitioner的分類信息,此時下一個階段的Stage中的Task就是根據(jù)這個Index文件獲取自己所要抓取的上一個Stage中的ShuffleMapTask產(chǎn)生的數(shù)據(jù)的。 Sort-based Shuffle會產(chǎn)生2M(M代表Mapper階段中并行的Partition的總數(shù)量,其實就是Mapper端ShuffleMapTask的總數(shù)量)個Shuffle臨時文件 回顧整個Shuffle的歷史,Shuffle產(chǎn)生的臨時文件的數(shù)量的變化依次為: Basic Hash Shuffle:M*R; Consalidate方式的Hash Shuffle:C*R; Sort-based Shuffle: 2M; 2、在集群中動手實戰(zhàn)Sort-based Shuffle 通過動手實踐確實證明了Sort-based Shuffle產(chǎn)生了2M個文件 在Sort-based Shuffle中的Reducer是如何獲取自己需要的數(shù)據(jù)的呢?具體而言,Reducer首先找Driver去獲取父Stage中每個ShuffleMapTask輸出的位置信息,根據(jù)位置信息獲取index文件,解析index文件,從解析的index文件中獲取Data文件中屬于自己的那部分內(nèi)容 3、默認Sort-based Shuffle的幾個缺陷 1)如果Mapper中Task的數(shù)量過大,依舊會產(chǎn)生很多小文件;此時在Shuffle傳遞數(shù)據(jù)的過程中到Reducer端,reduce會需要同時打開大量的記錄來進行反序列化,導致大量的內(nèi)存消耗和GC的巨大負擔,造成系統(tǒng)緩慢甚至崩潰 2)如果需要在分片內(nèi)進行排序的話,此時需要進行Mapper端和Reducer端的兩次排序?。?! hadoop mapper有數(shù)組代表的環(huán)形內(nèi)存緩存區(qū)(有數(shù)據(jù)也有索引)
|