一、Spark Streaming概述 Spark是美國加州伯克利大學AMP實驗室推出的新一代分布式計算框架,其核心概念是RDD,一個只讀的、有容錯機制的分布式數(shù)據(jù)集,RDD可以全部緩存在內(nèi)存中,并在多次計算中重復使用。相比于MapReduce編程模型,Spark具有以下幾個優(yōu)點:
SparkStreaming是Spark生態(tài)系統(tǒng)中的重要組成部分,在實現(xiàn)上復用Spark計算引擎。如圖1所示,Spark Streaming支持的數(shù)據(jù)源有很多,如Kafka、Flume、TCP等。SparkStreaming內(nèi)部的數(shù)據(jù)表示形式為DStream(DiscretizedStream,離散的數(shù)據(jù)流),其接口設(shè)計與RDD非常相似,這使得它對Spark用戶非常友好。SparkStreaming的核心思想是把流式處理轉(zhuǎn)化為“微批處理”,即以時間為單位切分數(shù)據(jù)流,每個切片內(nèi)的數(shù)據(jù)對應一個RDD,進而可以采用Spark引擎進行快速計算。由于SparkStreaming采用了微批處理方式,是近實時的處理系統(tǒng),而不是嚴格意義上的流式處理系統(tǒng)。 圖1:Spark Streaming數(shù)據(jù)流 另一個著名的開源流式計算引擎是Storm,這是一個真正的流式處理系統(tǒng),它每次從數(shù)據(jù)源讀一條數(shù)據(jù),然后單獨處理。相比于SparkStreaming,Storm有更快速的響應時間(小于一秒),更適合低延遲的應用場景,比如信用卡欺詐系統(tǒng),廣告系統(tǒng)等。相比之下,SparkStreaming的優(yōu)勢是吞吐量大,響應時間也可以接受(秒級),并且兼容Spark系統(tǒng)中的其他工具庫如MLlib和GraphX。對于時間不敏感且流量很大的系統(tǒng),Spark Streaming是更優(yōu)的選擇。
二、Spark Streaming在Hulu應用 Hulu是美國的專業(yè)在線視頻網(wǎng)站,每天會有大量用戶在線觀看視頻,進而產(chǎn)生大量用戶觀看行為數(shù)據(jù),這些數(shù)據(jù)通過收集系統(tǒng)進入Hulu的大數(shù)據(jù)平臺,從而進行存儲和進一步處理。在大數(shù)據(jù)平臺之上,各個團隊會根據(jù)需要設(shè)計相應的算法對數(shù)據(jù)進行分析和挖掘以便產(chǎn)生商業(yè)價值:推薦團隊從這些數(shù)據(jù)里挖掘出用戶感興趣的內(nèi)容并做精準推薦,廣告團隊根據(jù)用戶的歷史行為推送最合適的廣告,數(shù)據(jù)團隊從數(shù)據(jù)的各個維度進行分析從而為公司的策略制定提供可靠依據(jù)。 Hulu大數(shù)據(jù)平臺的實現(xiàn)依循Lambda架構(gòu)。Lambda架構(gòu)是一個通用的大數(shù)據(jù)處理框架,包含離線的批處理層、在線的加速層和服務層三部分,具體如圖2所示。服務層一般使用HTTP服務或自定制的客戶端對外提供數(shù)據(jù)訪問,離線的批處理層一般使用批處理計算框架Spark和MapReduce進行數(shù)據(jù)分析,在線的加速層一般使用流式實時計算框架SparkStreaming和Storm進行數(shù)據(jù)分析。 圖2:lambda架構(gòu)原理圖 對于實時計算部分,Hulu內(nèi)部使用了Kafka、Codis和SparkStreaming。下面按照數(shù)據(jù)流的過程,介紹我們的項目。 1. 收集數(shù)據(jù) - 從服務器日志中收集數(shù)據(jù),流程如圖3所示:
圖3:Hulu數(shù)據(jù)收集流程 2. 存儲標簽數(shù)據(jù) - Hulu使用HBase存儲用戶標簽數(shù)據(jù),包括基本信息如性別、年齡、是否付費,以及其他模型推測出來的偏好屬性。這些屬性需要作為計算模型的輸入,同時HBase隨機讀取的速度比較慢,所以需要將數(shù)據(jù)同步到緩存服務器中以加快數(shù)據(jù)讀取速度。Redis是一個應用廣泛的開源緩存服務器,一個免費開源的高性能Key-Value數(shù)據(jù)庫,但其本身是個單機系統(tǒng),不能很好地支持大量數(shù)據(jù)的緩存。為解決Redis擴展性差的問題,豌豆莢開源了Codis,一個分布式Redis解決方案。Hulu將Codis打成Docker鏡像,并實現(xiàn)一鍵式構(gòu)建緩存系統(tǒng),附帶自動監(jiān)控和修復功能。為了更精細的監(jiān)控,我們構(gòu)建了多個Codis緩存,分別是:
3. 實時處理數(shù)據(jù) - 準備就緒,啟動Spark Streaming程序: 1) SparkStreaming啟動Kafka Receiver,持續(xù)地從Kafka服務器拉取數(shù)據(jù); 2) 每隔兩秒,Kafka的數(shù)據(jù)被整理成一個RDD,交給Spark引擎處理; 3) 對一條用戶行為,Spark會從codis-action緩存中拿到該用戶的行為記錄,然后把新的行為追加進去; 4) Spark從codis-action和codis-profile中獲得該用戶的所有相關(guān)屬性,然后執(zhí)行廣告和推薦的計算模型,最后把結(jié)果寫入codis-result,進而供服務層實時讀取這些結(jié)果。
三、Spark Streaming優(yōu)化經(jīng)驗 實踐中,業(yè)務邏輯首先保證完成,使得在Kafka輸入數(shù)據(jù)量較小的情況下系統(tǒng)穩(wěn)定運行,且輸入輸出滿足項目需求。然后開始調(diào)優(yōu),修改SparkStreaming的參數(shù),比如Executor的數(shù)量,Core的數(shù)量,Receiver的流量等。最后發(fā)現(xiàn)僅調(diào)參數(shù)無法完全滿足本項目的業(yè)務場景,所以有更進一步的優(yōu)化方案,總結(jié)如下: 1. Executor初始化 很多機器學習的模型在第一次運行時,需要執(zhí)行初始化方法,還會連接外部的數(shù)據(jù)庫,常常需要5-10分鐘,這會成為潛在的不穩(wěn)定因素。在Spark Streaming應用中,當Receiver完成初始化,它就開始源源不斷地接收數(shù)據(jù),并且由Driver定期調(diào)度任務消耗這些數(shù)據(jù)。如果剛啟動時Executor需要幾分鐘做準備,會導致第一個作業(yè)一直沒有完成,這段時間內(nèi)Driver不會調(diào)度新的作業(yè)。這時候在Kafka Receiver端會有數(shù)據(jù)積壓,隨著積壓的數(shù)據(jù)量越來越大,大部分數(shù)據(jù)會撐過新生代進入老年代,進而給Java GC帶來嚴重的壓力,容易引發(fā)應用程序崩潰。 本項目的解決方案是,修改Spark內(nèi)核,在每個Executor接收任務之前先執(zhí)行一個用戶自定義的初始化函數(shù),初始化函數(shù)中可以執(zhí)行一些獨立的用戶邏輯。示例代碼如下:
該方案需要更改Spark的任務調(diào)度器,首先將每個Executor設(shè)置為未初始化狀態(tài)。此時,調(diào)度器只會給未初始化狀態(tài)的Executor分配初始化任務(執(zhí)行前面提到的初始化函數(shù))。等初始化任務完畢,調(diào)度器更新Executor的狀態(tài)為已初始化,這樣的Executor才可以分配正常的計算任務。 2. 異步處理Task中的業(yè)務邏輯 本項目中,模型的輸入?yún)?shù)均來自Codis,甚至模型內(nèi)部也可能訪問外部存儲,直接導致模型計算時長不穩(wěn)定,很多時間消耗在網(wǎng)絡等待上。 為提高系統(tǒng)吞吐量,增大并行度是比較通用的優(yōu)化方案,但在本項目的場景中并不適用。因為Spark作業(yè)的調(diào)度策略是,等待上一個作業(yè)的所有Task執(zhí)行完畢,然后調(diào)度下一個作業(yè)。如果單個Task的運行時間不穩(wěn)定,易發(fā)生個別Task拖慢整個作業(yè)的情況,以至于資源利用率不高,系統(tǒng)吞吐量上不去;甚至并行度越大,該問題越嚴重。一種常用的解決Task不穩(wěn)定的方案是增大Spark Streaming的micro batch的時間間隔,該方案會使整個實時系統(tǒng)的延遲變長,并不推薦。 該問題的解決方案是異步處理Task中的業(yè)務邏輯。如下文的代碼所示,同步方案中,Task內(nèi)執(zhí)行業(yè)務邏輯,處理時間不定;異步方案中,Task把業(yè)務邏輯嵌入線程,交給線程池執(zhí)行,Task立刻結(jié)束,Executor向Driver報告執(zhí)行完畢,異步處理的時間非常短,在100ms以內(nèi)。另外,當線程池中積壓的線程數(shù)量太大時(代碼中qsize>100的情況),會暫時使用同步處理,配合反壓機制(見下文的參數(shù)spark.streaming.backpressure.enabled),可以保證不會因為數(shù)據(jù)積壓過多而導致系統(tǒng)崩潰。為設(shè)置合適的線程池大小,我們借助JVisualVM工具監(jiān)控Executor的CPU使用率,通過調(diào)整參數(shù)找到最優(yōu)并發(fā)線程數(shù)。經(jīng)實驗驗證,該方案大大提高了系統(tǒng)的吞吐量。
異步化Task也存在缺點:如果Executor發(fā)生異常,存放在線程池中的業(yè)務邏輯無法重新計算,會導致部分數(shù)據(jù)丟失。經(jīng)實驗驗證,僅當Executor異常崩潰時有數(shù)據(jù)丟失,且不常見,在本項目的場景中可以接受。 3. Kafka Receiver的穩(wěn)定性 本項目使用了SparkStreaming中的Kafka Receiver,本質(zhì)上調(diào)用Kafka官方的客戶端ZookeeperConsumerConnector。其策略是每個客戶端在Zookeeper的固定路徑下把自己注冊為臨時節(jié)點,于是所有客戶端都知道其他客戶端的存在,然后自動協(xié)調(diào)和分配Kafka的數(shù)據(jù)資源。該策略存在一個弊端,當一個客戶端與Zookeeper的連接狀態(tài)發(fā)生改變(斷開或者連上),所有的客戶端都會通過Zookeeper協(xié)調(diào),重新分配Kafka的數(shù)據(jù)資源;在此期間所有客戶端都斷開與Kafka的連接,系統(tǒng)接收不到Kafka的數(shù)據(jù),直到重新分配成功。如果網(wǎng)絡質(zhì)量不佳,并且Receiver的個數(shù)較多,這種策略會造成數(shù)據(jù)輸入不穩(wěn)定,很多SparkStreaming用戶遇到這樣的問題。在我們的系統(tǒng)中,該策略并沒有產(chǎn)生明顯的負面影響。值得注意的是,Kafka 客戶端與Zookeeper有個默認的參數(shù)zookeeper.session.timeout.ms=6000,表示客戶端與Zookeeper連接的session有效時間為6秒,我們的客戶端多次出現(xiàn)因為Full GC超過6秒而與Zookeeper斷開連接,之后再次連接上,期間所有客戶端都受到影響,系統(tǒng)表現(xiàn)不穩(wěn)定。所以項目中設(shè)置參數(shù)zookeeper.session.timeout.ms=30000。 4. YARN資源搶占問題 在Hulu內(nèi)部,Spark Streaming這樣的長時服務與MapRedue,Spark,Hive等批處理應用共享YARN集群資源。在共享環(huán)境中,經(jīng)常因一個批處理應用占用大量網(wǎng)絡資源或者CPU資源,導致Spark Streaming服務不穩(wěn)定(盡管我們采用了CGroup進行資源隔離,但效果不佳)。更嚴重的問題是,如果個別Container崩潰Driver需要向YARN申請新的Container,或者如果整個應用崩潰需要重啟,SparkStreaming不能保證很快申請到足夠的資源,也就無法保證線上服務的質(zhì)量。為解決該問題,Hulu使用label-based scheduling的調(diào)度策略,從YARN集群中隔離出若干節(jié)點專門運行SparkStreaming和其他長時服務,避免與批處理程序競爭資源。 5. 完善監(jiān)控信息 監(jiān)控反映系統(tǒng)運行的性能狀態(tài),也是一切優(yōu)化的基礎(chǔ)。SparkStreaming Web界面提供了比較豐富的監(jiān)控信息,同時本項目依據(jù)業(yè)務邏輯的特點增加了更多監(jiān)控。Hulu使用Graphite和Grafana作為第三方監(jiān)控系統(tǒng),本項目把系統(tǒng)中關(guān)鍵的性能參數(shù)(如計算時長和次數(shù))發(fā)送給Graphite服務器,就能夠在Grafana網(wǎng)頁上看到直觀的統(tǒng)計圖。
圖4是統(tǒng)計Kafka中日志的剩余數(shù)量,一條線對應于一個partition的歷史余量,大部分情況下余量接近零,符合預期。圖中09:55左右日志余量開始出現(xiàn)很明顯的尖峰,之后又迅速逼近零。事后經(jīng)過多種數(shù)據(jù)核對,證實Kafka的數(shù)據(jù)一直穩(wěn)定,而當時Spark Streaming執(zhí)行作業(yè)突然變慢,反壓機制生效,于是Kafka Receiver減小讀取日志的速率,造成Kafka數(shù)據(jù)積壓;一段時間之后SparkStreaming又恢復正常,快速消耗了Kafka中的數(shù)據(jù)余量。 直觀的監(jiān)控系統(tǒng)能有效地暴露問題,進而理解和強化系統(tǒng)。對于不同的業(yè)務邏輯,需要監(jiān)控的信息也不相同。在我們的實踐中,主要的監(jiān)控指標有:
另外,有腳本定期分析這些統(tǒng)計數(shù)據(jù),出現(xiàn)異常則發(fā)郵件報警。比如圖4中 Kafka 的日志余量過大時,會有連續(xù)的報警郵件。我們的經(jīng)驗是,監(jiān)控越細致,之后的優(yōu)化工作越輕松。同時,優(yōu)秀的監(jiān)控也需要對系統(tǒng)深刻的理解。 6. 參數(shù)優(yōu)化 下表列出本項目中比較關(guān)鍵的幾個參數(shù):
四、總結(jié) Spark Streaming的產(chǎn)品上線運行一年多,期間進行了多次Spark版本升級,從最早期的0.8版本到最近的1.5.x版本。總體上Spark Streaming是一款優(yōu)秀的實時計算框架,可以在線上使用。但仍然存在一些不足,包括: 1. Spark同時使用堆內(nèi)和堆外的內(nèi)存,缺乏一些有效的監(jiān)控信息,遇到OOM時分析和調(diào)試比較困難; 2. 缺少Executor初始化接口; 3. Spark采用函數(shù)式編程方式,抽象層次高,好處是使用方便,壞處是理解和優(yōu)化困難; 4. 新版本的Spark有一些異常,如Shuffle過程中Block丟失、內(nèi)存溢出。 |
|