乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Hadoop API 使用介紹

       mumuxd 2011-08-09

      Hadoop API 使用介紹
      2009-11-17 00:57

      Hadoop API被分成(divide into)如下幾種主要的包(package)
      org.apache.hadoop.conf     定義了系統(tǒng)參數(shù)的配置文件處理API。
      org.apache.hadoop.fs          定義了抽象的文件系統(tǒng)API。
      org.apache.hadoop.dfs       Hadoop分布式文件系統(tǒng)(HDFS)模塊的實現(xiàn)。
      org.apache.hadoop.io         定義了通用的I/O API,用于針對網(wǎng)絡(luò),數(shù)據(jù)庫,文件等數(shù)據(jù)對象做讀寫操作。
      org.apache.hadoop.ipc       用于網(wǎng)絡(luò)服務(wù)端和客戶端的工具,封裝了網(wǎng)絡(luò)異步I/O的基礎(chǔ)模塊。
      org.apache.hadoop.mapred         Hadoop分布式計算系統(tǒng)(MapReduce)模塊的實現(xiàn),包括任務(wù)的分發(fā)調(diào)度等。
      org.apache.hadoop.metrics        定義了用于性能統(tǒng)計信息的API,主要用于mapred和dfs模塊。
      org.apache.hadoop.record 定義了針對記錄的I/O API類以及一個記錄描述語言翻譯器,用于簡化將記錄序列化成語言中性的格式(language-neutral manner)。
      org.apache.hadoop.tools    定義了一些通用的工具。
      org.apache.hadoop.util       定義了一些公用的API。

      MapReduce框架結(jié)構(gòu)
           Map/Reduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型,它最初是由Google工程師設(shè)計并實現(xiàn)的,Google已經(jīng)將它完整的MapReduce論文公開發(fā)布了。其中對它的定義是,Map/Reduce是一個編程模型(programming model),是一個用于處理和生成大規(guī)模數(shù)據(jù)集(processing and generating large data sets)的相關(guān)的實現(xiàn)。用戶定義一個map函數(shù)來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數(shù)將所有這些中間的有著相同key的values合并起來。很多現(xiàn)實世界中的任務(wù)都可用這個模型來表達。
      Hadoop的Map/Reduce框架也是基于這個原理實現(xiàn)的,下面簡要介紹一下Map/Reduce框架主要組成及相互的關(guān)系。
      2.1 總體結(jié)構(gòu)
      2.1.1     Mapper和Reducer

      運行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個Combiner類,它實際也是Reducer的實現(xiàn)。
      2.1.2    JobTracker和TaskTracker
      它們都是由一個master服務(wù)JobTracker和多個運行于多個節(jié)點的slaver服務(wù)TaskTracker兩個類提供的服務(wù)調(diào)度的。master負(fù)責(zé)調(diào)度job的每一個子任務(wù)task運行于slave上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運行它,slave則負(fù)責(zé)直接執(zhí)行每一個task。TaskTracker都需要運行在HDFS的DataNode上,而JobTracker則不需要,一般情況應(yīng)該把JobTracker部署在單獨的機器上。
      2.1.3    JobClient
      每一個job都會在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)Configuration打包成jar文件存儲在HDFS,并把路徑提交到JobTracker的master服務(wù),然后由master創(chuàng)建每一個Task(即MapTask和ReduceTask)將它們分發(fā)到各個TaskTracker服務(wù)中去執(zhí)行。
      2.1.4    JobInProgress
      JobClient提交job后,JobTracker會創(chuàng)建一個JobInProgress來跟蹤和調(diào)度這個job,并把它添加到j(luò)ob隊列里。JobInProgress會根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個ReduceTask。
      2.1.5    TaskInProgress
      JobTracker啟動任務(wù)時通過每一個TaskInProgress來launch Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設(shè)置好環(huán)境變量后啟動一個獨立的java child進程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。
      2.1.6    MapTask和ReduceTask
      一個完整的job會自動依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實際也是Reducer接口類的實現(xiàn)。Mapper會根據(jù)job jar中定義的輸入數(shù)據(jù)集按<key1,value1>對讀入,處理完成生成臨時的<key2,value2>對,如果定義了Combiner,MapTask會在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進程調(diào)用Reducer處理,生成最終結(jié)果<key3,value3>對。這個過程在下一部分再詳細(xì)介紹。
      下圖描述了Map/Reduce框架中主要組成和它們之間的關(guān)系:


      2.2     Job創(chuàng)建過程
      2.2.1    JobClient.runJob() 開始運行job并分解輸入數(shù)據(jù)集
      一個MapReduce的Job會通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個小數(shù)據(jù)集會對應(yīng)創(chuàng)建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統(tǒng)一打包到j(luò)obFile的jar中并存儲在HDFS中,再將jobFile路徑提交給JobTracker去調(diào)度和執(zhí)行。
      2.2.2    JobClient.submitJob() 提交job到JobTracker
      jobFile的提交過程是通過RPC模塊(有單獨一章來詳細(xì)介紹)來實現(xiàn)的。大致過程是,JobClient類中通過RPC實現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實現(xiàn)JobSubmissionProtocol接口。JobTracker則根據(jù)獲得的jobFile路徑創(chuàng)建與job有關(guān)的一系列對象(即JobInProgress和TaskInProgress等)來調(diào)度并執(zhí)行job。
      JobTracker創(chuàng)建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態(tài)信息,如執(zhí)行時間、Map和Reduce任務(wù)完成的比例等。JobClient會根據(jù)這個JobStatus對象創(chuàng)建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執(zhí)行過程的統(tǒng)計數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺。
      與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示


      2.3     Job執(zhí)行過程
      上面已經(jīng)提到,job是統(tǒng)一由JobTracker來調(diào)度的,具體的Task分發(fā)給各個TaskTracker節(jié)點來執(zhí)行。下面通過源碼來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請求開始。
      2.3.1    JobTracker初始化Job和Task隊列過程
      2.3.1.1     JobTracker.submitJob() 收到請求
      當(dāng)JobTracker接收到新的job請求(即submitJob()函數(shù)被調(diào)用)后,會創(chuàng)建一個JobInProgress對象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時候會初始化一系列與任務(wù)有關(guān)的參數(shù),如job jar的位置(會把它從HDFS復(fù)制本地的文件系統(tǒng)中的臨時目錄里),Map和Reduce的數(shù)據(jù),job的優(yōu)先級別,以及記錄統(tǒng)計報告的對象等。
      2.3.1.2     JobTracker.resortPriority() 加入隊列并按優(yōu)先級排序
      JobInProgress創(chuàng)建后,首先將它加入到j(luò)obs隊列里,分別用一個map成員變量jobs用來管理所有jobs對象,一個list成員變量jobsByPriority用來維護jobs的執(zhí)行優(yōu)先級別。之后JobTracker會調(diào)用resortPriority()函數(shù),將jobs先按優(yōu)先級別排序,再按提交時間排序,這樣保證最高優(yōu)先并且先提交的job會先執(zhí)行。
      2.3.1.3     JobTracker.JobInitThread 通知初始化線程
      然后JobTracker會把此job加入到一個管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。通過此成員變量調(diào)用notifyAll()函數(shù),會喚起一個用于初始化job的線程JobInitThread來處理(JobTracker會有幾個內(nèi)部的線程來維護jobs隊列,它們的實現(xiàn)都在JobTracker代碼里,稍候再詳細(xì)介紹)。JobInitThread收到信號后即取出最靠前的job,即優(yōu)先級別最高的job,調(diào)用JobInProgress的initTasks()函數(shù)執(zhí)行真正的初始化工作。
      2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress
      Task的初始化過程稍復(fù)雜些,首先步驟JobInProgress會創(chuàng)建Map的監(jiān)控對象。在initTasks()函數(shù)里通過調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點的host,這個會在RawSplit創(chuàng)建時通過FileSplit的getLocations()函數(shù)獲取,該函數(shù)會調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個細(xì)節(jié)會在HDFS模塊中講解)。當(dāng)然如果是存儲在本地文件系統(tǒng)中,即使用LocalFileSystem時當(dāng)然只有一個location即“localhost”了。
      其次JobInProgress會創(chuàng)建Reduce的監(jiān)控對象,這個比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的也是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。
      JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束,執(zhí)行則是通過另一異步的方式處理的,下面接著介紹它。
      與初始化Job過程相關(guān)的類和方法如下圖所示


      2.3.2TaskTracker執(zhí)行Task的過程
      Task的執(zhí)行實際是由TaskTracker發(fā)起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會在這時啟動,即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時進行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口(在IPC章節(jié)詳細(xì)介紹)實現(xiàn)。這個過程實際比較復(fù)雜,下面一一簡單介紹下每個步驟。
      2.3.2.1     TaskTracker.run() 連接JobTracker
      TaskTracker的啟動過程會初始化一系列參數(shù)和服務(wù)(另有單獨的一節(jié)介紹),然后嘗試連接JobTracker服務(wù)(即必須實現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù),此過程參見run()方法。
      2.3.2.2     TaskTracker.offerService() 主循環(huán)
      如果連接JobTracker服務(wù)成功,TaskTracker就會調(diào)用offerService()函數(shù)進入主執(zhí)行循環(huán)中。這個循環(huán)會每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat()獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數(shù)組。再遍歷這個數(shù)組,如果是一個新任務(wù)指令即LaunchTaskAction則調(diào)用startNewTask()函數(shù)執(zhí)行新任務(wù),否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執(zhí)行KillJobAction或者KillTaskAction等。
      2.3.2.3     TaskTracker.transmitHeartBeat() 獲取JobTracker指令
      在transmitHeartBeat()函數(shù)處理中,TaskTracker會創(chuàng)建一個新的TaskTrackerStatus對象記錄目前任務(wù)的執(zhí)行狀況,然后通過IPC接口調(diào)用JobTracker的heartbeat()方法發(fā)送過去,并接受新的指令,即返回值TaskTrackerAction數(shù)組。在這個調(diào)用之前,TaskTracker會先檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。操作成功后再更新相關(guān)的統(tǒng)計信息等。
      2.3.2.4     TaskTracker.startNewTask() 啟動新任務(wù)
      此函數(shù)的主要任務(wù)就是創(chuàng)建TaskTracker$TaskInProgress對象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊列中。完成后則調(diào)用localizeJob()真正初始化Task并開始執(zhí)行。
      2.3.2.5     TaskTracker.localizeJob() 初始化job目錄等
      此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊列中。完成后即調(diào)用launchTaskForJob()開始執(zhí)行Task。
      2.3.2.6     TaskTracker.launchTaskForJob() 執(zhí)行任務(wù)
      啟動Task的工作實際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來執(zhí)行的。
      2.3.2.7     TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù)
      執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法最后啟動Task獨立的java執(zhí)行子進程。
      2.3.2.8     Task.createRunner() 創(chuàng)建啟動Runner對象
      Task有兩個實現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會創(chuàng)建MapTaskRunner來啟動Task子進程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動。
      2.3.2.9     TaskRunner.start() 啟動子進程真正執(zhí)行Task
      這里是真正啟動子進程并執(zhí)行Task的地方。它會調(diào)用run()函數(shù)來處理。執(zhí)行的過程比較復(fù)雜,主要的工作就是初始化啟動java子進程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等(需要將TaskTracker的環(huán)境變量以及job jar的路徑合并起來)。然后裝載job jar包,調(diào)用runChild()方法啟動子進程,即通過ProcessBuilder來創(chuàng)建,同時子進程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過TaskLog類來實現(xiàn)。這里有個小問題,Task子進程只能輸出INFO級別日志,而且該級別是在run()函數(shù)中直接指定,不過改進也不復(fù)雜。
      與Job執(zhí)行過程相關(guān)的類和方法如下圖所示


      2.4 JobTracker和TaskTracker
      如上面所述,JobTracker和TaskTracker是MapReduce框架最基本的兩個服務(wù),其他所有處理均由它們調(diào)度執(zhí)行,下面簡單介紹它們內(nèi)部提供的服務(wù)及創(chuàng)建的線程,詳細(xì)過程下回分解
      2.4.1JobTracker的服務(wù)和線程
      JobTracker是MapReduce框架中最主要的類之一,所有job的執(zhí)行都由它來調(diào)度,而且Hadoop系統(tǒng)中只配置一個JobTracker應(yīng)用。啟動JobTracker后它會初始化若干個服務(wù)以及若干個內(nèi)部線程用來維護job的執(zhí)行過程和結(jié)果。下面簡單介紹一下它們。
      首先,JobTracker會啟動一個interTrackerServer,端口配置在Configuration中的“mapred.job.tracker”參數(shù),缺省是綁定8012端口。它有兩個用途,一是用于接收和處理TaskTracker的heartbeat等請求,即必須實現(xiàn)InterTrackerProtocol接口及協(xié)議。二是用于接收和處理JobClient的請求,如submitJob,killJob等,即必須實現(xiàn)JobSubmissionProtocol接口及協(xié)議。
      其次,它會啟動一個infoServer,運行StatusHttpServer,缺省監(jiān)聽50030端口。是一個web服務(wù),用于給用戶提供web界面查詢job執(zhí)行狀況的服務(wù)。
      JobTracker還會啟動多個線程,ExpireLaunchingTasks線程用于停止那些未在超時時間內(nèi)報告進度的Tasks。ExpireTrackers線程用于停止那些可能已經(jīng)當(dāng)?shù)舻腡askTracker,即長時間未報告的TaskTracker將不會再分配新的Task。RetireJobs線程用于清除那些已經(jīng)完成很長時間還存在隊列里的jobs。JobInitThread線程用于初始化job,這在前面章節(jié)已經(jīng)介紹。TaskCommitQueue線程用于調(diào)度Task的那些所有與FileSystem操作相關(guān)的處理,并記錄Task的狀態(tài)等信息。
      2.4.2TaskTracker的服務(wù)和線程
      TaskTracker也是MapReduce框架中最主要的類之一,它運行于每一臺DataNode節(jié)點上,用于調(diào)度Task的實際運行工作。它內(nèi)部也會啟動一些服務(wù)和線程。
      TaskTracker也會啟動一個StatusHttpServer服務(wù)來提供web界面的查詢Task執(zhí)行狀態(tài)的工具。
      其次,它還會啟動一個taskReportServer服務(wù),這個用于提供給它的子進程即TaskRunner啟動的MapTask或者ReduceTask向它報告狀況,子進程的啟動命令實現(xiàn)在TaskTracker$Child類中,由TaskRunner.run()通過命令行參數(shù)傳入該服務(wù)地址和端口,即調(diào)用TaskTracker的getTaskTrackerReportAddress(),這個地址會在taskReportServer服務(wù)創(chuàng)建時獲得。
      TaskTracker也會啟動一個MapEventsFetcherThread線程用于獲取Map任務(wù)的輸出數(shù)據(jù)信息。
      2.5 Job狀態(tài)監(jiān)控
      未完待

      Hadoop API 使用介紹 2009-11-17 00:57 Hadoop API被分成(divide into)如下幾種主要的包(package) org.apache.hadoop.conf     定義了系統(tǒng)參數(shù)的配置文件處理API。 org.apache.hadoop.fs          定義了抽象的文件系統(tǒng)API。 org.apache.hadoop.dfs       Hadoop分布式文件系統(tǒng)(HDFS)模塊的實現(xiàn)。 org.apache.hadoop.io         定義了通用的I/O API,用于針對網(wǎng)絡(luò),數(shù)據(jù)庫,文件等數(shù)據(jù)對象做讀寫操作。 org.apache.hadoop.ipc       用于網(wǎng)絡(luò)服務(wù)端和客戶端的工具,封裝了網(wǎng)絡(luò)異步I/O的基礎(chǔ)模塊。 org.apache.hadoop.mapred         Hadoop分布式計算系統(tǒng)(MapReduce)模塊的實現(xiàn),包括任務(wù)的分發(fā)調(diào)度等。 org.apache.hadoop.metrics        定義了用于性能統(tǒng)計信息的API,主要用于mapred和dfs模塊。 org.apache.hadoop.record 定義了針對記錄的I/O API類以及一個記錄描述語言翻譯器,用于簡化將記錄序列化成語言中性的格式(language-neutral manner)。 org.apache.hadoop.tools    定義了一些通用的工具。 org.apache.hadoop.util       定義了一些公用的API。 MapReduce框架結(jié)構(gòu)      Map/Reduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型,它最初是由Google工程師設(shè)計并實現(xiàn)的,Google已經(jīng)將它完整的MapReduce論文公開發(fā)布了。其中對它的定義是,Map/Reduce是一個編程模型(programming model),是一個用于處理和生成大規(guī)模數(shù)據(jù)集(processing and generating large data sets)的相關(guān)的實現(xiàn)。用戶定義一個map函數(shù)來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數(shù)將所有這些中間的有著相同key的values合并起來。很多現(xiàn)實世界中的任務(wù)都可用這個模型來表達。 Hadoop的Map/Reduce框架也是基于這個原理實現(xiàn)的,下面簡要介紹一下Map/Reduce框架主要組成及相互的關(guān)系。 2.1 總體結(jié)構(gòu) 2.1.1     Mapper和Reducer 運行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個Combiner類,它實際也是Reducer的實現(xiàn)。 2.1.2    JobTracker和TaskTracker 它們都是由一個master服務(wù)JobTracker和多個運行于多個節(jié)點的slaver服務(wù)TaskTracker兩個類提供的服務(wù)調(diào)度的。master負(fù)責(zé)調(diào)度job的每一個子任務(wù)task運行于slave上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運行它,slave則負(fù)責(zé)直接執(zhí)行每一個task。TaskTracker都需要運行在HDFS的DataNode上,而JobTracker則不需要,一般情況應(yīng)該把JobTracker部署在單獨的機器上。 2.1.3    JobClient 每一個job都會在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)Configuration打包成jar文件存儲在HDFS,并把路徑提交到JobTracker的master服務(wù),然后由master創(chuàng)建每一個Task(即MapTask和ReduceTask)將它們分發(fā)到各個TaskTracker服務(wù)中去執(zhí)行。 2.1.4    JobInProgress JobClient提交job后,JobTracker會創(chuàng)建一個JobInProgress來跟蹤和調(diào)度這個job,并把它添加到j(luò)ob隊列里。JobInProgress會根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個ReduceTask。 2.1.5    TaskInProgress JobTracker啟動任務(wù)時通過每一個TaskInProgress來launch Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設(shè)置好環(huán)境變量后啟動一個獨立的java child進程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。 2.1.6    MapTask和ReduceTask 一個完整的job會自動依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實際也是Reducer接口類的實現(xiàn)。Mapper會根據(jù)job jar中定義的輸入數(shù)據(jù)集按對讀入,處理完成生成臨時的對,如果定義了Combiner,MapTask會在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進程調(diào)用Reducer處理,生成最終結(jié)果對。這個過程在下一部分再詳細(xì)介紹。 下圖描述了Map/Reduce框架中主要組成和它們之間的關(guān)系: 2.2     Job創(chuàng)建過程 2.2.1    JobClient.runJob() 開始運行job并分解輸入數(shù)據(jù)集 一個MapReduce的Job會通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個小數(shù)據(jù)集會對應(yīng)創(chuàng)建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統(tǒng)一打包到j(luò)obFile的jar中并存儲在HDFS中,再將jobFile路徑提交給JobTracker去調(diào)度和執(zhí)行。 2.2.2    JobClient.submitJob() 提交job到JobTracker jobFile的提交過程是通過RPC模塊(有單獨一章來詳細(xì)介紹)來實現(xiàn)的。大致過程是,JobClient類中通過RPC實現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實現(xiàn)JobSubmissionProtocol接口。JobTracker則根據(jù)獲得的jobFile路徑創(chuàng)建與job有關(guān)的一系列對象(即JobInProgress和TaskInProgress等)來調(diào)度并執(zhí)行job。 JobTracker創(chuàng)建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態(tài)信息,如執(zhí)行時間、Map和Reduce任務(wù)完成的比例等。JobClient會根據(jù)這個JobStatus對象創(chuàng)建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執(zhí)行過程的統(tǒng)計數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺。 與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示 2.3     Job執(zhí)行過程 上面已經(jīng)提到,job是統(tǒng)一由JobTracker來調(diào)度的,具體的Task分發(fā)給各個TaskTracker節(jié)點來執(zhí)行。下面通過源碼來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請求開始。 2.3.1    JobTracker初始化Job和Task隊列過程 2.3.1.1     JobTracker.submitJob() 收到請求 當(dāng)JobTracker接收到新的job請求(即submitJob()函數(shù)被調(diào)用)后,會創(chuàng)建一個JobInProgress對象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時候會初始化一系列與任務(wù)有關(guān)的參數(shù),如job jar的位置(會把它從HDFS復(fù)制本地的文件系統(tǒng)中的臨時目錄里),Map和Reduce的數(shù)據(jù),job的優(yōu)先級別,以及記錄統(tǒng)計報告的對象等。 2.3.1.2     JobTracker.resortPriority() 加入隊列并按優(yōu)先級排序 JobInProgress創(chuàng)建后,首先將它加入到j(luò)obs隊列里,分別用一個map成員變量jobs用來管理所有jobs對象,一個list成員變量jobsByPriority用來維護jobs的執(zhí)行優(yōu)先級別。之后JobTracker會調(diào)用resortPriority()函數(shù),將jobs先按優(yōu)先級別排序,再按提交時間排序,這樣保證最高優(yōu)先并且先提交的job會先執(zhí)行。 2.3.1.3     JobTracker.JobInitThread 通知初始化線程 然后JobTracker會把此job加入到一個管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。通過此成員變量調(diào)用notifyAll()函數(shù),會喚起一個用于初始化job的線程JobInitThread來處理(JobTracker會有幾個內(nèi)部的線程來維護jobs隊列,它們的實現(xiàn)都在JobTracker代碼里,稍候再詳細(xì)介紹)。JobInitThread收到信號后即取出最靠前的job,即優(yōu)先級別最高的job,調(diào)用JobInProgress的initTasks()函數(shù)執(zhí)行真正的初始化工作。 2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress Task的初始化過程稍復(fù)雜些,首先步驟JobInProgress會創(chuàng)建Map的監(jiān)控對象。在initTasks()函數(shù)里通過調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點的host,這個會在RawSplit創(chuàng)建時通過FileSplit的getLocations()函數(shù)獲取,該函數(shù)會調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個細(xì)節(jié)會在HDFS模塊中講解)。當(dāng)然如果是存儲在本地文件系統(tǒng)中,即使用LocalFileSystem時當(dāng)然只有一個location即“localhost”了。 其次JobInProgress會創(chuàng)建Reduce的監(jiān)控對象,這個比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的也是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。 JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束,執(zhí)行則是通過另一異步的方式處理的,下面接著介紹它。 與初始化Job過程相關(guān)的類和方法如下圖所示 2.3.2TaskTracker執(zhí)行Task的過程 Task的執(zhí)行實際是由TaskTracker發(fā)起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會在這時啟動,即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時進行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口(在IPC章節(jié)詳細(xì)介紹)實現(xiàn)。這個過程實際比較復(fù)雜,下面一一簡單介紹下每個步驟。 2.3.2.1     TaskTracker.run() 連接JobTracker TaskTracker的啟動過程會初始化一系列參數(shù)和服務(wù)(另有單獨的一節(jié)介紹),然后嘗試連接JobTracker服務(wù)(即必須實現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù),此過程參見run()方法。 2.3.2.2     TaskTracker.offerService() 主循環(huán) 如果連接JobTracker服務(wù)成功,TaskTracker就會調(diào)用offerService()函數(shù)進入主執(zhí)行循環(huán)中。這個循環(huán)會每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat()獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數(shù)組。再遍歷這個數(shù)組,如果是一個新任務(wù)指令即LaunchTaskAction則調(diào)用startNewTask()函數(shù)執(zhí)行新任務(wù),否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執(zhí)行KillJobAction或者KillTaskAction等。 2.3.2.3     TaskTracker.transmitHeartBeat() 獲取JobTracker指令 在transmitHeartBeat()函數(shù)處理中,TaskTracker會創(chuàng)建一個新的TaskTrackerStatus對象記錄目前任務(wù)的執(zhí)行狀況,然后通過IPC接口調(diào)用JobTracker的heartbeat()方法發(fā)送過去,并接受新的指令,即返回值TaskTrackerAction數(shù)組。在這個調(diào)用之前,TaskTracker會先檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。操作成功后再更新相關(guān)的統(tǒng)計信息等。 2.3.2.4     TaskTracker.startNewTask() 啟動新任務(wù) 此函數(shù)的主要任務(wù)就是創(chuàng)建TaskTracker$TaskInProgress對象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊列中。完成后則調(diào)用localizeJob()真正初始化Task并開始執(zhí)行。 2.3.2.5     TaskTracker.localizeJob() 初始化job目錄等 此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊列中。完成后即調(diào)用launchTaskForJob()開始執(zhí)行Task。 2.3.2.6     TaskTracker.launchTaskForJob() 執(zhí)行任務(wù) 啟動Task的工作實際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來執(zhí)行的。 2.3.2.7     TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù) 執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法最后啟動Task獨立的java執(zhí)行子進程。 2.3.2.8     Task.createRunner() 創(chuàng)建啟動Runner對象 Task有兩個實現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會創(chuàng)建MapTaskRunner來啟動Task子進程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動。 2.3.2.9     TaskRunner.start() 啟動子進程真正執(zhí)行Task 這里是真正啟動子進程并執(zhí)行Task的地方。它會調(diào)用run()函數(shù)來處理。執(zhí)行的過程比較復(fù)雜,主要的工作就是初始化啟動java子進程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等(需要將TaskTracker的環(huán)境變量以及job jar的路徑合并起來)。然后裝載job jar包,調(diào)用runChild()方法啟動子進程,即通過ProcessBuilder來創(chuàng)建,同時子進程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過TaskLog類來實現(xiàn)。這里有個小問題,Task子進程只能輸出INFO級別日志,而且該級別是在run()函數(shù)中直接指定,不過改進也不復(fù)雜。 與Job執(zhí)行過程相關(guān)的類和方法如下圖所示 2.4 JobTracker和TaskTracker 如上面所述,JobTracker和TaskTracker是MapReduce框架最基本的兩個服務(wù),其他所有處理均由它們調(diào)度執(zhí)行,下面簡單介紹它們內(nèi)部提供的服務(wù)及創(chuàng)建的線程,詳細(xì)過程下回分解 2.4.1JobTracker的服務(wù)和線程 JobTracker是MapReduce框架中最主要的類之一,所有job的執(zhí)行都由它來調(diào)度,而且Hadoop系統(tǒng)中只配置一個JobTracker應(yīng)用。啟動JobTracker后它會初始化若干個服務(wù)以及若干個內(nèi)部線程用來維護job的執(zhí)行過程和結(jié)果。下面簡單介紹一下它們。 首先,JobTracker會啟動一個interTrackerServer,端口配置在Configuration中的“mapred.job.tracker”參數(shù),缺省是綁定8012端口。它有兩個用途,一是用于接收和處理TaskTracker的heartbeat等請求,即必須實現(xiàn)InterTrackerProtocol接口及協(xié)議。二是用于接收和處理JobClient的請求,如submitJob,killJob等,即必須實現(xiàn)JobSubmissionProtocol接口及協(xié)議。 其次,它會啟動一個infoServer,運行StatusHttpServer,缺省監(jiān)聽50030端口。是一個web服務(wù),用于給用戶提供web界面查詢job執(zhí)行狀況的服務(wù)。 JobTracker還會啟動多個線程,ExpireLaunchingTasks線程用于停止那些未在超時時間內(nèi)報告進度的Tasks。ExpireTrackers線程用于停止那些可能已經(jīng)當(dāng)?shù)舻腡askTracker,即長時間未報告的TaskTracker將不會再分配新的Task。RetireJobs線程用于清除那些已經(jīng)完成很長時間還存在隊列里的jobs。JobInitThread線程用于初始化job,這在前面章節(jié)已經(jīng)介紹。TaskCommitQueue線程用于調(diào)度Task的那些所有與FileSystem操作相關(guān)的處理,并記錄Task的狀態(tài)等信息。 2.4.2TaskTracker的服務(wù)和線程 TaskTracker也是MapReduce框架中最主要的類之一,它運行于每一臺DataNode節(jié)點上,用于調(diào)度Task的實際運行工作。它內(nèi)部也會啟動一些服務(wù)和線程。 TaskTracker也會啟動一個StatusHttpServer服務(wù)來提供web界面的查詢Task執(zhí)行狀態(tài)的工具。 其次,它還會啟動一個taskReportServer服務(wù),這個用于提供給它的子進程即TaskRunner啟動的MapTask或者ReduceTask向它報告狀況,子進程的啟動命令實現(xiàn)在TaskTracker$Child類中,由TaskRunner.run()通過命令行參數(shù)傳入該服務(wù)地址和端口,即調(diào)用TaskTracker的getTaskTrackerReportAddress(),這個地址會在taskReportServer服務(wù)創(chuàng)建時獲得。 TaskTracker也會啟動一個MapEventsFetcherThread線程用于獲取Map任務(wù)的輸出數(shù)據(jù)信息。 2.5 Job狀態(tài)監(jiān)控 未完待Hadoop API 使用介紹 2009-11-17 00:57 Hadoop API被分成(divide into)如下幾種主要的包(package) org.apache.hadoop.conf     定義了系統(tǒng)參數(shù)的配置文件處理API。 org.apache.hadoop.fs          定義了抽象的文件系統(tǒng)API。 org.apache.hadoop.dfs       Hadoop分布式文件系統(tǒng)(HDFS)模塊的實現(xiàn)。 org.apache.hadoop.io         定義了通用的I/O API,用于針對網(wǎng)絡(luò),數(shù)據(jù)庫,文件等數(shù)據(jù)對象做讀寫操作。 org.apache.hadoop.ipc       用于網(wǎng)絡(luò)服務(wù)端和客戶端的工具,封裝了網(wǎng)絡(luò)異步I/O的基礎(chǔ)模塊。 org.apache.hadoop.mapred         Hadoop分布式計算系統(tǒng)(MapReduce)模塊的實現(xiàn),包括任務(wù)的分發(fā)調(diào)度等。 org.apache.hadoop.metrics        定義了用于性能統(tǒng)計信息的API,主要用于mapred和dfs模塊。 org.apache.hadoop.record 定義了針對記錄的I/O API類以及一個記錄描述語言翻譯器,用于簡化將記錄序列化成語言中性的格式(language-neutral manner)。 org.apache.hadoop.tools    定義了一些通用的工具。 org.apache.hadoop.util       定義了一些公用的API。 MapReduce框架結(jié)構(gòu)      Map/Reduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型,它最初是由Google工程師設(shè)計并實現(xiàn)的,Google已經(jīng)將它完整的MapReduce論文公開發(fā)布了。其中對它的定義是,Map/Reduce是一個編程模型(programming model),是一個用于處理和生成大規(guī)模數(shù)據(jù)集(processing and generating large data sets)的相關(guān)的實現(xiàn)。用戶定義一個map函數(shù)來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數(shù)將所有這些中間的有著相同key的values合并起來。很多現(xiàn)實世界中的任務(wù)都可用這個模型來表達。 Hadoop的Map/Reduce框架也是基于這個原理實現(xiàn)的,下面簡要介紹一下Map/Reduce框架主要組成及相互的關(guān)系。 2.1 總體結(jié)構(gòu) 2.1.1     Mapper和Reducer 運行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個Combiner類,它實際也是Reducer的實現(xiàn)。 2.1.2    JobTracker和TaskTracker 它們都是由一個master服務(wù)JobTracker和多個運行于多個節(jié)點的slaver服務(wù)TaskTracker兩個類提供的服務(wù)調(diào)度的。master負(fù)責(zé)調(diào)度job的每一個子任務(wù)task運行于slave上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運行它,slave則負(fù)責(zé)直接執(zhí)行每一個task。TaskTracker都需要運行在HDFS的DataNode上,而JobTracker則不需要,一般情況應(yīng)該把JobTracker部署在單獨的機器上。 2.1.3    JobClient 每一個job都會在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)Configuration打包成jar文件存儲在HDFS,并把路徑提交到JobTracker的master服務(wù),然后由master創(chuàng)建每一個Task(即MapTask和ReduceTask)將它們分發(fā)到各個TaskTracker服務(wù)中去執(zhí)行。 2.1.4    JobInProgress JobClient提交job后,JobTracker會創(chuàng)建一個JobInProgress來跟蹤和調(diào)度這個job,并把它添加到j(luò)ob隊列里。JobInProgress會根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個ReduceTask。 2.1.5    TaskInProgress JobTracker啟動任務(wù)時通過每一個TaskInProgress來launch Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設(shè)置好環(huán)境變量后啟動一個獨立的java child進程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。 2.1.6    MapTask和ReduceTask 一個完整的job會自動依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實際也是Reducer接口類的實現(xiàn)。Mapper會根據(jù)job jar中定義的輸入數(shù)據(jù)集按對讀入,處理完成生成臨時的對,如果定義了Combiner,MapTask會在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進程調(diào)用Reducer處理,生成最終結(jié)果對。這個過程在下一部分再詳細(xì)介紹。 下圖描述了Map/Reduce框架中主要組成和它們之間的關(guān)系: 2.2     Job創(chuàng)建過程 2.2.1    JobClient.runJob() 開始運行job并分解輸入數(shù)據(jù)集 一個MapReduce的Job會通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個小數(shù)據(jù)集會對應(yīng)創(chuàng)建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統(tǒng)一打包到j(luò)obFile的jar中并存儲在HDFS中,再將jobFile路徑提交給JobTracker去調(diào)度和執(zhí)行。 2.2.2    JobClient.submitJob() 提交job到JobTracker jobFile的提交過程是通過RPC模塊(有單獨一章來詳細(xì)介紹)來實現(xiàn)的。大致過程是,JobClient類中通過RPC實現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實現(xiàn)JobSubmissionProtocol接口。JobTracker則根據(jù)獲得的jobFile路徑創(chuàng)建與job有關(guān)的一系列對象(即JobInProgress和TaskInProgress等)來調(diào)度并執(zhí)行job。 JobTracker創(chuàng)建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態(tài)信息,如執(zhí)行時間、Map和Reduce任務(wù)完成的比例等。JobClient會根據(jù)這個JobStatus對象創(chuàng)建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執(zhí)行過程的統(tǒng)計數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺。 與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示 2.3     Job執(zhí)行過程 上面已經(jīng)提到,job是統(tǒng)一由JobTracker來調(diào)度的,具體的Task分發(fā)給各個TaskTracker節(jié)點來執(zhí)行。下面通過源碼來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請求開始。 2.3.1    JobTracker初始化Job和Task隊列過程 2.3.1.1     JobTracker.submitJob() 收到請求 當(dāng)JobTracker接收到新的job請求(即submitJob()函數(shù)被調(diào)用)后,會創(chuàng)建一個JobInProgress對象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時候會初始化一系列與任務(wù)有關(guān)的參數(shù),如job jar的位置(會把它從HDFS復(fù)制本地的文件系統(tǒng)中的臨時目錄里),Map和Reduce的數(shù)據(jù),job的優(yōu)先級別,以及記錄統(tǒng)計報告的對象等。 2.3.1.2     JobTracker.resortPriority() 加入隊列并按優(yōu)先級排序 JobInProgress創(chuàng)建后,首先將它加入到j(luò)obs隊列里,分別用一個map成員變量jobs用來管理所有jobs對象,一個list成員變量jobsByPriority用來維護jobs的執(zhí)行優(yōu)先級別。之后JobTracker會調(diào)用resortPriority()函數(shù),將jobs先按優(yōu)先級別排序,再按提交時間排序,這樣保證最高優(yōu)先并且先提交的job會先執(zhí)行。 2.3.1.3     JobTracker.JobInitThread 通知初始化線程 然后JobTracker會把此job加入到一個管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。通過此成員變量調(diào)用notifyAll()函數(shù),會喚起一個用于初始化job的線程JobInitThread來處理(JobTracker會有幾個內(nèi)部的線程來維護jobs隊列,它們的實現(xiàn)都在JobTracker代碼里,稍候再詳細(xì)介紹)。JobInitThread收到信號后即取出最靠前的job,即優(yōu)先級別最高的job,調(diào)用JobInProgress的initTasks()函數(shù)執(zhí)行真正的初始化工作。 2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress Task的初始化過程稍復(fù)雜些,首先步驟JobInProgress會創(chuàng)建Map的監(jiān)控對象。在initTasks()函數(shù)里通過調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點的host,這個會在RawSplit創(chuàng)建時通過FileSplit的getLocations()函數(shù)獲取,該函數(shù)會調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個細(xì)節(jié)會在HDFS模塊中講解)。當(dāng)然如果是存儲在本地文件系統(tǒng)中,即使用LocalFileSystem時當(dāng)然只有一個location即“localhost”了。 其次JobInProgress會創(chuàng)建Reduce的監(jiān)控對象,這個比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的也是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。 JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束,執(zhí)行則是通過另一異步的方式處理的,下面接著介紹它。 與初始化Job過程相關(guān)的類和方法如下圖所示 2.3.2TaskTracker執(zhí)行Task的過程 Task的執(zhí)行實際是由TaskTracker發(fā)起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會在這時啟動,即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時進行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口(在IPC章節(jié)詳細(xì)介紹)實現(xiàn)。這個過程實際比較復(fù)雜,下面一一簡單介紹下每個步驟。 2.3.2.1     TaskTracker.run() 連接JobTracker TaskTracker的啟動過程會初始化一系列參數(shù)和服務(wù)(另有單獨的一節(jié)介紹),然后嘗試連接JobTracker服務(wù)(即必須實現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù),此過程參見run()方法。 2.3.2.2     TaskTracker.offerService() 主循環(huán) 如果連接JobTracker服務(wù)成功,TaskTracker就會調(diào)用offerService()函數(shù)進入主執(zhí)行循環(huán)中。這個循環(huán)會每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat()獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數(shù)組。再遍歷這個數(shù)組,如果是一個新任務(wù)指令即LaunchTaskAction則調(diào)用startNewTask()函數(shù)執(zhí)行新任務(wù),否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執(zhí)行KillJobAction或者KillTaskAction等。 2.3.2.3     TaskTracker.transmitHeartBeat() 獲取JobTracker指令 在transmitHeartBeat()函數(shù)處理中,TaskTracker會創(chuàng)建一個新的TaskTrackerStatus對象記錄目前任務(wù)的執(zhí)行狀況,然后通過IPC接口調(diào)用JobTracker的heartbeat()方法發(fā)送過去,并接受新的指令,即返回值TaskTrackerAction數(shù)組。在這個調(diào)用之前,TaskTracker會先檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。操作成功后再更新相關(guān)的統(tǒng)計信息等。 2.3.2.4     TaskTracker.startNewTask() 啟動新任務(wù) 此函數(shù)的主要任務(wù)就是創(chuàng)建TaskTracker$TaskInProgress對象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊列中。完成后則調(diào)用localizeJob()真正初始化Task并開始執(zhí)行。 2.3.2.5     TaskTracker.localizeJob() 初始化job目錄等 此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊列中。完成后即調(diào)用launchTaskForJob()開始執(zhí)行Task。 2.3.2.6     TaskTracker.launchTaskForJob() 執(zhí)行任務(wù) 啟動Task的工作實際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來執(zhí)行的。 2.3.2.7     TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù) 執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法最后啟動Task獨立的java執(zhí)行子進程。 2.3.2.8     Task.createRunner() 創(chuàng)建啟動Runner對象 Task有兩個實現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會創(chuàng)建MapTaskRunner來啟動Task子進程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動。 2.3.2.9     TaskRunner.start() 啟動子進程真正執(zhí)行Task 這里是真正啟動子進程并執(zhí)行Task的地方。它會調(diào)用run()函數(shù)來處理。執(zhí)行的過程比較復(fù)雜,主要的工作就是初始化啟動java子進程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等(需要將TaskTracker的環(huán)境變量以及job jar的路徑合并起來)。然后裝載job jar包,調(diào)用runChild()方法啟動子進程,即通過ProcessBuilder來創(chuàng)建,同時子進程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過TaskLog類來實現(xiàn)。這里有個小問題,Task子進程只能輸出INFO級別日志,而且該級別是在run()函數(shù)中直接指定,不過改進也不復(fù)雜。 與Job執(zhí)行過程相關(guān)的類和方法如下圖所示 2.4 JobTracker和TaskTracker 如上面所述,JobTracker和TaskTracker是MapReduce框架最基本的兩個服務(wù),其他所有處理均由它們調(diào)度執(zhí)行,下面簡單介紹它們內(nèi)部提供的服務(wù)及創(chuàng)建的線程,詳細(xì)過程下回分解 2.4.1JobTracker的服務(wù)和線程 JobTracker是MapReduce框架中最主要的類之一,所有job的執(zhí)行都由它來調(diào)度,而且Hadoop系統(tǒng)中只配置一個JobTracker應(yīng)用。啟動JobTracker后它會初始化若干個服務(wù)以及若干個內(nèi)部線程用來維護job的執(zhí)行過程和結(jié)果。下面簡單介紹一下它們。 首先,JobTracker會啟動一個interTrackerServer,端口配置在Configuration中的“mapred.job.tracker”參數(shù),缺省是綁定8012端口。它有兩個用途,一是用于接收和處理TaskTracker的heartbeat等請求,即必須實現(xiàn)InterTrackerProtocol接口及協(xié)議。二是用于接收和處理JobClient的請求,如submitJob,killJob等,即必須實現(xiàn)JobSubmissionProtocol接口及協(xié)議。 其次,它會啟動一個infoServer,運行StatusHttpServer,缺省監(jiān)聽50030端口。是一個web服務(wù),用于給用戶提供web界面查詢job執(zhí)行狀況的服務(wù)。 JobTracker還會啟動多個線程,ExpireLaunchingTasks線程用于停止那些未在超時時間內(nèi)報告進度的Tasks。ExpireTrackers線程用于停止那些可能已經(jīng)當(dāng)?shù)舻腡askTracker,即長時間未報告的TaskTracker將不會再分配新的Task。RetireJobs線程用于清除那些已經(jīng)完成很長時間還存在隊列里的jobs。JobInitThread線程用于初始化job,這在前面章節(jié)已經(jīng)介紹。TaskCommitQueue線程用于調(diào)度Task的那些所有與FileSystem操作相關(guān)的處理,并記錄Task的狀態(tài)等信息。 2.4.2TaskTracker的服務(wù)和線程 TaskTracker也是MapReduce框架中最主要的類之一,它運行于每一臺DataNode節(jié)點上,用于調(diào)度Task的實際運行工作。它內(nèi)部也會啟動一些服務(wù)和線程。 TaskTracker也會啟動一個StatusHttpServer服務(wù)來提供web界面的查詢Task執(zhí)行狀態(tài)的工具。 其次,它還會啟動一個taskReportServer服務(wù),這個用于提供給它的子進程即TaskRunner啟動的MapTask或者ReduceTask向它報告狀況,子進程的啟動命令實現(xiàn)在TaskTracker$Child類中,由TaskRunner.run()通過命令行參數(shù)傳入該服務(wù)地址和端口,即調(diào)用TaskTracker的getTaskTrackerReportAddress(),這個地址會在taskReportServer服務(wù)創(chuàng)建時獲得。 TaskTracker也會啟動一個MapEventsFetcherThread線程用于獲取Map任務(wù)的輸出數(shù)據(jù)信息。 2.5 Job狀態(tài)監(jiān)控 未完待 Hadoop API 使用介紹 2009-11-17 00:57 Hadoop API被分成(divide into)如下幾種主要的包(package) org.apache.hadoop.conf     定義了系統(tǒng)參數(shù)的配置文件處理API。 org.apache.hadoop.fs          定義了抽象的文件系統(tǒng)API。 org.apache.hadoop.dfs       Hadoop分布式文件系統(tǒng)(HDFS)模塊的實現(xiàn)。 org.apache.hadoop.io         定義了通用的I/O API,用于針對網(wǎng)絡(luò),數(shù)據(jù)庫,文件等數(shù)據(jù)對象做讀寫操作。 org.apache.hadoop.ipc       用于網(wǎng)絡(luò)服務(wù)端和客戶端的工具,封裝了網(wǎng)絡(luò)異步I/O的基礎(chǔ)模塊。 org.apache.hadoop.mapred         Hadoop分布式計算系統(tǒng)(MapReduce)模塊的實現(xiàn),包括任務(wù)的分發(fā)調(diào)度等。 org.apache.hadoop.metrics        定義了用于性能統(tǒng)計信息的API,主要用于mapred和dfs模塊。 org.apache.hadoop.record 定義了針對記錄的I/O API類以及一個記錄描述語言翻譯器,用于簡化將記錄序列化成語言中性的格式(language-neutral manner)。 org.apache.hadoop.tools    定義了一些通用的工具。 org.apache.hadoop.util       定義了一些公用的API。 MapReduce框架結(jié)構(gòu)      Map/Reduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型,它最初是由Google工程師設(shè)計并實現(xiàn)的,Google已經(jīng)將它完整的MapReduce論文公開發(fā)布了。其中對它的定義是,Map/Reduce是一個編程模型(programming model),是一個用于處理和生成大規(guī)模數(shù)據(jù)集(processing and generating large data sets)的相關(guān)的實現(xiàn)。用戶定義一個map函數(shù)來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數(shù)將所有這些中間的有著相同key的values合并起來。很多現(xiàn)實世界中的任務(wù)都可用這個模型來表達。 Hadoop的Map/Reduce框架也是基于這個原理實現(xiàn)的,下面簡要介紹一下Map/Reduce框架主要組成及相互的關(guān)系。 2.1 總體結(jié)構(gòu) 2.1.1     Mapper和Reducer 運行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個Combiner類,它實際也是Reducer的實現(xiàn)。 2.1.2    JobTracker和TaskTracker 它們都是由一個master服務(wù)JobTracker和多個運行于多個節(jié)點的slaver服務(wù)TaskTracker兩個類提供的服務(wù)調(diào)度的。master負(fù)責(zé)調(diào)度job的每一個子任務(wù)task運行于slave上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運行它,slave則負(fù)責(zé)直接執(zhí)行每一個task。TaskTracker都需要運行在HDFS的DataNode上,而JobTracker則不需要,一般情況應(yīng)該把JobTracker部署在單獨的機器上。 2.1.3    JobClient 每一個job都會在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)Configuration打包成jar文件存儲在HDFS,并把路徑提交到JobTracker的master服務(wù),然后由master創(chuàng)建每一個Task(即MapTask和ReduceTask)將它們分發(fā)到各個TaskTracker服務(wù)中去執(zhí)行。 2.1.4    JobInProgress JobClient提交job后,JobTracker會創(chuàng)建一個JobInProgress來跟蹤和調(diào)度這個job,并把它添加到j(luò)ob隊列里。JobInProgress會根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個ReduceTask。 2.1.5    TaskInProgress JobTracker啟動任務(wù)時通過每一個TaskInProgress來launch Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設(shè)置好環(huán)境變量后啟動一個獨立的java child進程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。 2.1.6    MapTask和ReduceTask 一個完整的job會自動依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實際也是Reducer接口類的實現(xiàn)。Mapper會根據(jù)job jar中定義的輸入數(shù)據(jù)集按對讀入,處理完成生成臨時的對,如果定義了Combiner,MapTask會在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進程調(diào)用Reducer處理,生成最終結(jié)果對。這個過程在下一部分再詳細(xì)介紹。 下圖描述了Map/Reduce框架中主要組成和它們之間的關(guān)系: 2.2     Job創(chuàng)建過程 2.2.1    JobClient.runJob() 開始運行job并分解輸入數(shù)據(jù)集 一個MapReduce的Job會通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個小數(shù)據(jù)集會對應(yīng)創(chuàng)建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統(tǒng)一打包到j(luò)obFile的jar中并存儲在HDFS中,再將jobFile路徑提交給JobTracker去調(diào)度和執(zhí)行。 2.2.2    JobClient.submitJob() 提交job到JobTracker jobFile的提交過程是通過RPC模塊(有單獨一章來詳細(xì)介紹)來實現(xiàn)的。大致過程是,JobClient類中通過RPC實現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實現(xiàn)JobSubmissionProtocol接口。JobTracker則根據(jù)獲得的jobFile路徑創(chuàng)建與job有關(guān)的一系列對象(即JobInProgress和TaskInProgress等)來調(diào)度并執(zhí)行job。 JobTracker創(chuàng)建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態(tài)信息,如執(zhí)行時間、Map和Reduce任務(wù)完成的比例等。JobClient會根據(jù)這個JobStatus對象創(chuàng)建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執(zhí)行過程的統(tǒng)計數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺。 與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示 2.3     Job執(zhí)行過程 上面已經(jīng)提到,job是統(tǒng)一由JobTracker來調(diào)度的,具體的Task分發(fā)給各個TaskTracker節(jié)點來執(zhí)行。下面通過源碼來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請求開始。 2.3.1    JobTracker初始化Job和Task隊列過程 2.3.1.1     JobTracker.submitJob() 收到請求 當(dāng)JobTracker接收到新的job請求(即submitJob()函數(shù)被調(diào)用)后,會創(chuàng)建一個JobInProgress對象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時候會初始化一系列與任務(wù)有關(guān)的參數(shù),如job jar的位置(會把它從HDFS復(fù)制本地的文件系統(tǒng)中的臨時目錄里),Map和Reduce的數(shù)據(jù),job的優(yōu)先級別,以及記錄統(tǒng)計報告的對象等。 2.3.1.2     JobTracker.resortPriority() 加入隊列并按優(yōu)先級排序 JobInProgress創(chuàng)建后,首先將它加入到j(luò)obs隊列里,分別用一個map成員變量jobs用來管理所有jobs對象,一個list成員變量jobsByPriority用來維護jobs的執(zhí)行優(yōu)先級別。之后JobTracker會調(diào)用resortPriority()函數(shù),將jobs先按優(yōu)先級別排序,再按提交時間排序,這樣保證最高優(yōu)先并且先提交的job會先執(zhí)行。 2.3.1.3     JobTracker.JobInitThread 通知初始化線程 然后JobTracker會把此job加入到一個管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。通過此成員變量調(diào)用notifyAll()函數(shù),會喚起一個用于初始化job的線程JobInitThread來處理(JobTracker會有幾個內(nèi)部的線程來維護jobs隊列,它們的實現(xiàn)都在JobTracker代碼里,稍候再詳細(xì)介紹)。JobInitThread收到信號后即取出最靠前的job,即優(yōu)先級別最高的job,調(diào)用JobInProgress的initTasks()函數(shù)執(zhí)行真正的初始化工作。 2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress Task的初始化過程稍復(fù)雜些,首先步驟JobInProgress會創(chuàng)建Map的監(jiān)控對象。在initTasks()函數(shù)里通過調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點的host,這個會在RawSplit創(chuàng)建時通過FileSplit的getLocations()函數(shù)獲取,該函數(shù)會調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個細(xì)節(jié)會在HDFS模塊中講解)。當(dāng)然如果是存儲在本地文件系統(tǒng)中,即使用LocalFileSystem時當(dāng)然只有一個location即“localhost”了。 其次JobInProgress會創(chuàng)建Reduce的監(jiān)控對象,這個比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的也是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。 JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束,執(zhí)行則是通過另一異步的方式處理的,下面接著介紹它。 與初始化Job過程相關(guān)的類和方法如下圖所示 2.3.2TaskTracker執(zhí)行Task的過程 Task的執(zhí)行實際是由TaskTracker發(fā)起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會在這時啟動,即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時進行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口(在IPC章節(jié)詳細(xì)介紹)實現(xiàn)。這個過程實際比較復(fù)雜,下面一一簡單介紹下每個步驟。 2.3.2.1     TaskTracker.run() 連接JobTracker TaskTracker的啟動過程會初始化一系列參數(shù)和服務(wù)(另有單獨的一節(jié)介紹),然后嘗試連接JobTracker服務(wù)(即必須實現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù),此過程參見run()方法。 2.3.2.2     TaskTracker.offerService() 主循環(huán) 如果連接JobTracker服務(wù)成功,TaskTracker就會調(diào)用offerService()函數(shù)進入主執(zhí)行循環(huán)中。這個循環(huán)會每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat()獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數(shù)組。再遍歷這個數(shù)組,如果是一個新任務(wù)指令即LaunchTaskAction則調(diào)用startNewTask()函數(shù)執(zhí)行新任務(wù),否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執(zhí)行KillJobAction或者KillTaskAction等。 2.3.2.3     TaskTracker.transmitHeartBeat() 獲取JobTracker指令 在transmitHeartBeat()函數(shù)處理中,TaskTracker會創(chuàng)建一個新的TaskTrackerStatus對象記錄目前任務(wù)的執(zhí)行狀況,然后通過IPC接口調(diào)用JobTracker的heartbeat()方法發(fā)送過去,并接受新的指令,即返回值TaskTrackerAction數(shù)組。在這個調(diào)用之前,TaskTracker會先檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。操作成功后再更新相關(guān)的統(tǒng)計信息等。 2.3.2.4     TaskTracker.startNewTask() 啟動新任務(wù) 此函數(shù)的主要任務(wù)就是創(chuàng)建TaskTracker$TaskInProgress對象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊列中。完成后則調(diào)用localizeJob()真正初始化Task并開始執(zhí)行。 2.3.2.5     TaskTracker.localizeJob() 初始化job目錄等 此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊列中。完成后即調(diào)用launchTaskForJob()開始執(zhí)行Task。 2.3.2.6     TaskTracker.launchTaskForJob() 執(zhí)行任務(wù) 啟動Task的工作實際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來執(zhí)行的。 2.3.2.7     TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù) 執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法最后啟動Task獨立的java執(zhí)行子進程。 2.3.2.8     Task.createRunner() 創(chuàng)建啟動Runner對象 Task有兩個實現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會創(chuàng)建MapTaskRunner來啟動Task子進程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動。 2.3.2.9     TaskRunner.start() 啟動子進程真正執(zhí)行Task 這里是真正啟動子進程并執(zhí)行Task的地方。它會調(diào)用run()函數(shù)來處理。執(zhí)行的過程比較復(fù)雜,主要的工作就是初始化啟動java子進程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等(需要將TaskTracker的環(huán)境變量以及job jar的路徑合并起來)。然后裝載job jar包,調(diào)用runChild()方法啟動子進程,即通過ProcessBuilder來創(chuàng)建,同時子進程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過TaskLog類來實現(xiàn)。這里有個小問題,Task子進程只能輸出INFO級別日志,而且該級別是在run()函數(shù)中直接指定,不過改進也不復(fù)雜。 與Job執(zhí)行過程相關(guān)的類和方法如下圖所示 2.4 JobTracker和TaskTracker 如上面所述,JobTracker和TaskTracker是MapReduce框架最基本的兩個服務(wù),其他所有處理均由它們調(diào)度執(zhí)行,下面簡單介紹它們內(nèi)部提供的服務(wù)及創(chuàng)建的線程,詳細(xì)過程下回分解 2.4.1JobTracker的服務(wù)和線程 JobTracker是MapReduce框架中最主要的類之一,所有job的執(zhí)行都由它來調(diào)度,而且Hadoop系統(tǒng)中只配置一個JobTracker應(yīng)用。啟動JobTracker后它會初始化若干個服務(wù)以及若干個內(nèi)部線程用來維護job的執(zhí)行過程和結(jié)果。下面簡單介紹一下它們。 首先,JobTracker會啟動一個interTrackerServer,端口配置在Configuration中的“mapred.job.tracker”參數(shù),缺省是綁定8012端口。它有兩個用途,一是用于接收和處理TaskTracker的heartbeat等請求,即必須實現(xiàn)InterTrackerProtocol接口及協(xié)議。二是用于接收和處理JobClient的請求,如submitJob,killJob等,即必須實現(xiàn)JobSubmissionProtocol接口及協(xié)議。 其次,它會啟動一個infoServer,運行StatusHttpServer,缺省監(jiān)聽50030端口。是一個web服務(wù),用于給用戶提供web界面查詢job執(zhí)行狀況的服務(wù)。 JobTracker還會啟動多個線程,ExpireLaunchingTasks線程用于停止那些未在超時時間內(nèi)報告進度的Tasks。ExpireTrackers線程用于停止那些可能已經(jīng)當(dāng)?shù)舻腡askTracker,即長時間未報告的TaskTracker將不會再分配新的Task。RetireJobs線程用于清除那些已經(jīng)完成很長時間還存在隊列里的jobs。JobInitThread線程用于初始化job,這在前面章節(jié)已經(jīng)介紹。TaskCommitQueue線程用于調(diào)度Task的那些所有與FileSystem操作相關(guān)的處理,并記錄Task的狀態(tài)等信息。 2.4.2TaskTracker的服務(wù)和線程 TaskTracker也是MapReduce框架中最主要的類之一,它運行于每一臺DataNode節(jié)點上,用于調(diào)度Task的實際運行工作。它內(nèi)部也會啟動一些服務(wù)和線程。 TaskTracker也會啟動一個StatusHttpServer服務(wù)來提供web界面的查詢Task執(zhí)行狀態(tài)的工具。 其次,它還會啟動一個taskReportServer服務(wù),這個用于提供給它的子進程即TaskRunner啟動的MapTask或者ReduceTask向它報告狀況,子進程的啟動命令實現(xiàn)在TaskTracker$Child類中,由TaskRunner.run()通過命令行參數(shù)傳入該服務(wù)地址和端口,即調(diào)用TaskTracker的getTaskTrackerReportAddress(),這個地址會在taskReportServer服務(wù)創(chuàng)建時獲得。 TaskTracker也會啟動一個MapEventsFetcherThread線程用于獲取Map任務(wù)的輸出數(shù)據(jù)信息。 2.5 Job狀態(tài)監(jiān)控 未完待

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多