Spark SQL內(nèi)置函數(shù)解密與實(shí)戰(zhàn)1、Spark Sql內(nèi)置函數(shù)解析 2、Spark Sql內(nèi)置函數(shù)實(shí)戰(zhàn) /**
* scala代碼
*/
package com.tom.spark.sql
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
/**
* 使用Spark SQL中的內(nèi)置函數(shù)對(duì)數(shù)據(jù)進(jìn)行分析,與普通的Spark SQL API不同的是DataFrame內(nèi)置函數(shù)操作的結(jié)果是返回一個(gè)Column對(duì)象,
* 而DataFrame天生就是"A distributed collection of data organized into named columns.",這就為數(shù)據(jù)的復(fù)雜分析建立了堅(jiān)實(shí)的基礎(chǔ)
* 并提供了極大的方便性,例如在操作DataFrame的方法中可以隨時(shí)調(diào)用內(nèi)置函數(shù)進(jìn)行業(yè)務(wù)需要的處理,這之于我們構(gòu)建復(fù)雜的業(yè)務(wù)邏輯而言
* 非??梢詷O大地減少不必要的時(shí)間消耗(基本上就是實(shí)際模型的映射),讓我們聚焦在數(shù)據(jù)分析上,這對(duì)于提高工程師的生產(chǎn)力而言是非常有價(jià)值的。
* Spark 1.5.x開(kāi)始提供了大量的內(nèi)置函數(shù),例如agg:
* def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
* groupBy().agg(aggExpr, aggExprs : _*)
* }
* 還有max,mean,min,sum,avg,explode,size,sort_array,day,to_date,abs,acos,asin,atan
* 總體而言,內(nèi)置函數(shù)包含五大基本類型:
* 1,聚合函數(shù),例如countDistinct,sumDistinct等
* 2,集合函數(shù),例如sort_array
* 3,日期時(shí)間函數(shù),例如hour,quarter,next_day
* 4,數(shù)學(xué)函數(shù),例如asin,atan,sqrt,tan,round等;
* 5,開(kāi)窗函數(shù),例如rowNumber等
* 6,字符串函數(shù),concat,format_number,rexexp_extract
* 7,其它函數(shù),isNaN,sha,randn,callUDF,callUDAF
*/
object SparkSQLAgg {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkSQLAgg")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//要使用Spark SQL的內(nèi)置函數(shù),就一定要導(dǎo)入SQLContext下的隱式轉(zhuǎn)換
import sqlContext.implicits._
/**
* 模擬電商訪問(wèn)的數(shù)據(jù),實(shí)際情況會(huì)比模擬數(shù)據(jù)復(fù)雜很多,最后生成RDD
*/
val userData = Array(
"2016-3-27,001,http://spark./,1000",
"2016-3-27,001,http://hadoop./,1001",
"2016-3-27,002,http://flink./,1002",
"2016-3-28,003,http://kafka./,1020",
"2016-3-28,004,http://spark./,1010",
"2016-3-28,002,http://hive./,1200",
"2016-3-28,001,http://parquet./,1500",
"2016-3-28,001,http://spark./,1800"
)
val userDataRDD = sc.parallelize(userData) //生成RDD分布式集合對(duì)象
/**
* 根據(jù)業(yè)務(wù)需要對(duì)數(shù)據(jù)進(jìn)行預(yù)處理生成DataFrame,要想把RDD轉(zhuǎn)換成DataFrame,需要先把RDD中的元素類型變成Row類型,
* 與此同時(shí)要提供DataFrame中的columns的元數(shù)據(jù)信息描述
*/
val userDataRDDRow = userDataRDD.map(row=>{
val splited = row.split(",")
Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt)
})
val structType = StructType(Array(
StructField("time", StringType, true),
StructField("id", IntegerType, true),
StructField("url", StringType, true),
StructField("amount", IntegerType, true)
))
val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structType)
/**
* 使用Spark SQL提供的內(nèi)置函數(shù)對(duì)DataFrame進(jìn)行操作,特別注意:內(nèi)置函數(shù)生成的Column對(duì)象且自動(dòng)進(jìn)行CG
*/
userDataDF.groupBy("time").agg('time, countDistinct('id))
.map(row=> Row(row(1),row(2))).collect.foreach(println)
userDataDF.groupBy("time").agg('time, sum('amount)).show
}
}
|