本章節(jié)內(nèi)容: 一、在線動態(tài)計算分類最熱門商品案例回顧 二、基于案例貫通Spark Streaming的運行源碼 先看代碼(源碼場景:用戶、用戶的商品、商品的點擊量排名,按商品、其點擊量排名前三): package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用Spark Streaming+Spark SQL來在線動態(tài)計算電商中不同類別中最熱門的商品排名,例如手機這個類別下面最熱門的三種手機、電視這個類別 * 下最熱門的三種電視,該實例在實際生產(chǎn)環(huán)境下具有非常重大的意義; * @author DT大數(shù)據(jù)夢工廠 * 新浪微博:http://weibo.com/ilovepains/ * 實現(xiàn)技術(shù):Spark Streaming+Spark SQL,之所以Spark Streaming能夠使用ML、sql、graphx等功能是因為有foreachRDD和Transform * 等接口,這些接口中其實是基于RDD進行操作,所以以RDD為基石,就可以直接使用Spark其它所有的功能,就像直接調(diào)用API一樣簡單。 * 假設說這里的數(shù)據(jù)的格式:user item category,例如Rocky Samsung Android */ object OnlineTheTop3ItemForEachCategory2DB { def main(args: Array[String]){ /** * 第1步:創(chuàng)建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置 * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如 * 只有1G的內(nèi)存)的初學者 */ val conf = new SparkConf() //創(chuàng)建SparkConf對象 conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //設置應用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱 //conf.setMaster("spark://Master:7077") //此時,程序在Spark集群 conf.setMaster("local[6]") //設置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口 val ssc = new StreamingContext(conf, Seconds(5)) ssc.checkpoint("/root/Documents/SparkApps/checkpoint") val userClickLogsDStream = ssc.socketTextStream("Master", 9999) val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog => (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1)) val categoryUserClickLogsDStream = //窗口的總長度是60秒,每隔20秒滑動一次,應該在過去60秒的總長度加上新的20秒,在新的結(jié)果基礎(chǔ)上減去20秒 formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,_-_, Seconds(60), Seconds(20)) categoryUserClickLogsDStream.foreachRDD { rdd => { if (rdd.isEmpty()) {//判斷RDD是否為空 println("No data inputted!!!") } else { val categoryItemRow = rdd.map(reducedItem => { val category = reducedItem._1.split("_")(0) val item = reducedItem._1.split("_")(1) val click_count = reducedItem._2 Row(category, item, click_count) }) val structType = StructType(Array( StructField("category", StringType, true), StructField("item", StringType, true), StructField("click_count", IntegerType, true) )) //生產(chǎn)環(huán)境下注意用hiveContext,其繼承了SparkContext所有功能 val hiveContext = new HiveContext(rdd.context) val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType) categoryItemDF.registerTempTable("categoryItemTable") val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" + " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " + " WHERE rank <= 3") reseltDataFram.show() val resultRowRDD = reseltDataFram.rdd resultRowRDD.foreachPartition { partitionOfRecords => { if (partitionOfRecords.isEmpty){ println("This RDD is not null but partition is null") } else { val connection = ConnectionPool.getConnection() // ConnectionPool is a static, lazily initialized pool of connections partitionOfRecords.foreach(record => { val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" + record.getAs("item") + "'," + record.getAs("click_count") + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } } } } /** * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job * 2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到 * 數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker * 內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息 * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個 * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢? * 1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙; * 2,有可能設置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持; */ ssc.start() ssc.awaitTermination() } } 演示源代碼效果 啟動Hive服務: 啟動應用程序: webui控制臺監(jiān)控運行情況: 如果報錯的話是數(shù)據(jù)源沒有鏈接上,需要啟動nc –lk 9999即可: 查看運行結(jié)果: 以local模式運行的話刷新local的History-server webui界面: Local的模式,所有任務都在driver上運行: 使用了reduceByKeyAndWindow窗口滑動操作,需要對窗口輸出結(jié)果聚合: 源碼分析
進入StreamingContext內(nèi)部構(gòu)造函數(shù)需要傳入master、appname、batchDuration等參數(shù);關(guān)鍵點createNewSparkContext對象,這說明StreamingContext構(gòu)建是通過再其內(nèi)部會構(gòu)建SparkContext,通過sparkConf構(gòu)建SparkConcoxt,進一步說明SparkStreaming是SparkCore上的一個應用程序。用戶編寫SparkStreaming應用程序在StreamingContext內(nèi)部會構(gòu)建SparkConcoxt。 看下val userClickLogsDStream = ssc.socketTextStream("Master", 9999)源碼,創(chuàng)建一個DStream輸入流即socketinputDSteam: 有了DStream后可以基于Receiver接收數(shù)據(jù),Receiver內(nèi)部的onStart方法開啟run線程來調(diào)用Receiver方法連接上socket: 在Receiver方法中new Socket對象,并根據(jù)應用程序業(yè)務代碼指定的host、port進行接收inputStream輸入數(shù)據(jù),將接收到的數(shù)據(jù)不斷一條一條循環(huán)進行Store: 再看業(yè)務邏輯代碼的一系列transformation,從SparkStreaming框架角度來講不是很重要,因為這些代碼是你自己的業(yè)務邏輯,不是框架代碼,在研究源碼時暫時放在一邊: 接下來很重要的一步就是SparkStreaming的start方法,看下這個方法的描述: /** * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job * 2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor,然后通過ReceiverSupervisor啟動Receiver),在Receiver收到 * 數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker * 內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息 * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個 * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢? * 1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙; * 2,有可能設置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持; */ 我們再深入看看InputDStream源碼,業(yè)務代碼通過StreamingContext對象的socketTextStream方法創(chuàng)建了SocketStream對象: 通過SocketStream對象創(chuàng)建了SocketInputDStream對象,返回的是ReceiverInputDStream類型: 進入ReceiverInputDStream抽象類,其繼承了InputDStream類: 在進入InputDStream抽象類,其繼續(xù)繼承了DStream(其實就是RDD的模板): 進入DStream抽象類,其只是在RDD的基礎(chǔ)上進行簡單的封裝,就是RDD的模板而已,DStream有以下特征:
看下DStream的繼承結(jié)構(gòu)如下(剛才我們看到的InputDStream也在繼承的里面): InputDStream注釋:所有DStream的子類,其提供start和stop方法,當SparkStreaming系統(tǒng)啟動和停止接收數(shù)據(jù)的時候會被調(diào)用,剛才看到具體子類SocketTextStream里面具體的start和stop方法提供其回調(diào): InputDStream對應的子類的start和stop方法被ReceiverSupervisor的start和stop方法回調(diào)。 繼續(xù)看ReceiverInputDStream類的子類,其中有SocketInputDStream類,此類只實現(xiàn)了getReceiver方法: 在看看DStream整個繼承結(jié)構(gòu)中,發(fā)現(xiàn)有一個ForeachDStream類,在ForeachDStream中會產(chǎn)生作業(yè),里面新建了一個job,DStream級別的action操作都會有foreachDStream的產(chǎn)生,會被的generatorJob調(diào)用,當然新建的job會產(chǎn)生基于function操作,實際上是你自己寫的業(yè)務邏輯function: ForeachDStream類中關(guān)鍵代碼generatorRDD根據(jù)時間間隔(batchInterval或batchDuration)生成相應的RDD和RDD的操作,我們對DStream的操作反過來作用于RDD的操作,DStream的本質(zhì)就是按照時間的序列存儲的一系列數(shù)據(jù)流,因為RDD中存儲的就是batchDuration時間片的數(shù)據(jù): 如果要擴展SparkStreaming源碼,注意和SparkStreaming包名一致才可以訪問其源碼。 繼續(xù)看代碼,看到getOrCompute方法,因為其作用于DStream,這個方法很關(guān)鍵,根據(jù)指定的時間獲得RDD,如果緩存中有的話可根據(jù)指定時間從緩存中獲取數(shù)據(jù): DStream類的getOrCompute的操作是生成RDD,其實DStream是rdd的模板,對DStream的getOrCompute操作實際上觸發(fā)運行g(shù)eneratorRDDs,并將生成的RDD放在HashMap中: 所以通過socketTextStream代碼,最終獲得輸入流socketInputDStream的Receiver 在集群上抓取數(shù)據(jù): 通過socketReceiver從網(wǎng)絡上獲取流數(shù)據(jù),這樣就將網(wǎng)絡上的離散流數(shù)據(jù)以HashMap的方式存儲起來了: 源碼說的非常清楚: 假如說一萬年的話每秒一個單位,這就是離散流,用每秒對一萬年進行切分,切成一片一片,切開后就叫離散,離散之間沒什么關(guān)系。 SparkStreaming整體架構(gòu)圖初見: Spark發(fā)行版筆記5 新浪微博:http://weibo.com/ilovepains 微信公眾號:DT_Spark 博客:http://blog.sina.com.cn/ilovepains 手機:18610086859 QQ:1740415547 郵箱:18610086859@vip.126.com |
|