Apache Spark社區(qū)剛剛發(fā)布了 1.5 版本,大家一定想知道這個版本的主要變化,這篇文章告訴你答案。 DataFrame執(zhí)行后端優(yōu)化(Tungsten第一階段)DataFrame可以說是整個Spark項目最核心的部分,在1.5這個開發(fā)周期內(nèi)最大的變化就是Tungsten項目的 第一階段 已經(jīng)完成。主要的變化是 由 Spark自己來管理內(nèi)存而不是使用JVM,這樣可以避免JVM GC帶來的性能損失。內(nèi)存中的Java對象被存儲成Spark自己的二進制格式,計算直接發(fā)生在二進制格式上,省去了序列化和反序列化時間。同時這種格式也更加緊湊,節(jié)省內(nèi)存空間,而且能更好的估計數(shù)據(jù)量大小和內(nèi)存使用情況。如果大家對這部分的代碼感興趣,可以在源代碼里面搜索那些Unsafe開頭的類即可。在1.4版本只提供UnsafeShuffleManager等少數(shù)功能,剩下的大部分都是1.5版本新加入的功能。 其他優(yōu)化還包括默認使用code generation; cache-aware算法對join, aggregation, shuffle, sorting的增強;window function性能的提高等。 那么性能到底能提升多少呢?可以參考DataBricks給出的這個例子。這是一個16 million行的記錄,有1 million的組合鍵的aggregation查詢分別使用Spark 1.4和1.5版本的性能對比,在這個測試中都是使用的默認配置。 那么如果我們想自己測試下Tungsten第一階段的性能改如何測試呢?Spark 1.4以前的版本中spark.sql.codegen, spark.sql.unsafe.enabled等幾個參數(shù)在1.5版本里面合并成spark.sql.tungsten.enabled并默認為true,只需要修改這一個參數(shù)就可以配置是否開啟tungsten優(yōu)化(默認是開啟的)。 DataFrame/SQL/Hive在DataFrame API方面,實現(xiàn)了新的聚合函數(shù)接口AggregateFunction2以及7個相應(yīng)的build-in的聚合函數(shù),同時基于新接口實現(xiàn)了相應(yīng)的UDAF接口。新的聚合函數(shù)接口把一個聚合函數(shù)拆解為三個動作: initialize/update/merge,然后用戶只需要定義其中的邏輯既可以實現(xiàn)不同的聚合函數(shù)功能。Spark的這個新的聚合函數(shù)實現(xiàn)方法和 Impala 里面非常類似。 Spark內(nèi)置的 expression function 得到了很大的增強,實現(xiàn)了100多個這樣的常用函數(shù),例如string, math, unix_timestamp, from_unixtime, to_date等。同時在處理NaN值的一些特性也在增強,例如 NaN = Nan 返回true;NaN大于任何其他值等約定都越來越符合SQL界的規(guī)則了。 用戶可以在執(zhí)行join操作的時候指定把左邊的表或者右邊的表broadcast出去,因為基于 cardinality的估計并不是每次都是很準的,如果用戶對數(shù)據(jù)了解可以直接指定哪個表更小從而被broadcast出去。 Hive模塊最大的變化是支持連接Hive 1.2版本的metastore,同時支持metastore partition pruning(通過spark.sql.hive.metastorePartitionPruning=true開啟,默認為false)。因為很多公司的Hive集群都升級到了1.2以上,那么這個改進對于需要訪問Hive元數(shù)據(jù)的Spark集群來說非常重要。Spark 1.5支持可以連接Hive 0.13, 0.14, 1.0/0.14.1, 1.1, 1.2的metastore。 在External Data Source方面,Parquet的支持有了很大的加強。Parquet的版本升級到1.7;更快的metadata discovery和schema merging;同時能夠讀取其他工具或者庫生成的非標準合法的parquet文件;以及更快更魯棒的動態(tài)分區(qū)插入。 由于Parquet升級到1.7,原來的一個重要 bug 被修復(fù),所以Spark SQL的Filter Pushdown默認改為開啟狀態(tài)(spark.sql.parquet.filterPushdown=true),能夠幫助查詢過濾掉不必要的IO。 Spark 1.5可以通過指定spark.sql.parquet.output.committer.class參數(shù)選擇不同的output committer類,默認是org.apache.parquet.hadoop.ParquetOutputCommitter,用戶可以繼承這個類實現(xiàn)自己的output committer。由于HDFS和S3這兩種文件存儲系統(tǒng)的區(qū)別,如果需要向S3里面寫入數(shù)據(jù),可以使用DirectParquetOutputCommitter,能夠有效提高寫效率,從而加快Job執(zhí)行速度。 另外還有一些改動,包括:StructType支持排序功能;TimestampType的精度減小到1us;Spark現(xiàn)在的checkpoint是基于HDFS的,從1.5版本開始支持 基于memory和local disk的checkpoint 。這種類型的checkpoint性能更快,雖然不如基于HDFS的可靠,但是對于迭代型機器學(xué)習(xí)運算還是很有幫助的。 機器學(xué)習(xí)MLlibMLlib最大的變化就是從一個機器學(xué)習(xí)的library開始轉(zhuǎn)向構(gòu)建一個機器學(xué)習(xí)工作流的系統(tǒng),這些變化發(fā)生在ML包里面。MLlib模塊下現(xiàn)在有兩個包:MLlib和ML。ML把整個機器學(xué)習(xí)的過程抽象成 Pipeline ,一個Pipeline是由多個Stage組成,每個Stage是Transformer或者Estimator。 以前機器學(xué)習(xí)工程師要花費大量時間在training model之前的feature的抽取、轉(zhuǎn)換等準備工作。ML提供了多個Transformer,極大提高了這些工作的效率。在1.5版本之后,已經(jīng)有了25+個feature transformer,其中 CountVectorizer, Discrete Cosine Transformation, MinMaxScaler, NGram, PCA, RFormula, StopWordsRemover, and VectorSlicer這些feature transformer都是1.5版本新添加的,做機器學(xué)習(xí)的朋友可以看看哪些滿足你的需求。 這里面的一個亮點就是RFormula的支持,目標是使用戶可以把原來用R寫的機器學(xué)習(xí)程序(目前只支持GLM算法)不用修改直接搬到Spark平臺上來執(zhí)行。不過目前只支持集中簡單 的R公式(包括'.', '~', '+'和 '-'),社區(qū)在接下來的版本中會增強這項功能。 另外越來越多的算法也作為Estimator搬到了ML下面,在1.5版本中新搬過來的有Naive Bayes, K-means, Isotonic Regression等。大家不要以為只是簡單的在ML下面提供一個調(diào)用相應(yīng)算法的API,這里面變換還是挺多的。例如Naive Bayes原來的模型分別用Array[Double]和Array[Array[Double]]來存儲pi和theta,而在ML下面新的API里面使用的是Vector和Matrix來存儲。從這也可以看出,新的ML框架下所有的數(shù)據(jù)源都是基于DataFrame,所有的模型也盡量都基于Spark的數(shù)據(jù)類型表示。在ML里面的public API下基本上看不到對RDD的直接操作了,這也與Tungsten項目的設(shè)計目標是一致的。 除了 這些既有的算法在ML API下的實現(xiàn),ML里面也增加了幾個新算法:
另外還有一些現(xiàn)有算法的增強:LDA算法,決策樹和 ensemble算法,GMM算法。
1.5版本的Python API也在不斷加強,越來越多的算法和功能的Python API基本上與Scala API對等了。此外在tuning和evaluator上也有增強。 其他從1.5開始,Standalone, YARN和Mesos三種部署方式全部支持了動態(tài)資源分配。 SparkR支持運行在YARN集群上,同時DataFrame的函數(shù)也提供了一些R風(fēng)格的別名,可以降低熟悉R的用戶的遷移成本。 在Streaming和Graphx方面也有非常大的改進,在這里不在一一贅述,詳細可以參考 release note 。 |
|