很多文檔中描述,Mapper的數(shù)量在默認(rèn)情況下不可直接控制干預(yù),因為Mapper的數(shù)量由輸入的大小和個數(shù)決定。在默認(rèn)情況下,最終input占據(jù)了多少block,就應(yīng)該啟動多少個Mapper。如果輸入的文件數(shù)量巨大,但是每個文件的size都小于HDFS的blockSize,那么會造成啟動的Mapper等于文件的數(shù)量(即每個文件都占據(jù)了一個block),那么很可能造成啟動的Mapper數(shù)量超出限制而導(dǎo)致崩潰。這些邏輯確實是正確的,但都是在默認(rèn)情況下的邏輯。其實如果進行一些客戶化的設(shè)置,就可以控制了。 在Hadoop中,設(shè)置Map task的數(shù)量不像設(shè)置Reduce task數(shù)量那樣直接,即:不能夠通過API直接精確的告訴Hadoop應(yīng)該啟動多少個Map task。 你也許奇怪了,在API中不是提供了接口org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)嗎?這個值難道不可以設(shè)置Map task的數(shù)量嗎?這個API的確沒錯,在文檔上解釋”Note: This is only a hint to the framework.“,即這個值對Hadoop的框架來說僅僅是個提示,不起決定性的作用。也就是說,即便你設(shè)置了,也不一定得到你想要的效果。 1. InputFormat介紹 在具體設(shè)置Map task數(shù)量之前,非常有必要了解一下與Map-Reduce輸入相關(guān)的基礎(chǔ)知識。 這個接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的輸入規(guī)格說明(input-specification),它將所有的輸入文件分割成邏輯上的InputSplit,每一個InputSplit將會分給一個單獨的mapper;它還提供RecordReader的具體實現(xiàn),這個Reader從邏輯的InputSplit上獲取input records并傳給Mapper處理。 InputFormat有多種具體實現(xiàn),諸如FileInputFormat(處理基于文件的輸入的基礎(chǔ)抽象類), DBInputFormat(處理基于數(shù)據(jù)庫的輸入,數(shù)據(jù)來自于一個能用SQL查詢的表),KeyValueTextInputFormat(特殊的FineInputFormat,處理Plain Text File,文件由回車或者回車換行符分割成行,每一行由key.value.separator.in.input.line分割成Key和Value),CompositeInputFormat,DelegatingInputFormat等。在絕大多數(shù)應(yīng)用場景中都會使用FileInputFormat及其子類型。 通過以上的簡單介紹,我們知道InputFormat決定著InputSplit,每個InputSplit會分配給一個單獨的Mapper,因此InputFormat決定了具體的Map task數(shù)量。 2. FileInputFormat中影響Map數(shù)量的因素 在日常使用中,F(xiàn)ileInputFormat是最常用的InputFormat,它有很多具體的實現(xiàn)。以下分析的影響Map數(shù)量的因素僅對FileInputFormat及其子類有效,其他非FileInputFormat可以去查看相應(yīng)的 getSplits(JobConf job, int numSplits) 具體實現(xiàn)即可。 請看如下代碼段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代碼):
totalSize:是整個Map-Reduce job所有輸入的總大小。 numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設(shè)置的值,給M-R框架的Map數(shù)量的提示。 goalSize:是輸入總大小與提示Map task數(shù)量的比值,即期望每個Mapper處理多少的數(shù)據(jù),僅僅是期望,具體處理的數(shù)據(jù)數(shù)由下面的computeSplitSize決定。 minSplitSize:默認(rèn)為1,可由子類復(fù)寫函數(shù)protected void setMinSplitSize(long minSplitSize) 重新設(shè)置。一般情況下,都為1,特殊情況除外。 minSize:取的1和mapred.min.split.size中較大的一個。 blockSize:HDFS的塊大小,默認(rèn)為64M,一般大的HDFS都設(shè)置成128M。 splitSize:就是最終每個Split的大小,那么Map的數(shù)量基本上就是totalSize/splitSize。 接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的數(shù)據(jù)量)和HDFS的block size中取較小的,然后與mapred.min.split.size相比取較大的。 3. 如何調(diào)整Map的數(shù)量 有了2的分析,下面調(diào)整Map的數(shù)量就很容易了。 3.1 減小Map-Reduce job 啟動時創(chuàng)建的Mapper數(shù)量 當(dāng)處理大批量的大數(shù)據(jù)時,一種常見的情況是job啟動的mapper數(shù)量太多而超出了系統(tǒng)限制,導(dǎo)致Hadoop拋出異常終止執(zhí)行。解決這種異常的思路是減少mapper的數(shù)量。具體如下: 3.1.1 輸入文件size巨大,但不是小文件 這種情況可以通過增大每個mapper的input size,即增大minSize或者增大blockSize來減少所需的mapper的數(shù)量。增大blockSize通常不可行,因為當(dāng)HDFS被hadoop namenode -format之后,blockSize就已經(jīng)確定了(由格式化時dfs.block.size決定),如果要更改blockSize,需要重新格式化HDFS,這樣當(dāng)然會丟失已有的數(shù)據(jù)。所以通常情況下只能通過增大minSize,即增大mapred.min.split.size的值。 3.1.2 輸入文件數(shù)量巨大,且都是小文件 所謂小文件,就是單個文件的size小于blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合并成一個InputSplit送給mapper處理,從而減少mapper的數(shù)量。具體細(xì)節(jié)稍后會更新并展開。 3.2 增加Map-Reduce job 啟動時創(chuàng)建的Mapper數(shù)量 增加mapper的數(shù)量,可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值。 原文鏈接:http://blog.csdn.net/yishao_20140413/article/details/24932655 |
|