【注】該系列文章以及使用到安裝包/測試數(shù)據(jù) 可以在《傾情大奉送--Spark入門實戰(zhàn)系列》獲取1、SparkSQL的發(fā)展歷程1.1 Hive and SharkSparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,它是當時唯一運行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運行效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產(chǎn)生,其中表現(xiàn)較為突出的是: l MapR的Drill l Cloudera的Impala l Shark 其中Shark是伯克利實驗室Spark生態(tài)環(huán)境的組件之一,它修改了下圖所示的右下角的內(nèi)存管理、物理計劃、執(zhí)行三個模塊,并使之能運行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升。 1.2 Shark和SparkSQL但是,隨著Spark的發(fā)展,對于野心勃勃的Spark團隊來說,Shark對于Hive的太多依賴(如采用Hive的語法解析器、查詢優(yōu)化器等等),制約了Spark的One Stack Rule Them All的既定方針,制約了Spark各個組件的相互集成,所以提出了SparkSQL項目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優(yōu)點,如內(nèi)存列存儲(In-Memory Columnar Storage)、Hive兼容性等,重新開發(fā)了SparkSQL代碼;由于擺脫了對Hive的依賴性,SparkSQL無論在數(shù)據(jù)兼容、性能優(yōu)化、組件擴展方面都得到了極大的方便,真可謂“退一步,海闊天空”。 l數(shù)據(jù)兼容方面 不但兼容Hive,還可以從RDD、parquet文件、JSON文件中獲取數(shù)據(jù),未來版本甚至支持獲取RDBMS數(shù)據(jù)以及cassandra等NOSQL數(shù)據(jù); l性能優(yōu)化方面 除了采取In-Memory Columnar Storage、byte-code generation等優(yōu)化技術外、將會引進Cost Model對查詢進行動態(tài)評估、獲取最佳物理計劃等等; l組件擴展方面 無論是SQL的語法解析器、分析器還是優(yōu)化器都可以重新定義,進行擴展。 2014年6月1日Shark項目和SparkSQL項目的主持人Reynold Xin宣布:停止對Shark的開發(fā),團隊將所有資源放SparkSQL項目上,至此,Shark的發(fā)展畫上了句話,但也因此發(fā)展出兩個直線:SparkSQL和Hive on Spark。 其中SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一個Hive的發(fā)展計劃,該計劃將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限于一個引擎,可以采用Map-Reduce、Tez、Spark等引擎。 1.3 SparkSQL的性能Shark的出現(xiàn),使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高: 那么,擺脫了Hive的限制,SparkSQL的性能又有怎么樣的表現(xiàn)呢?雖然沒有Shark相對于Hive那樣矚目地性能提升,但也表現(xiàn)得非常優(yōu)異: 為什么SparkSQL的性能會得到怎么大的提升呢?主要SparkSQL在下面幾點做了優(yōu)化: A:內(nèi)存列存儲(In-Memory Columnar Storage) SparkSQL的表數(shù)據(jù)在內(nèi)存中存儲不是采用原生態(tài)的JVM對象存儲方式,而是采用內(nèi)存列存儲,如下圖所示。 該存儲方式無論在空間占用量和讀取吞吐率上都占有很大優(yōu)勢。 對于原生態(tài)的JVM對象存儲方式,每個對象通常要增加12-16字節(jié)的額外開銷,對于一個270MB的TPC-H lineitem table數(shù)據(jù),使用這種方式讀入內(nèi)存,要使用970MB左右的內(nèi)存空間(通常是2~5倍于原生數(shù)據(jù)空間);另外,使用這種方式,每個數(shù)據(jù)記錄產(chǎn)生一個JVM對象,如果是大小為200B的數(shù)據(jù)記錄,32G的堆棧將產(chǎn)生1.6億個對象,這么多的對象,對于GC來說,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數(shù)量呈線性相關)。顯然這種內(nèi)存存儲方式對于基于內(nèi)存計算的Spark來說,很昂貴也負擔不起。 對于內(nèi)存列存儲來說,將所有原生數(shù)據(jù)類型的列采用原生數(shù)組來存儲,將Hive支持的復雜數(shù)據(jù)類型(如array、map等)先序化后并接成一個字節(jié)數(shù)組來存儲。這樣,每個列創(chuàng)建一個JVM對象,從而導致可以快速的GC和緊湊的數(shù)據(jù)存儲;額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低內(nèi)存開銷;更有趣的是,對于分析查詢中頻繁使用的聚合特定列,性能會得到很大的提高,原因就是這些列的數(shù)據(jù)放在一起,更容易讀入內(nèi)存進行計算。 B:字節(jié)碼生成技術(bytecode generation,即CG) 在數(shù)據(jù)庫查詢中有一個昂貴的操作是查詢語句中的表達式,主要是由于JVM的內(nèi)存模型引起的。比如如下一個查詢: SELECT a + b FROM table 在這個查詢里,如果采用通用的SQL語法途徑去處理,會先生成一個表達式樹(有兩個節(jié)點的Add樹,參考后面章節(jié)),在物理處理這個表達式樹的時候,將會如圖所示的7個步驟: 1. 調(diào)用虛函數(shù)Add.eval(),需要確認Add兩邊的數(shù)據(jù)類型 2. 調(diào)用虛函數(shù)a.eval(),需要確認a的數(shù)據(jù)類型 3. 確定a的數(shù)據(jù)類型是Int,裝箱 4. 調(diào)用虛函數(shù)b.eval(),需要確認b的數(shù)據(jù)類型 5. 確定b的數(shù)據(jù)類型是Int,裝箱 6. 調(diào)用Int類型的Add 7. 返回裝箱后的計算結(jié)果 其中多次涉及到虛函數(shù)的調(diào)用,虛函數(shù)的調(diào)用會打斷CPU的正常流水線處理,減緩執(zhí)行。 Spark1.1.0在catalyst模塊的expressions增加了codegen模塊,如果使用動態(tài)字節(jié)碼生成技術(配置spark.sql.codegen參數(shù)),SparkSQL在執(zhí)行物理計劃的時候,對匹配的表達式采用特定的代碼,動態(tài)編譯,然后運行。如上例子,匹配到Add方法: 然后,通過調(diào)用,最終調(diào)用: 最終實現(xiàn)效果類似如下偽代碼: val a: Int = inputRow.getInt(0) val b: Int = inputRow.getInt(1) val result: Int = a + b resultRow.setInt(0, result) 對于Spark1.1.0,對SQL表達式都作了CG優(yōu)化,具體可以參看codegen模塊。CG優(yōu)化的實現(xiàn)主要還是依靠scala2.10的運行時放射機制(runtime reflection)。對于SQL查詢的CG優(yōu)化,可以簡單地用下圖來表示: C:Scala代碼優(yōu)化 另外,SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對于用戶來說,還是使用統(tǒng)一的接口,沒受到使用上的困難。下圖是一個Scala代碼優(yōu)化的示意圖: 2、 SparkSQL運行架構(gòu)類似于關系型數(shù)據(jù)庫,SparkSQL也是語句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)組成,分別對應sql查詢過程中的Result、Data Source、Operation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。
當執(zhí)行SparkSQL語句的順序為: 1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規(guī)范; 2.將SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列、表、視圖等等)進行綁定(Bind),如果相關的Projection、Data Source等都是存在的話,就表示這個SQL語句是可以執(zhí)行的; 3.一般的數(shù)據(jù)庫會提供幾個執(zhí)行計劃,這些計劃一般都有運行統(tǒng)計數(shù)據(jù),數(shù)據(jù)庫會在這些計劃中選擇一個最優(yōu)計劃(Optimize); 4.計劃執(zhí)行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執(zhí)行過程有時候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運行剛運行過的SQL語句,可能直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。 2.1 Tree和RuleSparkSQL對SQL語句的處理和關系型數(shù)據(jù)庫對SQL語句的處理采用了類似的方法,首先會將SQL語句進行解析(Parse),然后形成一個Tree,在后續(xù)的如綁定、優(yōu)化等處理過程都是對Tree的操作,而操作的方法是采用Rule,通過模式匹配,對不同類型的節(jié)點采用不同的操作。在整個sql語句的處理過程中,Tree和Rule相互配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優(yōu)化、物理計劃等過程,最終生成可以執(zhí)行的物理計劃。 2.1.1 Treel Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees l Logical Plans、Expressions、Physical Operators都可以使用Tree表示 l Tree的具體操作是通過TreeNode來實現(xiàn)的 ? SparkSQL定義了catalyst.trees的日志,通過這個日志可以形象的表示出樹的結(jié)構(gòu) ? TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進行操作 ? 有了TreeNode,通過Tree中各個TreeNode之間的關系,可以對Tree進行遍歷操作,如使用transformDown、transformUp將Rule應用到給定的樹段,然后用結(jié)果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對一個給定的節(jié)點進行操作,通過迭代將Rule應用到該節(jié)點以及子節(jié)點。 l TreeNode可以細分成三種類型的Node: ? UnaryNode 一元節(jié)點,即只有一個子節(jié)點。如Limit、Filter操作 ? BinaryNode 二元節(jié)點,即有左右子節(jié)點的二叉節(jié)點。如Jion、Union操作 ? LeafNode 葉子節(jié)點,沒有子節(jié)點的節(jié)點。主要用戶命令類操作,如SetCommand
2.1.2 Rulel Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules l Rule在SparkSQL的Analyzer、Optimizer、SparkPlan等各個組件中都有應用到 l Rule是一個抽象類,具體的Rule實現(xiàn)是通過RuleExecutor完成 l Rule通過定義batch和batchs,可以簡便的、模塊化地對Tree進行transform操作 l Rule通過定義Once和FixedPoint,可以對Tree進行一次操作或多次操作(如對某些Tree進行多次迭代操作的時候,達到FixedPoint次數(shù)迭代或達到前后兩次的樹結(jié)構(gòu)沒變化才停止操作,具體參看RuleExecutor.apply) 2.2 sqlContext和hiveContext的運行過程SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext現(xiàn)在只支持SQL語法解析器(SQL-92語法);hiveContext現(xiàn)在支持SQL語法解析器和hivesql語法解析器,默認為hiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法, 2.2.1 sqlContext的運行過程sqlContext總的一個過程如下圖所示: 1.SQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan; 2.使用analyzer結(jié)合數(shù)據(jù)數(shù)據(jù)字典(catalog)進行綁定,生成resolvedLogicalPlan; 3.使用optimizer對resolvedLogicalPlan進行優(yōu)化,生成optimizedLogicalPlan; 4.使用SparkPlan將LogicalPlan轉(zhuǎn)換成PhysicalPlan; 5.使用prepareForExecution()將PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計劃; 6.使用execute()執(zhí)行可執(zhí)行物理計劃; 7.生成SchemaRDD。 在整個運行過程中涉及到多個SparkSQL的組件,如SqlParse、analyzer、optimizer、SparkPlan等等 2.2.2hiveContext的運行過程hiveContext總的一個過程如下圖所示: 1.SQL語句經(jīng)過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程中對hiveql語句使用getAst()獲取AST樹,然后再進行解析; 2.使用analyzer結(jié)合數(shù)據(jù)hive源數(shù)據(jù)Metastore(新的catalog)進行綁定,生成resolved LogicalPlan; 3.使用optimizer對resolved LogicalPlan進行優(yōu)化,生成optimized LogicalPlan,優(yōu)化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理; 4.使用hivePlanner將LogicalPlan轉(zhuǎn)換成PhysicalPlan; 5.使用prepareForExecution()將PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計劃; 6.使用execute()執(zhí)行可執(zhí)行物理計劃; 7.執(zhí)行后,使用map(_.copy)將結(jié)果導入SchemaRDD。 2.3 catalyst優(yōu)化器SparkSQL1.1總體上由四個模塊組成:core、catalyst、hive、hive-Thriftserver: l core處理數(shù)據(jù)的輸入輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù)(RDD、Parquet、json等),將查詢結(jié)果輸出成schemaRDD; l catalyst處理查詢語句的整個處理過程,包括解析、綁定、優(yōu)化、物理計劃等,說其是優(yōu)化器,還不如說是查詢引擎; l hive對hive數(shù)據(jù)的處理 l hive-ThriftServer提供CLI和JDBC/ODBC接口 在這四個模塊中,catalyst處于最核心的部分,其性能優(yōu)劣將影響整體的性能。由于發(fā)展時間尚短,還有很多不足的地方,但其插件式的設計,為未來的發(fā)展留下了很大的空間。下面是catalyst的一個設計圖:
其中虛線部分是以后版本要實現(xiàn)的功能,實線部分是已經(jīng)實現(xiàn)的功能。從上圖看,catalyst主要的實現(xiàn)組件有: lsqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器; lAnalyzer,主要完成綁定工作,將不同來源的Unresolved LogicalPlan和數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan; loptimizer對resolved LogicalPlan進行優(yōu)化,生成optimized LogicalPlan; l Planner將LogicalPlan轉(zhuǎn)換成PhysicalPlan; l CostModel,主要根據(jù)過去的性能統(tǒng)計數(shù)據(jù),選擇最佳的物理執(zhí)行計劃 這些組件的基本實現(xiàn)方法: l 先將sql語句通過解析生成Tree,然后在不同階段使用不同的Rule應用到Tree上,通過轉(zhuǎn)換完成各個組件的功能。 l Analyzer使用Analysis Rules,配合數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉(zhuǎn)換成resolved LogicalPlan; l optimizer使用Optimization Rules,對resolved LogicalPlan進行合并、列裁剪、過濾器下推等優(yōu)化作業(yè)而轉(zhuǎn)換成optimized LogicalPlan; l Planner使用Planning Strategies,對optimized LogicalPlan 3、SparkSQL CLICLI(Command-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執(zhí)行指令的界面,它通常不支持鼠標,用戶通過鍵盤輸入指令,計算機接收到指令后予以執(zhí)行。Spark CLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進行執(zhí)行,在界面中顯示運行過程和最終的結(jié)果。 Spark1.1相較于Spark1.0最大的差別就在于Spark1.1增加了Spark SQL CLI和ThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數(shù)據(jù)庫管理員較容易地上手,真正意義上進入了SQL時代。 【注】Spark CLI和Spark Thrift Server實驗環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建 3.1 運行環(huán)境說明3.1.1 硬軟件環(huán)境l 主機操作系統(tǒng):Windows 64位,雙核4線程,主頻2.2G,10G內(nèi)存 l 虛擬軟件:VMware? Workstation 9.0.0 build-812388 l 虛擬機操作系統(tǒng):CentOS 64位,單核 l 虛擬機運行環(huán)境: ? JDK:1.7.0_55 64位 ? Hadoop:2.2.0(需要編譯為64位) ? Scala:2.11.4 ? Spark:1.1.0(需要編譯) ? Hive:0.13.1 3.1.2 機器網(wǎng)絡環(huán)境集群包含三個節(jié)點,節(jié)點之間可以免密碼SSH訪問,節(jié)點IP地址和主機名分布如下:
3.2 配置并啟動3.2.1 創(chuàng)建并配置hive-site.xml在運行Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下創(chuàng)建hive-site.xml文件,然后在該配置文件中,添加hive.metastore.uris屬性,具體如下: <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://hadoop1:9083</value> <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description> </property> </configuration> 3.2.2 啟動Hive在使用Spark SQL CLI之前需要啟動Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動Hadoop的HDFS),使用如下命令可以使Hive Metastore啟動后運行在后臺,可以通過jobs查詢: $nohup hive --service metastore > metastore.log 2>&1 & 3.2.3 啟動Spark集群和Spark SQL CLI通過如下命令啟動Spark集群和Spark SQL CLI: $cd /app/hadoop/spark-1.1.0 $sbin/start-all.sh $bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g 在集群監(jiān)控頁面可以看到啟動了SparkSQL應用程序: 這時就可以使用HQL語句對Hive數(shù)據(jù)進行查詢,另外可以使用COMMAND,如使用set進行設置參數(shù):默認情況下,SparkSQL Shuffle的時候是200個partition,可以使用如下命令修改該參數(shù): SET spark.sql.shuffle.partitions=20; 運行同一個查詢語句,參數(shù)改變后,Task(partition)的數(shù)量就由200變成了20。 3.2.4 命令參數(shù)通過bin/spark-sql --help可以查看CLI命令參數(shù): 其中[options] 是CLI啟動一個SparkSQL應用程序的參數(shù),如果不設置--master的話,將在啟動spark-sql的機器以local方式運行,只能通過http://機器名:4040進行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數(shù)。 [cli option]是CLI的參數(shù),通過這些參數(shù)CLI可以直接運行SQL文件、進入命令行運行SQL命令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接,所以不能連接到ThriftServer;但可以配置conf/hive-site.xml連接到Hive的Metastore,然后對Hive數(shù)據(jù)進行查詢。 3.3 實戰(zhàn)Spark SQL CLI3.3.1 獲取訂單每年的銷售單數(shù)、銷售總額第一步 設置任務個數(shù),在這里修改為20個 spark-sql>SET spark.sql.shuffle.partitions=20; 第二步 運行SQL語句 spark-sql>use hive; spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; 第三步 查看運行結(jié)果 3.3.2 計算所有訂單每年的總金額第一步 執(zhí)行SQL語句 spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; 第二步 執(zhí)行結(jié)果 使用CLI執(zhí)行結(jié)果如下: 3.3.3 計算所有訂單每年最大金額訂單的銷售額spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear; 使用CLI執(zhí)行結(jié)果如下:
4、Spark Thrift ServerThriftServer是一個JDBC/ODBC接口,用戶可以通過JDBC/ODBC連接ThriftServer來訪問SparkSQL的數(shù)據(jù)。ThriftServer在啟動的時候,會啟動了一個SparkSQL的應用程序,而通過JDBC/ODBC連接進來的客戶端共同分享這個SparkSQL應用程序的資源,也就是說不同的用戶之間可以共享數(shù)據(jù);ThriftServer啟動時還開啟一個偵聽器,等待JDBC客戶端的連接和提交查詢。所以,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,如果要使用Hive數(shù)據(jù)的話,還要提供Hive Metastore的uris。 【注】Spark CLI和Spark Thrift Server實驗環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建 4.1 配置并啟動4.1.1 創(chuàng)建并配置hive-site.xml第一步 創(chuàng)建hive-site.xml配置文件 在$SPARK_HOME/conf目錄下修改hive-site.xml配置文件(如果在Spark SQL CLI中已經(jīng)添加,可以省略): $cd /app/hadoop/spark-1.1.0/conf $sudo vi hive-site.xml 第二步 修改配置文件 設置hadoop1為Metastore服務器,hadoop2為Thrift Server服務器,配置內(nèi)容如下: <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://hadoop1:9083</value> <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description> </property>
<property> <name>hive.server2.thrift.min.worker.threads</name> <value>5</value> <description>Minimum number of Thrift worker threads</description> </property>
<property> <name>hive.server2.thrift.max.worker.threads</name> <value>500</value> <description>Maximum number of Thrift worker threads</description> </property>
<property> <name>hive.server2.thrift.port</name> <value>10000</value> <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description> </property>
<property> <name>hive.server2.thrift.bind.host</name> <value>hadoop2</value> <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description> </property> </configuration> 4.1.2 啟動Hive在hadoop1節(jié)點中,在后臺啟動Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動Hadoop的HDFS): $nohup hive --service metastore > metastore.log 2>&1 & 4.1.3 啟動Spark集群和Thrift Server在hadoop1節(jié)點啟動Spark集群 $cd /app/hadoop/spark-1.1.0/sbin $./start-all.sh 在hadoop2節(jié)點上進入SPARK_HOME/sbin目錄,使用如下命令啟動Thrift Server $cd /app/hadoop/spark-1.1.0/sbin $./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g 注意:Thrift Server需要按照配置在hadoop2啟動! 在集群監(jiān)控頁面可以看到啟動了SparkSQL應用程序: 4.1.4 命令參數(shù)使用sbin/start-thriftserver.sh --help可以查看ThriftServer的命令參數(shù): $sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options] Thrift server options: Use value for given property 其中[options] 是Thrift Server啟動一個SparkSQL應用程序的參數(shù),如果不設置--master的話,將在啟動Thrift Server的機器以local方式運行,只能通過http://機器名:4040進行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數(shù)。在集群中提供Thrift Server的話,一定要配置master、executor-memory等參數(shù)。 [thrift server options]是Thrift Server的參數(shù),可以使用-dproperty=value的格式來定義;在實際應用上,因為參數(shù)比較多,通常使用conf/hive-site.xml配置。 4.2 實戰(zhàn)Thrift Server4.2.1 遠程客戶端連接可以在任意節(jié)點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,因為沒有采用權(quán)限管理,所以用戶名用運行bin/beeline的用戶hadoop,密碼為空: $cd /app/hadoop/spark-1.1.0/bin $./beeline beeline>!connect jdbc:hive2://hadoop2:10000 4.2.2 基本操作第一步 顯示hive數(shù)據(jù)庫所有表 beeline>show database; beeline>use hive; beeline>show tables; 第二步 創(chuàng)建表testThrift beeline>create table testThrift(field1 String , field2 Int); beeline>show tables; 第三步 把tbStockDetail表中金額大于3000插入到testThrift表中 beeline>insert into table testThrift select ordernumber,amount from tbStockDetail where amount>3000; beeline>select * from testThrift; 第四步 重新創(chuàng)建testThrift表中,把年度最大訂單插入該表中 beeline>drop table testThrift; beeline>create table testThrift (field1 String , field2 Int); beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear; beeline>select * from testThrift; 4.2.3 計算所有訂單每年的訂單數(shù)spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; Stage監(jiān)控頁面: 查看Details for Stage 28 4.2.4 計算所有訂單月銷售額前十名spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10; Stage監(jiān)控頁面: 在其第一個Task中,從本地讀入數(shù)據(jù) 在后面的Task是從內(nèi)存中獲取數(shù)據(jù) 4.2.5 緩存表數(shù)據(jù)第一步 緩存數(shù)據(jù) beeline>cache table tbStock; beeline>select count(*) from tbStock; 第二步 運行4.2.4中的“計算所有訂單月銷售額前十名” beeline>select count(*) from tbStock; 本次計算劃給11.233秒,查看webUI,數(shù)據(jù)已經(jīng)緩存,緩存率為100%: 第三步 在另外節(jié)點再次運行 在hadoop3節(jié)點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,然后直接運行對tbStock計數(shù)(注意沒有進行數(shù)據(jù)庫的切換): 用時0.343秒,再查看webUI中的stage: Locality Level是PROCESS,顯然是使用了緩存表。 從上可以看出,ThriftServer可以連接多個JDBC/ODBC客戶端,并相互之間可以共享數(shù)據(jù)。順便提一句,ThriftServer啟動后處于監(jiān)聽狀態(tài),用戶可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。 4.2.6 在IDEA中JDBC訪問有了ThriftServer,開發(fā)人員可以非常方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個scala代碼,查詢表tbStockDetail,返回amount>3000的單據(jù)號和交易金額: 第一步 在IDEA創(chuàng)建class6包和類JDBCofSparkSQL 參見《Spark編程模型(下)--IDEA搭建及實戰(zhàn)》在IDEA中創(chuàng)建class6包并新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大于3000的訂單: package class6 import java.sql.DriverManager
object JDBCofSparkSQL { def main(args: Array[String]) { Class.forName("org.apache.hive.jdbc.HiveDriver") val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "") try { val statement = conn.createStatement val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail where amount>3000") while (rs.next) { val ordernumber = rs.getString("ordernumber") val amount = rs.getString("amount") println("ordernumber = %s, amount = %s".format(ordernumber, amount)) } } catch { case e: Exception => e.printStackTrace } conn.close } } 第二步 查看運行結(jié)果 在IDEA中可以觀察到,在運行日志窗口中沒有運行過程的日志,只顯示查詢結(jié)果 第三步 查看監(jiān)控結(jié)果 從Spark監(jiān)控界面中觀察到,該Job有一個編號為6的Stage,該Stage有2個Task,分別運行在hadoop1和hadoop2節(jié)點,獲取數(shù)據(jù)為NODE_LOCAL方式。 在hadoop2中觀察Thrift Server運行日志如下:
|
|