乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      第103講: 動手實(shí)戰(zhàn)聯(lián)合使用Spark Streaming、Broadcast、Accumulator實(shí)現(xiàn)在線黑名單過濾和計數(shù)

       看風(fēng)景D人 2019-02-25

      有興趣想學(xué)習(xí)國內(nèi)整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視頻,性價比超高!

      1:廣播可以自定義,例如你自定義廣播里面的內(nèi)容,就有很多你可以自定義的操作。尤其是結(jié)合了Broadcast和Accumulator的時候,他可以實(shí)現(xiàn)一些非常復(fù)雜的功能。

      2:廣播和計數(shù)器在企業(yè)的實(shí)際開發(fā)中,非常重要,主要是可以自定義,自定義的時候可以實(shí)現(xiàn)非常復(fù)雜的邏輯。計數(shù)器Accumulator可以計數(shù)黑名單。黑名單數(shù)據(jù)可以寫在廣播里面

      3:下面直接上代碼,當(dāng)然,這只是初步的使用,廣播和計算器的自定義,絕對是高端的spark技術(shù)。它們倆者結(jié)合自定義會發(fā)揮非常強(qiáng)大的作用。很多一線互聯(lián)網(wǎng)公司,它們很多復(fù)雜的業(yè)務(wù),都需要聯(lián)合使用和自定義廣播和計數(shù)器。

      package com.dt.streaming;
      
      import java.io.BufferedReader;
      import java.io.InputStreamReader;
      import java.net.ConnectException;
      import java.net.Socket;
      import java.util.Arrays;
      import java.util.List;
      
      import org.apache.spark.Accumulator;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.PairFunction;
      import org.apache.spark.broadcast.Broadcast;
      import org.apache.spark.storage.StorageLevel;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.Time;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import org.apache.spark.streaming.receiver.Receiver;
      
      import scala.Tuple2;
      
      public class SparkStreamingBroadcastAccumulator {
         /*
         *第103講
          */
         //這個是基于原子型的變量,保存黑名單
         private static volatile Broadcast<List<String>> broadcastList = null;
         private static volatile Accumulator<Integer> accumulator = null;
         public static void main(String[] args) {
      
            SparkConf conf = new SparkConf().setMaster("local[2]").
                  setAppName("WordCountOnline");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));
            
            /**
             * 實(shí)例化廣播,使用Broadcast廣播黑名單到每個Executor中,廣播是基于SparkContext的。而不是StreamingContext。
             * 沒有action,廣播是不能發(fā)出的
             */
            broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
            
            /**
             * 全局計數(shù)器,用于統(tǒng)計在線過濾了多少個黑名單
             * 第一個參數(shù)計數(shù)初始值肯定是0,第2個參數(shù),accumulator的name
             */
            accumulator = jsc.sparkContext().accumulator(0, "OnlineBlacklistCounter");
            
            
            JavaReceiverInputDStream lines = jsc.socketTextStream("master1", 9999);
         
            
            JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
      
               @Override
               public Tuple2<String, Integer> call(String word) throws Exception {
                  return new Tuple2<String, Integer>(word, 1);
               }
            });
            
            
            JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //對相同的Key,進(jìn)行Value的累計(包括Local和Reducer級別同時Reduce)
               
               @Override
               public Integer call(Integer v1, Integer v2) throws Exception {
                  return v1 + v2;
               }
            });
            //過濾黑名單
            wordsCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void> (){
      
               @Override
               public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                  //對數(shù)據(jù)rdd進(jìn)行過濾
                  rdd.filter(new Function<Tuple2<String, Integer>,Boolean>(){
      
                     @Override
                     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                        //判斷現(xiàn)在循環(huán)的每個key,是否是在黑名單中
                        if (broadcastList.value().contains(wordPair._1)){
                           accumulator.add(wordPair._2);//這里添加過濾掉的黑名單的個數(shù),用于全局通知
                           return false;//包含,return  false,過濾掉
                        } else {
                           return true;//不包含,return true,不過濾
                        }
                        
                        
                     }
      
                     
                  }).collect();//action觸發(fā)下
                  //連接上nc -lk 9999,輸入Hadoop,Spark,Hive,Scala,就會輸出2次,是累加的。代表總共過濾了2次黑名單
                  System.out.println(" BlackList appeared : " + accumulator.value() + " times");
                  return null;
               }
               
            });
            
            
            /*
             * Spark Streaming執(zhí)行引擎也就是Driver開始運(yùn)行,Driver啟動的時候是位于一條新的線程中的,當(dāng)然其內(nèi)部有消息循環(huán)體,用于
             * 接受應(yīng)用程序本身或者Executor中的消息;
             */
            jsc.start();
            
            jsc.awaitTermination();
            jsc.close();
      
         }
      
      }
      
      
      

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多