乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Spark入門實戰(zhàn)系列

       青葉i生活 2018-01-23

      【注】該系列文章以及使用到安裝包/測試數(shù)據(jù) 可以在《傾情大奉送--Spark入門實戰(zhàn)系列》獲取

      1SparkSQL的發(fā)展歷程

      1.1 Hive and Shark

      SparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,它是當時唯一運行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運行效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產(chǎn)生,其中表現(xiàn)較為突出的是:

      l MapRDrill

      l ClouderaImpala

      l Shark

      其中Shark是伯克利實驗室Spark生態(tài)環(huán)境的組件之一,它修改了下圖所示的右下角的內(nèi)存管理、物理計劃、執(zhí)行三個模塊,并使之能運行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升。

      clip_image002

      1.2 SharkSparkSQL 

      但是,隨著Spark的發(fā)展,對于野心勃勃的Spark團隊來說,Shark對于Hive的太多依賴(如采用Hive的語法解析器、查詢優(yōu)化器等等),制約了SparkOne Stack Rule Them All的既定方針,制約了Spark各個組件的相互集成,所以提出了SparkSQL項目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優(yōu)點,如內(nèi)存列存儲(In-Memory Columnar Storage)、Hive兼容性等,重新開發(fā)了SparkSQL代碼;由于擺脫了對Hive的依賴性,SparkSQL無論在數(shù)據(jù)兼容、性能優(yōu)化、組件擴展方面都得到了極大的方便,真可謂“退一步,海闊天空”。

      l數(shù)據(jù)兼容方面  不但兼容Hive,還可以從RDD、parquet文件、JSON文件中獲取數(shù)據(jù),未來版本甚至支持獲取RDBMS數(shù)據(jù)以及cassandraNOSQL數(shù)據(jù);

      l性能優(yōu)化方面  除了采取In-Memory Columnar Storagebyte-code generation等優(yōu)化技術外、將會引進Cost Model對查詢進行動態(tài)評估、獲取最佳物理計劃等等;

      l組件擴展方面  無論是SQL的語法解析器、分析器還是優(yōu)化器都可以重新定義,進行擴展。

      201461Shark項目和SparkSQL項目的主持人Reynold Xin宣布:停止對Shark的開發(fā),團隊將所有資源放SparkSQL項目上,至此,Shark的發(fā)展畫上了句話,但也因此發(fā)展出兩個直線:SparkSQLHive on Spark

      clip_image004

      其中SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一個Hive的發(fā)展計劃,該計劃將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限于一個引擎,可以采用Map-Reduce、Tez、Spark等引擎。

      1.3 SparkSQL的性能

      Shark的出現(xiàn),使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高:

      clip_image006

      那么,擺脫了Hive的限制,SparkSQL的性能又有怎么樣的表現(xiàn)呢?雖然沒有Shark相對于Hive那樣矚目地性能提升,但也表現(xiàn)得非常優(yōu)異:

      clip_image008

      為什么SparkSQL的性能會得到怎么大的提升呢?主要SparkSQL在下面幾點做了優(yōu)化:

      A:內(nèi)存列存儲(In-Memory Columnar Storage

      SparkSQL的表數(shù)據(jù)在內(nèi)存中存儲不是采用原生態(tài)的JVM對象存儲方式,而是采用內(nèi)存列存儲,如下圖所示。

      clip_image010

      該存儲方式無論在空間占用量和讀取吞吐率上都占有很大優(yōu)勢。

      對于原生態(tài)的JVM對象存儲方式,每個對象通常要增加12-16字節(jié)的額外開銷,對于一個270MBTPC-H lineitem table數(shù)據(jù),使用這種方式讀入內(nèi)存,要使用970MB左右的內(nèi)存空間(通常是25倍于原生數(shù)據(jù)空間);另外,使用這種方式,每個數(shù)據(jù)記錄產(chǎn)生一個JVM對象,如果是大小為200B的數(shù)據(jù)記錄,32G的堆棧將產(chǎn)生1.6億個對象,這么多的對象,對于GC來說,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數(shù)量呈線性相關)。顯然這種內(nèi)存存儲方式對于基于內(nèi)存計算的Spark來說,很昂貴也負擔不起。

      對于內(nèi)存列存儲來說,將所有原生數(shù)據(jù)類型的列采用原生數(shù)組來存儲,將Hive支持的復雜數(shù)據(jù)類型(如array、map等)先序化后并接成一個字節(jié)數(shù)組來存儲。這樣,每個列創(chuàng)建一個JVM對象,從而導致可以快速的GC和緊湊的數(shù)據(jù)存儲;額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低內(nèi)存開銷;更有趣的是,對于分析查詢中頻繁使用的聚合特定列,性能會得到很大的提高,原因就是這些列的數(shù)據(jù)放在一起,更容易讀入內(nèi)存進行計算。

      B:字節(jié)碼生成技術(bytecode generation,即CG

      在數(shù)據(jù)庫查詢中有一個昂貴的操作是查詢語句中的表達式,主要是由于JVM的內(nèi)存模型引起的。比如如下一個查詢:

      SELECT a + b FROM table

      在這個查詢里,如果采用通用的SQL語法途徑去處理,會先生成一個表達式樹(有兩個節(jié)點的Add樹,參考后面章節(jié)),在物理處理這個表達式樹的時候,將會如圖所示的7個步驟:

      1.  調(diào)用虛函數(shù)Add.eval(),需要確認Add兩邊的數(shù)據(jù)類型

      2.  調(diào)用虛函數(shù)a.eval(),需要確認a的數(shù)據(jù)類型

      3.  確定a的數(shù)據(jù)類型是Int,裝箱

      4.  調(diào)用虛函數(shù)b.eval(),需要確認b的數(shù)據(jù)類型

      5.  確定b的數(shù)據(jù)類型是Int,裝箱

      6.  調(diào)用Int類型的Add

      7.  返回裝箱后的計算結(jié)果

      其中多次涉及到虛函數(shù)的調(diào)用,虛函數(shù)的調(diào)用會打斷CPU的正常流水線處理,減緩執(zhí)行。

      Spark1.1.0catalyst模塊的expressions增加了codegen模塊,如果使用動態(tài)字節(jié)碼生成技術(配置spark.sql.codegen參數(shù)),SparkSQL在執(zhí)行物理計劃的時候,對匹配的表達式采用特定的代碼,動態(tài)編譯,然后運行。如上例子,匹配到Add方法:

      clip_image012

      然后,通過調(diào)用,最終調(diào)用:

      clip_image014

      最終實現(xiàn)效果類似如下偽代碼:

      val a: Int = inputRow.getInt(0)

      val b: Int = inputRow.getInt(1)

      val result: Int = a + b

      resultRow.setInt(0, result)

      對于Spark1.1.0,對SQL表達式都作了CG優(yōu)化,具體可以參看codegen模塊。CG優(yōu)化的實現(xiàn)主要還是依靠scala2.10的運行時放射機制(runtime reflection)。對于SQL查詢的CG優(yōu)化,可以簡單地用下圖來表示:

      clip_image016

      CScala代碼優(yōu)化

      另外,SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對于用戶來說,還是使用統(tǒng)一的接口,沒受到使用上的困難。下圖是一個Scala代碼優(yōu)化的示意圖:

      clip_image018

      2SparkSQL運行架構(gòu)

      類似于關系型數(shù)據(jù)庫,SparkSQL也是語句也是由Projectiona1,a2,a3)、Data SourcetableA)、Filtercondition)組成,分別對應sql查詢過程中的ResultData SourceOperation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。

      clip_image020

       

      當執(zhí)行SparkSQL語句的順序為:

      1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROMWHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規(guī)范;

      2.SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列、表、視圖等等)進行綁定(Bind),如果相關的ProjectionData Source等都是存在的話,就表示這個SQL語句是可以執(zhí)行的;

      3.一般的數(shù)據(jù)庫會提供幾個執(zhí)行計劃,這些計劃一般都有運行統(tǒng)計數(shù)據(jù),數(shù)據(jù)庫會在這些計劃中選擇一個最優(yōu)計劃(Optimize);

      4.計劃執(zhí)行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執(zhí)行過程有時候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運行剛運行過的SQL語句,可能直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。

      2.1 TreeRule

      SparkSQLSQL語句的處理和關系型數(shù)據(jù)庫對SQL語句的處理采用了類似的方法,首先會將SQL語句進行解析(Parse),然后形成一個Tree,在后續(xù)的如綁定、優(yōu)化等處理過程都是對Tree的操作,而操作的方法是采用Rule,通過模式匹配,對不同類型的節(jié)點采用不同的操作。在整個sql語句的處理過程中,TreeRule相互配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優(yōu)化、物理計劃等過程,最終生成可以執(zhí)行的物理計劃。

      2.1.1 Tree

      l  Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees

      l  Logical Plans、ExpressionsPhysical Operators都可以使用Tree表示

      l  Tree的具體操作是通過TreeNode來實現(xiàn)的

      ?  SparkSQL定義了catalyst.trees的日志,通過這個日志可以形象的表示出樹的結(jié)構(gòu)

      ?  TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進行操作

      ?  有了TreeNode,通過Tree中各個TreeNode之間的關系,可以對Tree進行遍歷操作,如使用transformDowntransformUpRule應用到給定的樹段,然后用結(jié)果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對一個給定的節(jié)點進行操作,通過迭代將Rule應用到該節(jié)點以及子節(jié)點。

      l  TreeNode可以細分成三種類型的Node

      ?  UnaryNode 一元節(jié)點,即只有一個子節(jié)點。如Limit、Filter操作

      ?  BinaryNode 二元節(jié)點,即有左右子節(jié)點的二叉節(jié)點。如Jion、Union操作

      ?  LeafNode 葉子節(jié)點,沒有子節(jié)點的節(jié)點。主要用戶命令類操作,如SetCommand

       

      2.1.2 Rule

      l  Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules

      l  RuleSparkSQLAnalyzerOptimizer、SparkPlan等各個組件中都有應用到

      l  Rule是一個抽象類,具體的Rule實現(xiàn)是通過RuleExecutor完成

      l  Rule通過定義batchbatchs,可以簡便的、模塊化地對Tree進行transform操作

      l  Rule通過定義OnceFixedPoint,可以對Tree進行一次操作或多次操作(如對某些Tree進行多次迭代操作的時候,達到FixedPoint次數(shù)迭代或達到前后兩次的樹結(jié)構(gòu)沒變化才停止操作,具體參看RuleExecutor.apply

      2.2 sqlContexthiveContext的運行過程

      SparkSQL有兩個分支,sqlContexthiveContextsqlContext現(xiàn)在只支持SQL語法解析器(SQL-92語法);hiveContext現(xiàn)在支持SQL語法解析器和hivesql語法解析器,默認為hiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法,

      2.2.1 sqlContext的運行過程

      sqlContext總的一個過程如下圖所示:

      1.SQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan

      2.使用analyzer結(jié)合數(shù)據(jù)數(shù)據(jù)字典(catalog)進行綁定,生成resolvedLogicalPlan

      3.使用optimizerresolvedLogicalPlan進行優(yōu)化,生成optimizedLogicalPlan;

      4.使用SparkPlanLogicalPlan轉(zhuǎn)換成PhysicalPlan;

      5.使用prepareForExecution()PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計劃;

      6.使用execute()執(zhí)行可執(zhí)行物理計劃;

      7.生成SchemaRDD。

      在整個運行過程中涉及到多個SparkSQL的組件,如SqlParse、analyzer、optimizerSparkPlan等等

      clip_image022

      2.2.2hiveContext的運行過程

      hiveContext總的一個過程如下圖所示:

      1.SQL語句經(jīng)過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程中對hiveql語句使用getAst()獲取AST樹,然后再進行解析;

      2.使用analyzer結(jié)合數(shù)據(jù)hive源數(shù)據(jù)Metastore(新的catalog)進行綁定,生成resolved LogicalPlan

      3.使用optimizerresolved LogicalPlan進行優(yōu)化,生成optimized LogicalPlan,優(yōu)化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;

      4.使用hivePlannerLogicalPlan轉(zhuǎn)換成PhysicalPlan;

      5.使用prepareForExecution()PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計劃;

      6.使用execute()執(zhí)行可執(zhí)行物理計劃;

      7.執(zhí)行后,使用map(_.copy)將結(jié)果導入SchemaRDD

      clip_image024

      2.3 catalyst優(yōu)化器

      SparkSQL1.1總體上由四個模塊組成:core、catalysthive、hive-Thriftserver

      l  core處理數(shù)據(jù)的輸入輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù)(RDD、Parquet、json等),將查詢結(jié)果輸出成schemaRDD;

      l  catalyst處理查詢語句的整個處理過程,包括解析、綁定、優(yōu)化、物理計劃等,說其是優(yōu)化器,還不如說是查詢引擎;

      l  hivehive數(shù)據(jù)的處理

      l  hive-ThriftServer提供CLIJDBC/ODBC接口

      在這四個模塊中,catalyst處于最核心的部分,其性能優(yōu)劣將影響整體的性能。由于發(fā)展時間尚短,還有很多不足的地方,但其插件式的設計,為未來的發(fā)展留下了很大的空間。下面是catalyst的一個設計圖:

      clip_image026

       

      其中虛線部分是以后版本要實現(xiàn)的功能,實線部分是已經(jīng)實現(xiàn)的功能。從上圖看,catalyst主要的實現(xiàn)組件有:

      lsqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;

      lAnalyzer,主要完成綁定工作,將不同來源的Unresolved LogicalPlan和數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan

      loptimizerresolved LogicalPlan進行優(yōu)化,生成optimized LogicalPlan;

      l PlannerLogicalPlan轉(zhuǎn)換成PhysicalPlan

      l CostModel,主要根據(jù)過去的性能統(tǒng)計數(shù)據(jù),選擇最佳的物理執(zhí)行計劃

      這些組件的基本實現(xiàn)方法:

      l 先將sql語句通過解析生成Tree,然后在不同階段使用不同的Rule應用到Tree上,通過轉(zhuǎn)換完成各個組件的功能。

      l Analyzer使用Analysis Rules,配合數(shù)據(jù)元數(shù)據(jù)(如hive metastoreSchema catalog),完善Unresolved LogicalPlan的屬性而轉(zhuǎn)換成resolved LogicalPlan

      l optimizer使用Optimization Rules,對resolved LogicalPlan進行合并、列裁剪、過濾器下推等優(yōu)化作業(yè)而轉(zhuǎn)換成optimized LogicalPlan;

      l Planner使用Planning Strategies,對optimized LogicalPlan

      3、SparkSQL CLI

      CLICommand-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執(zhí)行指令的界面,它通常不支持鼠標,用戶通過鍵盤輸入指令,計算機接收到指令后予以執(zhí)行。Spark CLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進行執(zhí)行,在界面中顯示運行過程和最終的結(jié)果。

      Spark1.1相較于Spark1.0最大的差別就在于Spark1.1增加了Spark SQL CLIThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數(shù)據(jù)庫管理員較容易地上手,真正意義上進入了SQL時代。

      【注】Spark CLISpark Thrift Server實驗環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

      3.1  運行環(huán)境說明

      3.1.1 硬軟件環(huán)境

      l  主機操作系統(tǒng):Windows 64位,雙核4線程,主頻2.2G,10G內(nèi)存

      l  虛擬軟件:VMware? Workstation 9.0.0 build-812388

      l  虛擬機操作系統(tǒng):CentOS 64位,單核

      l  虛擬機運行環(huán)境:

      ?  JDK1.7.0_55 64

      ?  Hadoop2.2.0(需要編譯為64位)

      ?  Scala2.11.4

      ?  Spark1.1.0(需要編譯)

      ?  Hive0.13.1

      3.1.2 機器網(wǎng)絡環(huán)境

      集群包含三個節(jié)點,節(jié)點之間可以免密碼SSH訪問,節(jié)點IP地址和主機名分布如下:

      序號

      IP地址

      機器名

      類型

      核數(shù)/內(nèi)存

      用戶名

      目錄

      1

      192.168.0.61

      hadoop1

      NN/DN/RM

      Master/Worker

      1/3G

      hadoop

      /app 程序所在路徑

      /app/scala-...

      /app/hadoop

      /app/complied

      2

      192.168.0.62

      hadoop2

      DN/NM/Worker

      1/2G

      hadoop

      3

      192.168.0.63

      hadoop3

      DN/NM/Worker

      1/2G

      hadoop

      3.2 配置并啟動

      3.2.1 創(chuàng)建并配置hive-site.xml

      在運行Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下創(chuàng)建hive-site.xml文件,然后在該配置文件中,添加hive.metastore.uris屬性,具體如下:

      <configuration> 

        <property>

          <name>hive.metastore.uris</name>

          <value>thrift://hadoop1:9083</value>

          <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

        </property>

      </configuration>

      clip_image028

      3.2.2 啟動Hive

      在使用Spark SQL CLI之前需要啟動Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動HadoopHDFS),使用如下命令可以使Hive Metastore啟動后運行在后臺,可以通過jobs查詢:

      $nohup hive --service metastore > metastore.log 2>&1 &

      clip_image030

      3.2.3 啟動Spark集群和Spark SQL CLI

      通過如下命令啟動Spark集群和Spark SQL CLI

      $cd /app/hadoop/spark-1.1.0

      $sbin/start-all.sh

      $bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

      在集群監(jiān)控頁面可以看到啟動了SparkSQL應用程序:

      clip_image032

      這時就可以使用HQL語句對Hive數(shù)據(jù)進行查詢,另外可以使用COMMAND,如使用set進行設置參數(shù):默認情況下,SparkSQL Shuffle的時候是200partition,可以使用如下命令修改該參數(shù):

      SET spark.sql.shuffle.partitions=20;

      運行同一個查詢語句,參數(shù)改變后,Taskpartition)的數(shù)量就由200變成了20。

      clip_image034

      3.2.4 命令參數(shù)

      通過bin/spark-sql --help可以查看CLI命令參數(shù):

      clip_image036

      clip_image038

      其中[options] CLI啟動一個SparkSQL應用程序的參數(shù),如果不設置--master的話,將在啟動spark-sql的機器以local方式運行,只能通過http://機器名:4040進行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數(shù)。

      [cli option]CLI的參數(shù),通過這些參數(shù)CLI可以直接運行SQL文件、進入命令行運行SQL命令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接,所以不能連接到ThriftServer;但可以配置conf/hive-site.xml連接到HiveMetastore,然后對Hive數(shù)據(jù)進行查詢。

      3.3 實戰(zhàn)Spark SQL CLI

      3.3.1 獲取訂單每年的銷售單數(shù)、銷售總額

      第一步   設置任務個數(shù),在這里修改為20

      spark-sql>SET spark.sql.shuffle.partitions=20;

      clip_image040

      第二步   運行SQL語句

      spark-sql>use hive;

      clip_image042

      spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

      clip_image044

      第三步   查看運行結(jié)果

      clip_image046

      clip_image048

      3.3.2 計算所有訂單每年的總金額

      第一步   執(zhí)行SQL語句

      spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

      clip_image050

      第二步   執(zhí)行結(jié)果

      使用CLI執(zhí)行結(jié)果如下:

      clip_image052

      clip_image054

      3.3.3 計算所有訂單每年最大金額訂單的銷售額

      第一步   執(zhí)行SQL語句

      spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

      clip_image056

      第二步   執(zhí)行結(jié)果

      使用CLI執(zhí)行結(jié)果如下:

      clip_image058

       

      clip_image060

      4、Spark Thrift Server

      ThriftServer是一個JDBC/ODBC接口,用戶可以通過JDBC/ODBC連接ThriftServer來訪問SparkSQL的數(shù)據(jù)。ThriftServer在啟動的時候,會啟動了一個SparkSQL的應用程序,而通過JDBC/ODBC連接進來的客戶端共同分享這個SparkSQL應用程序的資源,也就是說不同的用戶之間可以共享數(shù)據(jù);ThriftServer啟動時還開啟一個偵聽器,等待JDBC客戶端的連接和提交查詢。所以,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,如果要使用Hive數(shù)據(jù)的話,還要提供Hive Metastoreuris

      【注】Spark CLISpark Thrift Server實驗環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

      4.1 配置并啟動

      4.1.1 創(chuàng)建并配置hive-site.xml

      第一步   創(chuàng)建hive-site.xml配置文件

      $SPARK_HOME/conf目錄下修改hive-site.xml配置文件(如果在Spark SQL CLI中已經(jīng)添加,可以省略):

      $cd /app/hadoop/spark-1.1.0/conf

      $sudo vi hive-site.xml

      clip_image062

      第二步   修改配置文件

      設置hadoop1Metastore服務器,hadoop2Thrift Server服務器,配置內(nèi)容如下:

      <configuration>

        <property>

          <name>hive.metastore.uris</name>

          <value>thrift://hadoop1:9083</value>

          <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

        </property>

       

        <property>

          <name>hive.server2.thrift.min.worker.threads</name>

          <value>5</value>

          <description>Minimum number of Thrift worker threads</description>

        </property>

       

        <property>

          <name>hive.server2.thrift.max.worker.threads</name>

          <value>500</value>

          <description>Maximum number of Thrift worker threads</description>

        </property>

       

        <property>

          <name>hive.server2.thrift.port</name>

          <value>10000</value>

          <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>

        </property>

       

        <property>

          <name>hive.server2.thrift.bind.host</name>

          <value>hadoop2</value>

          <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>

        </property>

      </configuration>

      clip_image064

      4.1.2 啟動Hive

      hadoop1節(jié)點中,在后臺啟動Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動HadoopHDFS):

      $nohup hive --service metastore > metastore.log 2>&1 &

      clip_image066

      4.1.3 啟動Spark集群和Thrift Server

      hadoop1節(jié)點啟動Spark集群

      $cd /app/hadoop/spark-1.1.0/sbin

      $./start-all.sh

      hadoop2節(jié)點上進入SPARK_HOME/sbin目錄,使用如下命令啟動Thrift Server

      $cd /app/hadoop/spark-1.1.0/sbin

      $./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g

      clip_image068

      注意Thrift Server需要按照配置在hadoop2啟動!

      在集群監(jiān)控頁面可以看到啟動了SparkSQL應用程序:

      clip_image070

      4.1.4 命令參數(shù)

      使用sbin/start-thriftserver.sh --help可以查看ThriftServer的命令參數(shù):

      $sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]

              Thrift server options: Use value for given property

      clip_image072

      clip_image074

      其中[options] Thrift Server啟動一個SparkSQL應用程序的參數(shù),如果不設置--master的話,將在啟動Thrift Server的機器以local方式運行,只能通過http://機器名:4040進行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數(shù)。在集群中提供Thrift Server的話,一定要配置master、executor-memory等參數(shù)。

      [thrift server options]Thrift Server的參數(shù),可以使用-dproperty=value的格式來定義;在實際應用上,因為參數(shù)比較多,通常使用conf/hive-site.xml配置。

      4.2 實戰(zhàn)Thrift Server

      4.2.1 遠程客戶端連接

      可以在任意節(jié)點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,因為沒有采用權(quán)限管理,所以用戶名用運行bin/beeline的用戶hadoop,密碼為空:

      $cd /app/hadoop/spark-1.1.0/bin

      $./beeline

      beeline>!connect jdbc:hive2://hadoop2:10000

      clip_image076

      4.2.2 基本操作

      第一步   顯示hive數(shù)據(jù)庫所有表

      beeline>show database;

      beeline>use hive;

      beeline>show tables;

      clip_image078

      第二步   創(chuàng)建表testThrift

      beeline>create table testThrift(field1 String , field2 Int);

      beeline>show tables;

      clip_image080

      第三步   tbStockDetail表中金額大于3000插入到testThrift表中

      beeline>insert into table testThrift select ordernumber,amount from tbStockDetail  where amount>3000;

      beeline>select * from testThrift;

      clip_image082

      第四步   重新創(chuàng)建testThrift表中,把年度最大訂單插入該表中

      beeline>drop table testThrift;

      beeline>create table testThrift (field1 String , field2 Int);

      beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

      beeline>select * from testThrift;

      clip_image084

      4.2.3 計算所有訂單每年的訂單數(shù)

      第一步   執(zhí)行SQL語句

      spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

      第二步   執(zhí)行結(jié)果

      clip_image086

      Stage監(jiān)控頁面:

      clip_image088

      查看Details for Stage 28

      clip_image090

      4.2.4 計算所有訂單月銷售額前十名

      第一步   執(zhí)行SQL語句

      spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;

      第二步   執(zhí)行結(jié)果

      clip_image092

      Stage監(jiān)控頁面:

      clip_image094

      在其第一個Task中,從本地讀入數(shù)據(jù)

      clip_image096

      在后面的Task是從內(nèi)存中獲取數(shù)據(jù)

      clip_image098

      4.2.5 緩存表數(shù)據(jù)

      第一步   緩存數(shù)據(jù)

      beeline>cache table tbStock;

      beeline>select count(*) from tbStock;

      clip_image100

      第二步   運行4.2.4中的“計算所有訂單月銷售額前十名”

      beeline>select count(*) from tbStock;

      clip_image102

      本次計算劃給11.233秒,查看webUI,數(shù)據(jù)已經(jīng)緩存,緩存率為100%

      clip_image104

      第三步   在另外節(jié)點再次運行

      hadoop3節(jié)點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,然后直接運行對tbStock計數(shù)(注意沒有進行數(shù)據(jù)庫的切換):

      clip_image106

      用時0.343秒,再查看webUI中的stage

      clip_image108

      Locality LevelPROCESS,顯然是使用了緩存表。

      從上可以看出,ThriftServer可以連接多個JDBC/ODBC客戶端,并相互之間可以共享數(shù)據(jù)。順便提一句,ThriftServer啟動后處于監(jiān)聽狀態(tài),用戶可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。

      4.2.6 IDEAJDBC訪問

      有了ThriftServer,開發(fā)人員可以非常方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個scala代碼,查詢表tbStockDetail,返回amount>3000的單據(jù)號和交易金額:

      第一步   IDEA創(chuàng)建class6包和類JDBCofSparkSQL

      參見《Spark編程模型(下)--IDEA搭建及實戰(zhàn)》在IDEA中創(chuàng)建class6包并新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大于3000的訂單:

      package class6

      import java.sql.DriverManager

       

      object JDBCofSparkSQL {

        def main(args: Array[String]) {

          Class.forName("org.apache.hive.jdbc.HiveDriver")

          val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")

          try {

            val statement = conn.createStatement

      val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail  where amount>3000")

            while (rs.next) {

              val ordernumber = rs.getString("ordernumber")

              val amount = rs.getString("amount")

              println("ordernumber = %s, amount = %s".format(ordernumber, amount))

            }

          } catch {

            case e: Exception => e.printStackTrace

          }

          conn.close

        }

      }

      第二步   查看運行結(jié)果

      IDEA中可以觀察到,在運行日志窗口中沒有運行過程的日志,只顯示查詢結(jié)果

      clip_image110

      第三步   查看監(jiān)控結(jié)果

      Spark監(jiān)控界面中觀察到,該Job有一個編號為6Stage,該Stage2Task,分別運行在hadoop1hadoop2節(jié)點,獲取數(shù)據(jù)為NODE_LOCAL方式。

      clip_image112

      clip_image114

      clip_image116

      hadoop2中觀察Thrift Server運行日志如下:

      clip_image118

       

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多