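So that I don't forget it, and to have something to look things up in quickly, I am re-reading the MapReduce part of the Hadoop source code and writing notes as I go, in as much detail as I can. If you repost this, please credit the source.
The first thing you meet when writing a MapReduce program is the Job class. Job manages a single cluster job: it holds all of the job's information and the methods for submitting the job to the cluster.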

As the figure shows, Job has the methods listed above. In our own programs we call waitForCompletion(), which is implemented as follows:
    public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
        if (state == JobState.DEFINE) {
            submit();
        }
        if (verbose) {
            jobClient.monitorAndPrintJob(conf, info);
        } else {
            info.waitForCompletion();
        }
        return isSuccessful();
    }
It calls submit() to send the job to the cluster, so let's look at submit() next:
    public void submit() throws IOException, InterruptedException,
        ClassNotFoundException {
        ensureState(JobState.DEFINE);
        // Set up use of the new API and check compatibility
        setUseNewAPI();
        info = jobClient.submitJobInternal(conf);
        state = JobState.RUNNING;
    }
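
The interplay between waitForCompletion() and submit() is easier to see in isolation. Below is a minimal sketch of the DEFINE to RUNNING guard, not Hadoop code: the class JobStateSketch, the submissions counter, and the assumption that ensureState() rejects a wrong state with an IllegalStateException are all mine, added only for illustration.

```java
// Hypothetical stand-in for Job's DEFINE -> RUNNING life cycle; not Hadoop code.
class JobStateSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submissions = 0; // how many times the job was handed to the "cluster"

    // Assumed behaviour of ensureState(): refuse calls made in the wrong state.
    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    // Mirrors submit(): only legal while the job is still being defined.
    void submit() {
        ensureState(JobState.DEFINE);
        submissions++;            // stands in for jobClient.submitJobInternal(conf)
        state = JobState.RUNNING;
    }

    // Mirrors waitForCompletion(): submits only if the job was not submitted yet.
    boolean waitForCompletion() {
        if (state == JobState.DEFINE) {
            submit();
        }
        return true;              // stands in for isSuccessful()
    }

    JobState state() { return state; }

    int submissions() { return submissions; }

    public static void main(String[] args) {
        JobStateSketch job = new JobStateSketch();
        job.waitForCompletion();
        job.waitForCompletion();  // the second call does not resubmit
        System.out.println(job.state() + ", submissions = " + job.submissions());
        try {
            job.submit();         // an explicit resubmission is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the guard is that calling waitForCompletion() more than once is harmless, while submitting a job that is already running fails loudly.
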
jobClient is created when the Job is constructed:
    public Job(Configuration conf) throws IOException {
        super(conf, null);
        jobClient = new JobClient((JobConf) getConfiguration());
    }
The JobClient class creates a proxy that is used to connect to the JobTracker (the master node of the cluster):
    public JobClient(JobConf conf) throws IOException {
        setConf(conf);
        init(conf);
    }

    public void init(JobConf conf) throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
            this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
            this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
        }
    }
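init() checks whether the mapred.job.tracker property has been set; its default value is local. With the default, a LocalJobRunner is used; if the property points somewhere else, a proxy connected to the JobTracker is created instead. That proxy is responsible for uploading the job's configuration and contents to the cluster.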
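
To make the branch on mapred.job.tracker concrete, here is a small sketch that uses a plain Map in place of JobConf. Everything except the property name and its "local" default is hypothetical: SubmitClientChooser, SubmitClient, the host:port parsing, and the example address master:9001 are my own stand-ins, and the real JobTracker.getAddress() may well parse the value differently.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the local-vs-remote choice in JobClient.init(); not Hadoop code.
class SubmitClientChooser {
    // Stand-in for JobSubmissionProtocol.
    interface SubmitClient {
        String describe();
    }

    // Returns an in-process client for "local", otherwise one bound to host:port.
    static SubmitClient choose(Map<String, String> conf) {
        String tracker = conf.getOrDefault("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
            return () -> "local runner";   // plays the role of LocalJobRunner
        }
        int colon = tracker.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected host:port, got " + tracker);
        }
        String host = tracker.substring(0, colon);
        int port = Integer.parseInt(tracker.substring(colon + 1));
        // Unresolved on purpose: the sketch must not perform a DNS lookup.
        InetSocketAddress addr = InetSocketAddress.createUnresolved(host, port);
        return () -> "rpc proxy to " + addr.getHostString() + ":" + addr.getPort();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(choose(conf).describe());   // property not set
        conf.put("mapred.job.tracker", "master:9001"); // hypothetical address
        System.out.println(choose(conf).describe());
    }
}
```

Either way the caller only sees the interface, which is why the rest of JobClient does not need to care whether the job runs in-process or on a cluster.
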
    private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
        Configuration conf) throws IOException {
        return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
            JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
    }
It turns out that what comes back is an object implementing the JobSubmissionProtocol interface, built by RPC.getProxy():
    public static VersionedProtocol getProxy(Class<?> protocol,
        long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory) throws IOException {

        VersionedProtocol proxy =
            (VersionedProtocol) Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class[] { protocol },
                new Invoker(addr, ticket, conf, factory));
        long serverVersion = proxy.getProtocolVersion(protocol.getName(),
            clientVersion);
        if (serverVersion == clientVersion) {
            return proxy;
        } else {
            throw new VersionMismatch(protocol.getName(), clientVersion,
                serverVersion);
        }
    }
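
The pattern in getProxy() is a JDK dynamic proxy followed by a version handshake. The sketch below reproduces that shape using only java.lang.reflect.Proxy. It is an illustration under assumptions, not Hadoop's implementation: VersionedProxySketch and SubmissionProtocol are hypothetical, the handler answers locally where the real Invoker would go over the network, and the version numbers are arbitrary.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch of the proxy-plus-version-check pattern; not Hadoop code.
class VersionedProxySketch {
    // Stand-in for a versioned protocol with one business method.
    interface SubmissionProtocol {
        long getProtocolVersion();
        String submitJob(String jobId);
    }

    // Builds a dynamic proxy and rejects it if the "server" reports another version.
    static SubmissionProtocol getProxy(long clientVersion, long serverVersion) {
        InvocationHandler invoker = (proxy, method, args) -> {
            // A real handler would serialize the call and send it to the server.
            switch (method.getName()) {
                case "getProtocolVersion":
                    return serverVersion;
                case "submitJob":
                    return "submitted " + args[0];
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        };
        SubmissionProtocol proxy = (SubmissionProtocol) Proxy.newProxyInstance(
            SubmissionProtocol.class.getClassLoader(),
            new Class<?>[] { SubmissionProtocol.class },
            invoker);
        // The first call through the proxy is the version handshake.
        if (proxy.getProtocolVersion() != clientVersion) {
            throw new IllegalStateException("version mismatch: client "
                + clientVersion + ", server " + serverVersion);
        }
        return proxy;
    }

    public static void main(String[] args) {
        SubmissionProtocol client = getProxy(28L, 28L);
        System.out.println(client.submitJob("job_1"));
        try {
            getProxy(28L, 27L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Every call on the returned object lands in the single handler, which is what lets one Invoker serve all the methods of a protocol interface.
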
In short, the Job class uses a proxy that implements the JobSubmissionProtocol interface. This proxy object is what talks to the cluster, and several of Job's methods also let us check on the state of the cluster and the progress of our tasks.