乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      貫通Spark Streaming流計算框架的運行源碼

       陳永正的圖書館 2017-07-03

      本章節(jié)內(nèi)容:

      一、在線動態(tài)計算分類最熱門商品案例回顧

      二、基于案例貫通Spark Streaming的運行源碼

      先看代碼(源碼場景:用戶、用戶的商品、商品的點擊量排名,按商品、其點擊量排名前三):

      package com.dt.spark.sparkstreaming

      import org.apache.spark.SparkConf

      import org.apache.spark.sql.Row

      import org.apache.spark.sql.hive.HiveContext

      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

      import org.apache.spark.streaming.{Seconds, StreamingContext}

      /**

        * 使用Spark Streaming+Spark SQL來在線動態(tài)計算電商中不同類別中最熱門的商品排名,例如手機這個類別下面最熱門的三種手機、電視這個類別

        * 下最熱門的三種電視,該實例在實際生產(chǎn)環(huán)境下具有非常重大的意義;

        * @author DT大數(shù)據(jù)夢工廠

        * 新浪微博:http://weibo.com/ilovepains/

        *   實現(xiàn)技術(shù):Spark Streaming+Spark SQL,之所以Spark Streaming能夠使用ML、sql、graphx等功能是因為有foreachRDD和Transform

        * 等接口,這些接口中其實是基于RDD進行操作,所以以RDD為基石,就可以直接使用Spark其它所有的功能,就像直接調(diào)用API一樣簡單。

        *  假設說這里的數(shù)據(jù)的格式:user item category,例如Rocky Samsung Android

        */

      object OnlineTheTop3ItemForEachCategory2DB {

        def main(args: Array[String]){

          /**

            * 第1步:創(chuàng)建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,

            * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置

            * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如

            * 只有1G的內(nèi)存)的初學者

            */

          val conf = new SparkConf() //創(chuàng)建SparkConf對象

          conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //設置應用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱

        //conf.setMaster("spark://Master:7077") //此時,程序在Spark集群

          conf.setMaster("local[6]")

          //設置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口

          val ssc = new StreamingContext(conf, Seconds(5))

          ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

          val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

          val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>

              (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

          val categoryUserClickLogsDStream =

      //窗口的總長度是60秒,每隔20秒滑動一次,應該在過去60秒的總長度加上新的20秒,在新的結(jié)果基礎(chǔ)上減去20秒

      formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,_-_, Seconds(60), Seconds(20))

          categoryUserClickLogsDStream.foreachRDD { rdd => {

            if (rdd.isEmpty()) {//判斷RDD是否為空

              println("No data inputted!!!")

            } else {

              val categoryItemRow = rdd.map(reducedItem => {

                val category = reducedItem._1.split("_")(0)

                val item = reducedItem._1.split("_")(1)

                val click_count = reducedItem._2

                Row(category, item, click_count)

              })

              val structType = StructType(Array(

                StructField("category", StringType, true),

                StructField("item", StringType, true),

                StructField("click_count", IntegerType, true)

              ))

              //生產(chǎn)環(huán)境下注意用hiveContext,其繼承了SparkContext所有功能

              val hiveContext = new HiveContext(rdd.context)

              val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

              categoryItemDF.registerTempTable("categoryItemTable")

              val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +

                " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +

                " WHERE rank <= 3")

              reseltDataFram.show()

              val resultRowRDD = reseltDataFram.rdd

              resultRowRDD.foreachPartition { partitionOfRecords => {

                if (partitionOfRecords.isEmpty){

                  println("This RDD is not null but partition is null")

                } else {

                  val connection = ConnectionPool.getConnection()  // ConnectionPool is a static, lazily initialized pool of connections

                  partitionOfRecords.foreach(record => {

                    val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +

                      record.getAs("item") + "'," + record.getAs("click_count") + ")"

                    val stmt = connection.createStatement();

                    stmt.executeUpdate(sql);

                  })

                  ConnectionPool.returnConnection(connection) // return to the pool for future reuse

                }

              }

              }

            }

          }

          }

          /**

            * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler

            * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法:

            *   1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

            *   2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到

            *   數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker

            *   內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息

            * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD

            * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個

            * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢?

            *   1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙;

            *   2,有可能設置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持;

            */

          ssc.start()

          ssc.awaitTermination()

        }

      }

      演示源代碼效果

      啟動Hive服務:

      啟動應用程序:

       webui控制臺監(jiān)控運行情況:

      如果報錯的話是數(shù)據(jù)源沒有鏈接上,需要啟動nc –lk 9999即可:

      查看運行結(jié)果:

      以local模式運行的話刷新local的History-server webui界面:

      Local的模式,所有任務都在driver上運行:

      使用了reduceByKeyAndWindow窗口滑動操作,需要對窗口輸出結(jié)果聚合:

      源碼分析

      1. 創(chuàng)建sparkConf的配置。
      2. 創(chuàng)建實例StreamingContext

      進入StreamingContext內(nèi)部構(gòu)造函數(shù)需要傳入master、appname、batchDuration等參數(shù);關(guān)鍵點createNewSparkContext對象,這說明StreamingContext構(gòu)建是通過再其內(nèi)部會構(gòu)建SparkContext,通過sparkConf構(gòu)建SparkConcoxt,進一步說明SparkStreaming是SparkCore上的一個應用程序。用戶編寫SparkStreaming應用程序在StreamingContext內(nèi)部會構(gòu)建SparkConcoxt。

      看下val userClickLogsDStream = ssc.socketTextStream("Master", 9999)源碼,創(chuàng)建一個DStream輸入流即socketinputDSteam:

      有了DStream后可以基于Receiver接收數(shù)據(jù),Receiver內(nèi)部的onStart方法開啟run線程來調(diào)用Receiver方法連接上socket:

      在Receiver方法中new Socket對象,并根據(jù)應用程序業(yè)務代碼指定的host、port進行接收inputStream輸入數(shù)據(jù),將接收到的數(shù)據(jù)不斷一條一條循環(huán)進行Store:

      再看業(yè)務邏輯代碼的一系列transformation,從SparkStreaming框架角度來講不是很重要,因為這些代碼是你自己的業(yè)務邏輯,不是框架代碼,在研究源碼時暫時放在一邊:

      接下來很重要的一步就是SparkStreaming的start方法,看下這個方法的描述:

      /**

            * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler

            * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法:

            *   1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

            *   2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor,然后通過ReceiverSupervisor啟動Receiver),在Receiver收到

            *   數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker

            *   內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息

            * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD

            * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個

            * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢?

            *   1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙;

            *   2,有可能設置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持;

          */

      我們再深入看看InputDStream源碼,業(yè)務代碼通過StreamingContext對象的socketTextStream方法創(chuàng)建了SocketStream對象:

      通過SocketStream對象創(chuàng)建了SocketInputDStream對象,返回的是ReceiverInputDStream類型:

      進入ReceiverInputDStream抽象類,其繼承了InputDStream類:

      在進入InputDStream抽象類,其繼續(xù)繼承了DStream(其實就是RDD的模板):

      進入DStream抽象類,其只是在RDD的基礎(chǔ)上進行簡單的封裝,就是RDD的模板而已,DStream有以下特征:

      1. DStream依賴其他的DStreams
      2. 依賴DStream模板構(gòu)建出RDD之間的依賴
      3. 基于DStream它會有一個function,function被用來基于batchDuration或batchIntval時間片生成RDD,這個和定時器有關(guān)系
      4. DStream是RDD的模板,DStream上的很多操作轉(zhuǎn)過來其實是對RDD的操作

      看下DStream的繼承結(jié)構(gòu)如下(剛才我們看到的InputDStream也在繼承的里面):

      InputDStream注釋:所有DStream的子類,其提供start和stop方法,當SparkStreaming系統(tǒng)啟動和停止接收數(shù)據(jù)的時候會被調(diào)用,剛才看到具體子類SocketTextStream里面具體的start和stop方法提供其回調(diào):

      InputDStream對應的子類的start和stop方法被ReceiverSupervisor的start和stop方法回調(diào)。

      繼續(xù)看ReceiverInputDStream類的子類,其中有SocketInputDStream類,此類只實現(xiàn)了getReceiver方法:

      在看看DStream整個繼承結(jié)構(gòu)中,發(fā)現(xiàn)有一個ForeachDStream類,在ForeachDStream中會產(chǎn)生作業(yè),里面新建了一個job,DStream級別的action操作都會有foreachDStream的產(chǎn)生,會被的generatorJob調(diào)用,當然新建的job會產(chǎn)生基于function操作,實際上是你自己寫的業(yè)務邏輯function:

      ForeachDStream類中關(guān)鍵代碼generatorRDD根據(jù)時間間隔(batchInterval或batchDuration)生成相應的RDD和RDD的操作,我們對DStream的操作反過來作用于RDD的操作,DStream的本質(zhì)就是按照時間的序列存儲的一系列數(shù)據(jù)流,因為RDD中存儲的就是batchDuration時間片的數(shù)據(jù):

      如果要擴展SparkStreaming源碼,注意和SparkStreaming包名一致才可以訪問其源碼。

      繼續(xù)看代碼,看到getOrCompute方法,因為其作用于DStream,這個方法很關(guān)鍵,根據(jù)指定的時間獲得RDD,如果緩存中有的話可根據(jù)指定時間從緩存中獲取數(shù)據(jù):

      DStream類的getOrCompute的操作是生成RDD,其實DStream是rdd的模板,對DStream的getOrCompute操作實際上觸發(fā)運行g(shù)eneratorRDDs,并將生成的RDD放在HashMap中:

      所以通過socketTextStream代碼,最終獲得輸入流socketInputDStream的Receiver 在集群上抓取數(shù)據(jù):

      通過socketReceiver從網(wǎng)絡上獲取流數(shù)據(jù),這樣就將網(wǎng)絡上的離散流數(shù)據(jù)以HashMap的方式存儲起來了:

      源碼說的非常清楚:

        假如說一萬年的話每秒一個單位,這就是離散流,用每秒對一萬年進行切分,切成一片一片,切開后就叫離散,離散之間沒什么關(guān)系。

      SparkStreaming整體架構(gòu)圖初見:

      Spark發(fā)行版筆記5

      新浪微博:http://weibo.com/ilovepains

      微信公眾號:DT_Spark

      博客:http://blog.sina.com.cn/ilovepains

      手機:18610086859

      QQ:1740415547

      郵箱:18610086859@vip.126.com

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多