乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Hadoop源碼之我見

       gerial 2011-11-23

      為了不遺忘和可以速查源碼,準(zhǔn)備重新讀一遍Hadoop的MapReduce部分的源碼,記錄下來(lái),盡量詳細(xì)點(diǎn)。如要轉(zhuǎn)載,請(qǐng)標(biāo)明出處。

       

      寫MapReduce程序首先接觸的是Job類,Job類是管理一個(gè)集群作業(yè)的類,包含了一個(gè)作業(yè)的所有信息和向集群提交作業(yè)的方法。

       

       

      如圖所示,它有以上一些方法,我們寫程序是調(diào)用waitForCompletion()方法,方法實(shí)現(xiàn)如下:

       

       

      1. public boolean waitForCompletion(boolean verbose  
      2.                                  ) throws IOException, InterruptedException,  
      3.                                           ClassNotFoundException {  
      4.   if (state == JobState.DEFINE) {  
      5.     submit();  
      6.   }  
      7.   if (verbose) {  
      8.     jobClient.monitorAndPrintJob(conf, info);  
      9.   } else {  
      10.     info.waitForCompletion();  
      11.   }  
      12.   return isSuccessful();  
      13. }  
       

       

       

      它調(diào)用了submit向集群提交作業(yè),下面看下submit()方法:

       

       

      1. public void submit() throws IOException, InterruptedException,   
      2.                             ClassNotFoundException {  
      3.   ensureState(JobState.DEFINE);  
      4. 建立新的API,檢查兼容性   
      5.   setUseNewAPI();  
      6.   info = jobClient.submitJobInternal(conf);  
      7.   state = JobState.RUNNING;  
      8.  }  
       

       

      jobClient是在初始化時(shí)候建立的。

       

      1. public Job(Configuration conf) throws IOException {  
      2.   super(conf, null);  
      3.   jobClient = new JobClient((JobConf) getConfiguration());  
      4. }  
       

       

      JobClient類 建立了一個(gè)代理,用于連接JobTracker(集群上的master結(jié)點(diǎn)),

       

      1. public JobClient(JobConf conf) throws IOException {  
      2.   setConf(conf);  
      3.   init(conf);  
      4. }  
      5. /** 
      6.  * Connect to the default {@link JobTracker}. 
      7.  * @param conf the job configuration. 
      8.  * @throws IOException 
      9.  */  
      10. public void init(JobConf conf) throws IOException {  
      11.   String tracker = conf.get("mapred.job.tracker""local");  
      12.   if ("local".equals(tracker)) {  
      13.     this.jobSubmitClient = new LocalJobRunner(conf);  
      14.   } else {  
      15.     this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
      16.   }          
      17. }  
       

       

      這個(gè)代理會(huì)檢查mapred.job.tracker 這個(gè)屬性有沒有建立,默認(rèn)值是local,如果建立了,則建立一個(gè)連接JobTracker的代理。這個(gè)代理負(fù)責(zé)上傳作業(yè)的配置和作業(yè)內(nèi)容到集群中。

       

      1. private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,  
      2.     Configuration conf) throws IOException {  
      3.   return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,  
      4.       JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,  
      5.       NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));  
      6. }  
       

      發(fā)現(xiàn)他實(shí)現(xiàn)了JobSubmissionProtocol接口的一個(gè)對(duì)象

      1. public static VersionedProtocol getProxy(Class<?> protocol,  
      2.     long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,  
      3.     Configuration conf, SocketFactory factory) throws IOException {      
      4.       
      5.   VersionedProtocol proxy =  
      6.       (VersionedProtocol) Proxy.newProxyInstance(  
      7.           protocol.getClassLoader(), new Class[] { protocol },  
      8.           new Invoker(addr, ticket, conf, factory));  
      9.   long serverVersion = proxy.getProtocolVersion(protocol.getName(),   
      10.                                                 clientVersion);  
      11.   if (serverVersion == clientVersion) {  
      12.     return proxy;  
      13.   } else {  
      14.     throw new VersionMismatch(protocol.getName(), clientVersion,   
      15.                               serverVersion);  
      16.   }  
      17. }  
       

       

       

      總之,Job類使用了一個(gè)實(shí)現(xiàn)了JobSubmissionProtocol接口的一個(gè)代理,這個(gè)代理對(duì)象可以用來(lái)和集群通信,job類的一些方法也可以用來(lái)幫助我們對(duì)集群和任務(wù)的進(jìn)展情況進(jìn)行查看。

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多