a. Spark Streaming Job 架構(gòu)和運(yùn)行機(jī)制 注:本講內(nèi)容基于Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節(jié)回顧:上節(jié)課談到Spark Streaming是基于DStream編程。DStream是邏輯級別的,而RDD是物理級別的。DStream是隨著時間的流動內(nèi)部將集合封裝RDD。對DStream的操作,歸根結(jié)底還是對其RDD進(jìn)行的操作。 如果將Spark Streaming放在坐標(biāo)系中,并以Y軸表示對RDD的操作,RDD的依賴關(guān)系構(gòu)成了整個job的邏輯應(yīng)用,以X軸作為時間。隨著時間的流逝,以固定的時間間隔(Batch Interval)產(chǎn)生一個個job實(shí)例,進(jìn)而在集群中運(yùn)行。 同時也為大家詳細(xì)總結(jié)并揭秘 Spark Streaming五大核心特征:特征1:邏輯管理、特征2:時間管理、特征3:流式輸入和輸出、特征4:高容錯、特征5:事務(wù)處理。最后結(jié)合Spark Streaming源碼做了進(jìn)一步解析。 ** 開講** 由上一講可以得知,以固定的時間間隔(Batch Interval)產(chǎn)生一個個job實(shí)例。那么在時間維度和空間維度組成的時空維度的Spark Streaming中,Job的架構(gòu)和運(yùn)行機(jī)制、及其容錯架構(gòu)和運(yùn)行機(jī)制是怎樣的呢? 那我們從愛因斯坦的相對時空講起吧: a、時間和空間是緊密聯(lián)系的統(tǒng)一體,也稱為時空連續(xù)體。 就是說Job的實(shí)例產(chǎn)生和單向流動的時間之間,沒有必然的聯(lián)系;在這里時間只是一種假象。 怎么更好的理解這句話呢?那我們就得從以下方面為大家逐步解答。 什么是Spark Streaming Job 架構(gòu)和運(yùn)行機(jī)制 ? 對于一般的Spark應(yīng)用程序來說,是RDD的action操作觸發(fā)了Job的運(yùn)行。那對于SparkStreaming來說,Job是怎么樣運(yùn)行的呢?我們在編寫SparkStreaming程序的時候,設(shè)置了BatchDuration,Job每隔BatchDuration時間會自動觸發(fā),這個功能是Spark Streaming框架提供了一個定時器,時間一到就將編寫的程序提交給Spark,并以Spark job的方式運(yùn)行。 通過案例透視Job架構(gòu)和運(yùn)行機(jī)制 案例代碼如下: 將上述代碼打成JAR包,再上傳到集群中運(yùn)行 集群中運(yùn)行結(jié)果如下 運(yùn)行過程總圖如下 案例詳情解析 a、 首先通過StreamingContext調(diào)用start方法,其內(nèi)部再啟動JobScheduler的Start方法,進(jìn)行消息循環(huán); (StreamingContext.scala,610行代碼) b、 在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker; (JobScheduler.scala,82、83行代碼) c、 然后調(diào)用JobGenerator和ReceiverTacker的start方法執(zhí)行以下操作: (JobScheduler.scala,79、98行代碼)
(JobScheduler.scala,208行代碼)
1.對Receiver的運(yùn)行進(jìn)行管理,ReceiverTracker啟動時會調(diào)用lanuchReceivers()方法,進(jìn)而會使用rpc通信啟動Receiver(實(shí)際代碼中,Receiver外面還有一層包裝ReceiverSupervisor實(shí)現(xiàn)高可用) (ReceiverTracker.scala,423行代碼) 2.管理Receiver的元數(shù)據(jù),供Job對數(shù)據(jù)進(jìn)行索引,元數(shù)據(jù)的核心結(jié)構(gòu)是receivedBlockTracker (ReceiverTracker.scala,106~112行代碼) d、 在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor的BlockManager中 ; e、 同時把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息; 這里面涉及到兩個Job的概念: 每個BatchInterval會產(chǎn)生一個具體的Job,其實(shí)這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,從Java角度講,相當(dāng)于Runnable接口實(shí)例,此時要想運(yùn)行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個單獨(dú)的線程來提交Job到集群運(yùn)行(其實(shí)是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運(yùn)行) 為什么使用線程池呢? a 、作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙; Spark Streaming Job 容錯架構(gòu)和運(yùn)行機(jī)制 Spark Streaming是基于DStream的容錯機(jī)制,DStream是隨著時間流逝不斷的產(chǎn)生RDD,也就是說DStream是在固定的時間上操作RDD,容錯會劃分到每一次所形成的RDD。 Spark Streaming的容錯包括 Executor 與 Driver兩方面的容錯機(jī)制 : a、 Executor 容錯: 1. 數(shù)據(jù)接收:分布式方式、wal方式,先寫日志再保存數(shù)據(jù)到Executor 2. 任務(wù)執(zhí)行安全性 Job基于RDD容錯 : b、Driver容錯 : checkpoint 。 基于RDD的特性,它的容錯機(jī)制主要就是兩種: 1. 基于checkpoint; 在stage之間,是寬依賴,產(chǎn)生了shuffle操作,lineage鏈條過于復(fù)雜和冗長,這時候就需要做checkpoint。 2. 基于lineage(血統(tǒng))的容錯: 一般而言,spark選擇血統(tǒng)容錯,因?yàn)閷τ诖笠?guī)模的數(shù)據(jù)集,做檢查點(diǎn)的成本很高。 考慮到RDD的依賴關(guān)系,每個stage內(nèi)部都是窄依賴,此時一般基于lineage容錯,方便高效。 總結(jié): stage內(nèi)部做lineage,stage之間做checkpoint。 有興趣想學(xué)習(xí)國內(nèi)頂級整套Spark+Spark Streaming+Machine learning課程的,歡迎加我qq 471186150。共享視頻,性價比超高! |
|