RDD有幾種操作:Transformation(算子),Action(觸發(fā)作業(yè),的結(jié)果foreach、reduce、saveasTextFile等),Controller(性能和容錯(cuò)方面persist、cache、checkpoint)
reduce要符合交換律和結(jié)合律 foreach不可以進(jìn)行模式匹配 collect把各個(gè)節(jié)點(diǎn)計(jì)算的結(jié)果匯總到Driver package com.tom.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* 計(jì)算相同行出現(xiàn)的次數(shù)
*/
object TextLines {
def main(args: Array[String]): Unit = {
val conf = new SparkConf() //創(chuàng)建SparkConf對(duì)象
conf.setAppName("Wow My First Spark App!") //設(shè)置應(yīng)用程序的名稱,在程序運(yùn)行的監(jiān)控界面可以看到
conf.setMaster("local") //此時(shí)程序在本地運(yùn)行,不需要安裝Spark集群
val sc = new SparkContext(conf) //創(chuàng)建SparkContext對(duì)象,通過(guò)傳入SparkConf實(shí)例,來(lái)定制Spark運(yùn)行的具體參數(shù)和配置信息
val lines = sc.textFile("F:/helloSpark.txt") //通過(guò)HadoopRDD以及MapPartitionsRDD獲取文件中每一行的內(nèi)容本身
val lineCount = lines.map( (_, 1)) //每一行變成行的內(nèi)容與1構(gòu)成的Tuple
val textLine = lineCount.reduceByKey(_ + _)
textLine.collect.foreach( pair => println(pair._1 + ":" +pair._2)) //collect是把結(jié)果抓到Driver上,foreach的Array中只有一個(gè)元素,只不過(guò)元素
是一個(gè)Tuple。
}
}
例子中Array中只有一個(gè)元素,只不過(guò)元素是一個(gè)Tuple。 shuffle觸發(fā)新的stage,action觸發(fā)job。saveAsTextJob內(nèi)部有Action,所以會(huì)觸發(fā)job

|