乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Spark Streaming實踐和優(yōu)化

       melon1024 2016-08-16







      一、Spark Streaming概述

      Spark是美國加州伯克利大學AMP實驗室推出的新一代分布式計算框架,其核心概念是RDD,一個只讀的、有容錯機制的分布式數(shù)據(jù)集,RDD可以全部緩存在內(nèi)存中,并在多次計算中重復使用。相比于MapReduce編程模型,Spark具有以下幾個優(yōu)點:

      1. 更大的靈活性和更高的抽象層次,使得用戶用更少的代碼即可實現(xiàn)同樣的功能;

      2. 適合迭代算法,在MapReduce編程模型中,每一輪迭代都需要讀寫一次HDFS,磁盤IO負載很大,相比之下,Spark中的RDD可以緩存在內(nèi)存中,只需第一次讀入HDFS文件,之后迭代的數(shù)據(jù)全都存儲在內(nèi)存中,這使得程序的計算速度可提升約10-100倍。

      SparkStreamingSpark生態(tài)系統(tǒng)中的重要組成部分,在實現(xiàn)上復用Spark計算引擎。如圖1所示,Spark Streaming支持數(shù)據(jù)源有很多,如Kafka、Flume、TCP等。SparkStreaming內(nèi)部的數(shù)據(jù)表示形式為DStreamDiscretizedStream離散的數(shù)據(jù)流,接口設(shè)計與RDD非常相似,這使得它Spark用戶非常友好。SparkStreaming的核心思想是把流式處理轉(zhuǎn)化為“微批處理”,即以時間為單位切分數(shù)據(jù)流,每個切片內(nèi)的數(shù)據(jù)對應一個RDD,進而可以采用Spark引擎進行快速計算。由于SparkStreaming采用了微批處理方式,是近實時的處理系統(tǒng),而不是嚴格意義上的流式處理系統(tǒng)。

      1Spark Streaming數(shù)據(jù)流

              另一個著名的開源流式計算引擎是Storm,這是一個真正的流式處理系統(tǒng),它每次從數(shù)據(jù)源讀一條數(shù)據(jù),然后單獨處理。相比于SparkStreaming,Storm有更快速的響應時間(小于一秒),更適合低延遲的應用場景,比如信用卡欺詐系統(tǒng),廣告系統(tǒng)等。相比之下,SparkStreaming的優(yōu)勢是吞吐量大,響應時間也可以接受(秒級),并且兼容Spark系統(tǒng)中的其他工具庫如MLlibGraphX。對于時間不敏感且流量很大的系統(tǒng),Spark Streaming是更優(yōu)的選擇。

       

      二、Spark StreamingHulu應用

      Hulu是美國的專業(yè)在線視頻網(wǎng)站,每天會有大量用戶在線觀看視頻,進而產(chǎn)生大量用戶觀看行為數(shù)據(jù),這些數(shù)據(jù)通過收集系統(tǒng)進入Hulu的大數(shù)據(jù)平臺,從而進行存儲和進一步處理。在大數(shù)據(jù)平臺之上,各個團隊會根據(jù)需要設(shè)計相應的算法對數(shù)據(jù)進行分析和挖掘以便產(chǎn)生商業(yè)價值:推薦團隊從這些數(shù)據(jù)里挖掘出用戶感興趣的內(nèi)容并做精準推薦,廣告團隊根據(jù)用戶的歷史行為推送最合適的廣告,數(shù)據(jù)團隊從數(shù)據(jù)的各個維度進行分析從而為公司的策略制定提供可靠依據(jù)。

      Hulu大數(shù)據(jù)平臺的實現(xiàn)依循Lambda架構(gòu)。Lambda架構(gòu)是一個通用的大數(shù)據(jù)處理框架,包含離線的批處理層、在線的加速層和服務層三部分,具體如圖2所示。服務層一般使用HTTP服務或自定制的客戶端對外提供數(shù)據(jù)訪問,離線的批處理層一般使用批處理計算框架SparkMapReduce進行數(shù)據(jù)分析,在線的加速層一般使用流式實時計算框架SparkStreamingStorm進行數(shù)據(jù)分析。



      2lambda架構(gòu)原理圖

      對于實時計算部分,Hulu內(nèi)部使用了Kafka、CodisSparkStreaming。面按照數(shù)據(jù)流的過程,介紹我們的項目。

      1.     收集數(shù)據(jù) - 從服務器日志中收集數(shù)據(jù),流程如圖3所示:

      1. 來自網(wǎng)頁、手機App、機頂盒等設(shè)備的用戶產(chǎn)生視頻觀看、廣告點擊等行為,這些行為數(shù)據(jù)記錄在各自的Nginx服務的日志中;

      2. 使用Flume將用戶行為數(shù)據(jù)同時導入HDFSKafka,其中HDFS中的數(shù)據(jù)用于離線分析,而Kafka中數(shù)據(jù)則用于流式實時分析。

      3Hulu數(shù)據(jù)收集流程

      2.     存儲標簽數(shù)據(jù) - Hulu使用HBase存儲用戶標簽數(shù)據(jù),包括基本信息如性別、年齡、是否付費,以及其他模型推測出來的偏好屬性。這些屬性需要作為計算模型的輸入,同時HBase隨機讀取的速度比較慢,所以需要將數(shù)據(jù)同步到緩存服務器中以加快數(shù)據(jù)讀取速度。Redis是一個應用廣泛的開源緩存服務器,一個免費開源的高性能Key-Value數(shù)據(jù)庫,但其本身是個單機系統(tǒng),不能很好地支持大量數(shù)據(jù)的緩存。為解決Redis擴展性差的問題,豌豆莢開源了Codis,一個分布式Redis解決方案。HuluCodis打成Docker鏡像,并實現(xiàn)一鍵式構(gòu)建緩存系統(tǒng),附帶自動監(jiān)控和修復功能。為了更精細的監(jiān)控,我們構(gòu)建了多個Codis緩存,分別是:

      1.   codis-profile,同步HBase中的用戶屬性;

      2.   codis-action,緩存來自Kafka的用戶行為;

      3.   codis-result,記錄計算結(jié)果。

      3.     實時處理數(shù)據(jù) - 準備就緒,啟動Spark Streaming程序

      1)      SparkStreaming啟動Kafka Receiver,持續(xù)Kafka服務器拉數(shù)據(jù);

      2)      每隔兩秒,Kafka的數(shù)據(jù)被整理成一個RDD,交給Spark引擎處理;

      3)      對一條用戶行為,Spark會從codis-action緩存中拿到該用戶的行為記錄,然后把新的行為追加進去;

      4)      Sparkcodis-actioncodis-profile中獲得該用戶的所有相關(guān)屬性,然后執(zhí)行廣告推薦的計算模型,最后把結(jié)果寫入codis-result,進而供服務層實時讀取這些結(jié)果。

       

      三、Spark Streaming優(yōu)化經(jīng)驗

              實踐中,業(yè)務邏輯首先保證完成,使得在Kafka輸入數(shù)據(jù)量較小的情況下系統(tǒng)穩(wěn)定運行,且輸入輸出滿足項目需求。然后開始調(diào)優(yōu),修改SparkStreaming的參數(shù),比如Executor的數(shù)量,Core的數(shù)量,Receiver的流量等。最后發(fā)現(xiàn)僅調(diào)參數(shù)無法完全滿足本項目的業(yè)務場景,所以有更進一步的優(yōu)化方案,總結(jié)如下:

      1.    Executor初始化

      很多機器學習的模型在第一次運行時,需要執(zhí)行初始化方法,還會連接外部的數(shù)據(jù)庫,常常需要5-10分鐘,這會成為潛在的不穩(wěn)定因素。在Spark Streaming應用中,當Receiver完成初始化,它就開始源源不斷地接收數(shù)據(jù),并且由Driver定期調(diào)度任務消耗這些數(shù)據(jù)。如果剛啟動時Executor需要幾分鐘做準備,會導致第一個作業(yè)一直沒有完成,這段時間內(nèi)Driver不會調(diào)度新的作業(yè)。這時候在Kafka Receiver端會有數(shù)據(jù)積壓,隨著積壓的數(shù)據(jù)量越來越大,大部分數(shù)據(jù)會撐過新生代進入老年代,進而給Java GC帶來嚴重的壓力,容易引發(fā)應用程序崩潰。

      本項目的解決方案是,修改Spark內(nèi)核,在每個Executor接收任務之前先執(zhí)行一個用戶自定義的初始化函數(shù),初始化函數(shù)中可以執(zhí)行一些獨立的用戶邏輯。示例代碼如下:

        // sc:SparkContext, setupEnvironmentHulu擴展的API

        sc.setupEnvironment(() => {

          application.initialize() // 用戶應用程序初始化,需執(zhí)行幾分鐘

          println(“Invoke executor  setup method successfully.”)

        })

      該方案需要更改Spark的任務調(diào)度器,首先將每個Executor設(shè)置為未初始化狀態(tài)。此時,調(diào)度器只會給未初始化狀態(tài)的Executor分配初始化任務(執(zhí)行前面提到的初始化函數(shù))。等初始化任務完畢,調(diào)度器更新Executor的狀態(tài)為已初始化,這樣的Executor才可以分配正常的計算任務。

      2.    異步處理Task中的業(yè)務邏輯

      本項目中,模型的輸入?yún)?shù)均來自Codis,甚至模型內(nèi)部也可能訪問外部存儲,直接導致模型計算時長不穩(wěn)定,很多時間消耗在網(wǎng)絡等待上。

      為提高系統(tǒng)吞吐量,增大并行度是比較通用的優(yōu)化方案,但在本項目的場景中并不適用。因為Spark作業(yè)的調(diào)度策略是,等待上一個作業(yè)的所有Task執(zhí)行完畢,然后調(diào)度下一個作業(yè)。如果單個Task的運行時間不穩(wěn)定,易發(fā)生個別Task拖慢整個作業(yè)的情況,以至于資源利用率不高,系統(tǒng)吞吐量上不去;甚至并行度越大,該問題越嚴重。一種常用的解決Task不穩(wěn)定的方案是增大Spark Streamingmicro batch的時間間隔,該方案會使整個實時系統(tǒng)的延遲變長,并不推薦。

      該問題的解決方案是異步處理Task中的業(yè)務邏輯。如下文的代碼所示,同步方案中,Task內(nèi)執(zhí)行業(yè)務邏輯,處理時間不定;異步方案中,Task把業(yè)務邏輯嵌入線程,交給線程池執(zhí)行,Task立刻結(jié)束,ExecutorDriver報告執(zhí)行完畢,異步處理的時間非常短,在100ms以內(nèi)。另外,當線程池中積壓的線程數(shù)量太大時(代碼中qsize>100的情況),會暫時使用同步處理,配合反壓機制(見下文的參數(shù)spark.streaming.backpressure.enabled),可以保證不會因為數(shù)據(jù)積壓過多而導致系統(tǒng)崩潰。為設(shè)置合適的線程池大小,我們借助JVisualVM工具監(jiān)控ExecutorCPU使用率,通過調(diào)整參數(shù)找到最優(yōu)并發(fā)線程數(shù)。經(jīng)實驗驗證,該方案大大提高了系統(tǒng)的吞吐量。

        // 同步處理

        // 函數(shù)runBusinessLogic Task 中的業(yè)務邏輯,執(zhí)行時間不定

        rdd.foreachPartition(partition  => runBusinessLogic (partition))

       

        // 異步處理,threadPool是線程池

         rdd.foreachPartition(partition => {

          val  qsize = threadPool.getQueue.size // 線程池中積壓的線程數(shù)

          if  (qsize > 100) {

            runBusinessLogic(partition)  // 暫時同步處理

          }

           threadPool.execute(new Runnable {

             override def run() = runBusinessLogic(partition)

          })

        })

      異步化Task也存在缺點:如果Executor發(fā)生異常,存放在線程池中的業(yè)務邏輯無法重新計算,會導致部分數(shù)據(jù)丟失。經(jīng)實驗驗證,僅當Executor異常崩潰時有數(shù)據(jù)丟失,且不常見,在本項目的場景中可以接受。

      3.     Kafka Receiver的穩(wěn)定性

      本項目使用SparkStreaming中的Kafka Receiver,本質(zhì)上調(diào)用Kafka官方的客戶端ZookeeperConsumerConnector。其策略是每個客戶端在Zookeeper的固定路徑下把自己注冊為臨時節(jié)點,于是所有客戶端都知道其他客戶端的存在,然后自動協(xié)調(diào)和分配Kafka的數(shù)據(jù)資源。該策略存在一個弊端,當一個客戶端與Zookeeper的連接狀態(tài)發(fā)生改變(斷開或者連上),所有的客戶端都會通過Zookeeper協(xié)調(diào),重新分配Kafka的數(shù)據(jù)資源;在此期間所有客戶端都斷開與Kafka的連接,系統(tǒng)接收不到Kafka的數(shù)據(jù),直到重新分配成功。如果網(wǎng)絡質(zhì)量不佳,并且Receiver的個數(shù)較多,這種策略會造成數(shù)據(jù)輸入不穩(wěn)定,很多SparkStreaming用戶遇到這樣的問題。在我們的系統(tǒng)中,該策略并沒有產(chǎn)生明顯的負面影響。值得注意的是,Kafka 客戶端與Zookeeper有個默認的參數(shù)zookeeper.session.timeout.ms=6000,表示客戶端與Zookeeper連接的session有效時間為6秒,我們的客戶端多次出現(xiàn)因為Full GC超過6秒而與Zookeeper斷開連接,之后再次連接上,期間所有客戶端都受到影響,系統(tǒng)表現(xiàn)不穩(wěn)定。所以項目設(shè)置參數(shù)zookeeper.session.timeout.ms=30000。

      4.     YARN資源搶占問題

             Hulu內(nèi)部,Spark Streaming這樣的長時服務與MapRedue,SparkHive等批處理應用共享YARN集群資源。在共享環(huán)境中,經(jīng)常因一個批處理應用占用大量網(wǎng)絡資源或者CPU資源,導致Spark Streaming服務不穩(wěn)定(盡管我們采用了CGroup進行資源隔離,但效果不佳)。更嚴重的問題是,如果個別Container崩潰Driver需要向YARN申請新的Container,或者如果整個應用崩潰需要重啟,SparkStreaming不能保證很快申請到足夠的資源,也就無法保證線上服務的質(zhì)量。為解決該問題,Hulu使用label-based scheduling的調(diào)度策略,從YARN集群中隔離出若干節(jié)點專門運行SparkStreaming和其他長時服務,避免與批處理程序競爭資源。

      5.     完善監(jiān)控信息

      監(jiān)控反映系統(tǒng)運行的性能狀態(tài),也是一切優(yōu)化的基礎(chǔ)。SparkStreaming Web界面提供了比較豐富的監(jiān)控信息,同時本項目依據(jù)業(yè)務邏輯的特點增加了更多監(jiān)控。Hulu使用GraphiteGrafana作為第三方監(jiān)控系統(tǒng),本項目把系統(tǒng)中關(guān)鍵的性能參數(shù)(如計算時長和次數(shù))發(fā)送給Graphite服務器,就能夠在Grafana網(wǎng)頁上看到直觀的統(tǒng)計圖。


      4Graphite監(jiān)控信息,展示了Kafka中日志的剩余數(shù)量,一條線對應于一個partition的歷史余量

      4是統(tǒng)計Kafka中日志的剩余數(shù)量,一條線對應于一個partition的歷史余量,大部分情況下余量接近零,符合預期。圖中09:55左右日志余量開始出現(xiàn)很明顯的尖峰,之后又迅速逼近零。事后經(jīng)過多種數(shù)據(jù)核對,證實Kafka的數(shù)據(jù)一直穩(wěn)定,而當時Spark Streaming執(zhí)行作業(yè)突然變慢,反壓機制生效,于是Kafka Receiver減小讀取日志的速率,造成Kafka數(shù)據(jù)積壓;一段時間之后SparkStreaming又恢復正常,快速消耗了Kafka中的數(shù)據(jù)余量。

      直觀的監(jiān)控系統(tǒng)能有效地暴露問題,進而理解和強化系統(tǒng)。對于不同的業(yè)務邏輯,需要監(jiān)控的信息也不相同。在我們的實踐中,主要的監(jiān)控指標有:

      1.    Kafka的剩余數(shù)據(jù)量

      2.   Spark的作業(yè)運行時間和調(diào)度時間

      3.   每個Task的計算時間

      4.   Codis的訪問次數(shù)、時間、命中率

      另外,有腳本定期分析這些統(tǒng)計數(shù)據(jù),出現(xiàn)異常則發(fā)郵件報警。比如圖4 Kafka 的日志余量過大時,會有連續(xù)的報警郵件。我們的經(jīng)驗是,監(jiān)控越細致,之后的優(yōu)化工作越輕松。同時,優(yōu)秀的監(jiān)控也需要對系統(tǒng)深刻的理解。

      6.    參數(shù)優(yōu)化

      下表列出本項目中比較關(guān)鍵的幾個參數(shù):

      spark.yarn.max.executor.failures

      Executor允許的失敗上限如果超過該上限,整個Spark Streaming會失敗,需要設(shè)置比較大

      spark.yarn.executor.memoryOverhead

      ExecutorJVM的開銷,與堆內(nèi)存不一樣,設(shè)置太小會導致內(nèi)存溢出異常

      spark.receivers.num

      Kafka Receiver的個數(shù)

      spark.streaming.receiver.maxRate

      每個Receiver能夠接受數(shù)據(jù)的最大速率;這個值超過峰值約50%

      spark.streaming.backpressure.enabled

      反壓機制;如果目前系統(tǒng)的延遲較長,Receiver端會自動減小接受數(shù)據(jù)的速率,避免系統(tǒng)因數(shù)據(jù)積壓過多而崩潰

      spark.locality.wait

      系統(tǒng)調(diào)度Task會盡量考慮數(shù)據(jù)的局部性,如果超過spark.locality.wait設(shè)置時間的上限,就放棄局部性;該參數(shù)直接影響Task的調(diào)度時間

      spark.cleaner.ttl

      Spark系統(tǒng)內(nèi)部的元信息的超時時間;Streaming長期運行,元信息累積太多會影響性能

      四、總結(jié)

              Spark Streaming的產(chǎn)品上線運行一年多,期間進行了多次Spark版本升級,從最早期的0.8版本到最近1.5.x版本。總體上Spark Streaming是一款優(yōu)秀的實時計算框架,可以在線上使用。但仍然存在一些不足,包括:

      1.     Spark同時使用堆內(nèi)和堆外的內(nèi)存,缺乏一些有效的監(jiān)控信息,遇到OOM時分析和調(diào)試比較困難;

      2.     缺少Executor初始化接口;

      3.     Spark采用函數(shù)式編程方式,抽象層次高,好處是使用方便,壞處是理解和優(yōu)化困難;

      4.     新版本的Spark有一些異常,如Shuffle過程中Block丟失、內(nèi)存溢出。


        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多