乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      SparkShell和IDEA中編寫Spark程序

       好程序員IT 2019-05-28

      spark-shell是Spark自帶的交互式Shell程序,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用Scala編寫Spark程序。spark-shell程序一般用作Spark程序測(cè)試練習(xí)來用。spark-shell屬于Spark的特殊應(yīng)用程序,我們可以在這個(gè)特殊的應(yīng)用程序中提交應(yīng)用程序

      spark-shell啟動(dòng)有兩種模式,local模式和cluster模式,分別為

      local模式:

      spark-shell

      local模式僅在本機(jī)啟動(dòng)一個(gè)SparkSubmit進(jìn)程,沒有與集群建立聯(lián)系,雖然進(jìn)程中有SparkSubmit但是不會(huì)被提交到集群紅


      Cluster模式(集群模式):

      spark-shell \
      --master spark://hadoop01:7077 \
      --executor-memory 512m \
      --total-executor-cores 1

      后兩個(gè)命令不是必須的 --master這條命令是必須的(除非在jar包中已經(jīng)指可以不指定,不然就必須指定)

      退出shell

      千萬不要ctrl+c spark-shell 正確退出 :quit 千萬不要ctrl+c退出 這樣是錯(cuò)誤的 若使用了ctrl+c退出 使用命令查看監(jiān)聽端口 netstat - apn | grep 4040 在使用kill -9 端口號(hào) 殺死即可

      3.25.11 spark2.2shellspark1.6shell對(duì)比


      ps:啟動(dòng)spark-shell若是集群模式,在webUI會(huì)有一個(gè)一直執(zhí)行的任務(wù)

      通過IDEA創(chuàng)建Spark工程

      ps:工程創(chuàng)建之前步驟省略,在scala中已經(jīng)講解,直接默認(rèn)是創(chuàng)建好工程的

      對(duì)工程中的pom.xml文件配置

       <!-- 聲明公有的屬性 -->
      <properties>
              <maven.compiler.source>1.8</maven.compiler.source>
              <maven.compiler.target>1.8</maven.compiler.target>
              <encoding>UTF-8</encoding>
              <scala.version>2.11.8</scala.version>
              <spark.version>2.2.0</spark.version>
              <hadoop.version>2.7.1</hadoop.version>
              <scala.compat.version>2.11</scala.compat.version>
          </properties>
      <!-- 聲明并引入公有的依賴 -->
          <dependencies>
              <dependency>
                  <groupId>org.scala-lang</groupId>
                  <artifactId>scala-library</artifactId>
                  <version>${scala.version}</version>
              </dependency>
              <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-core_2.11</artifactId>
              <version>${spark.version}</version>
          </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-client</artifactId>
                  <version>${hadoop.version}</version>
              </dependency>
          </dependencies> 

      Spark實(shí)現(xiàn)WordCount程序

      Scala版本
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}

      object SparkWordCount {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")
          //創(chuàng)建sparkContext對(duì)象
          val sc =  new SparkContext(conf)
          //通過sparkcontext對(duì)象就可以處理數(shù)據(jù)
          //讀取文件 參數(shù)是一個(gè)String類型的字符串 傳入的是路徑
          val lines: RDD[String] = sc.textFile(dir/wordcount)
          //切分?jǐn)?shù)據(jù)
          val words: RDD[String] = lines.flatMap(_.split(" "))
          //將每一個(gè)單詞生成元組 (單詞,1)
          val tuples: RDD[(String, Int)] = words.map((_,1))
          //spark中提供一個(gè)算子 reduceByKey 相同key 為一組進(jìn)行求和 計(jì)算value
          val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
          //對(duì)當(dāng)前這個(gè)結(jié)果進(jìn)行排序 sortBy scalasotrBy是不一樣的 多了一個(gè)參數(shù)
          //默認(rèn)是升序  false就是降序
          val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)
          //將數(shù)據(jù)提交到集群存儲(chǔ) 無法返回值
           sorted.foreach(println)
          //回收資源停止sc,結(jié)束任務(wù)
          sc.stop()
        }
      }

      Java版本

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      import scala.Tuple2;
      import java.util.Arrays;
      import java.util.Iterator;
      import java.util.List
      public class JavaWordCount {
          public static void main(String[] args) {
      //1.先創(chuàng)建conf對(duì)象進(jìn)行配置主要是設(shè)置名稱,為了設(shè)置運(yùn)行模式
              SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
      //2.創(chuàng)建context對(duì)象
              JavaSparkContext jsc = new JavaSparkContext(conf);
              JavaRDD<String> lines = jsc.textFile("dir/file");
      //進(jìn)行切分?jǐn)?shù)據(jù) flatMapFunction是具體實(shí)現(xiàn)類
              JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                  @Override
                  public Iterator<String> call(String s) throws Exception {
                      List<String> splited = Arrays.asList(s.split(" "));
                      return splited.iterator();
                  }

              });
      //將數(shù)據(jù)生成元組
      //第一個(gè)泛型是輸入的數(shù)據(jù)類型 后兩個(gè)參數(shù)是輸出參數(shù)元組的數(shù)據(jù)
              JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String,
                      Integer>() {
                  @Override
                  public Tuple2<String, Integer> call(String s) throws Exception {
                      return new Tuple2<String, Integer>(s, 1);
                  }
              });
      //聚合
              JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer,
                      Integer>() {
                  @Override
      //第一個(gè)Integer是相同key對(duì)應(yīng)的value
      //第二個(gè)Integer是相同key 對(duì)應(yīng)的value
                  public Integer call(Integer v1, Integer v2) throws Exception {
                      return v1 + v2;
                  }
              });
      //因?yàn)?/span>Java api沒有提供sortBy算子,此時(shí)需要將元組中的數(shù)據(jù)進(jìn)行位置調(diào)換,然后在排序,排完序在換回
      //第一次交換是為了排序
              JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String,
                      Integer>, Integer, String>() {
                  @Override
                  public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
                      return tup.swap();
                  }
              });
      //排序
              JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
      //第二次交換是為了最終結(jié)果 <單詞,數(shù)量>
              JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer,
                      String>, String, Integer>() {
                  @Override
                  public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception
                  {
                      return tuple2.swap();
                  }
              });
              System.out.println(res.collect());
              res.saveAsTextFile("out1");
              jsc.stop();
          }
      }

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多