乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      第109講: Spark Streaming電商廣告點(diǎn)擊綜合案例動(dòng)態(tài)黑名單基于數(shù)據(jù)庫MySQL的真正操作代碼實(shí)戰(zhàn)

       看風(fēng)景D人 2019-02-25
      有興趣想學(xué)習(xí)國內(nèi)整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視頻,性價(jià)比超高!
      
      package com.dt.streaming;
      
      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.ResultSet;
      import java.sql.SQLException;
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.HashSet;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Map;
      import java.util.Set;
      import java.util.concurrent.LinkedBlockingQueue;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      import org.apache.spark.api.java.function.VoidFunction;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaDStream;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaPairInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import org.apache.spark.streaming.kafka.KafkaUtils;
      import org.datanucleus.store.rdbms.request.InsertRequest;
      
      import com.google.common.base.Optional;
      
      import kafka.serializer.StringDecoder;
      import scala.Tuple2;
      /**
       * 
       * 在線處理廣告點(diǎn)擊流
       * 廣告點(diǎn)擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
       * 
       * @author hp
       *
       */
      public class AdClickedStreamingStats {
      
         public static void main(String[] args) {
            
            /*
             * 第一步:配置SparkConf:
             * 1,至少2條線程:因?yàn)镾park Streaming應(yīng)用程序在運(yùn)行的時(shí)候,至少有一條
             * 線程用于不斷的循環(huán)接收數(shù)據(jù),并且至少有一條線程用于處理接受的數(shù)據(jù)(否則的話無法
             * 有線程用于處理數(shù)據(jù),隨著時(shí)間的推移,內(nèi)存和磁盤都會(huì)不堪重負(fù));
             * 2,對于集群而言,每個(gè)Executor一般肯定不止一個(gè)Thread,那對于處理Spark Streaming的
             * 應(yīng)用程序而言,每個(gè)Executor一般分配多少Core比較合適?根據(jù)我們過去的經(jīng)驗(yàn),5個(gè)左右的
             * Core是最佳的(一個(gè)段子分配為奇數(shù)個(gè)Core表現(xiàn)最佳,例如3個(gè)、5個(gè)、7個(gè)Core等);
             */
            SparkConf conf = new SparkConf().setMaster("local[5]").
                  setAppName("AdClickedStreamingStats");
            
            /*SparkConf conf = new SparkConf().setMaster("spark://Master:7077").
                  setAppName("SparkStreamingOnKafkaReceiver");*/
            
            /*
             * 第二步:創(chuàng)建SparkStreamingContext:
             * 1,這個(gè)是SparkStreaming應(yīng)用程序所有功能的起始點(diǎn)和程序調(diào)度的核心
             * SparkStreamingContext的構(gòu)建可以基于SparkConf參數(shù),也可基于持久化的SparkStreamingContext的內(nèi)容
             * 來恢復(fù)過來(典型的場景是Driver崩潰后重新啟動(dòng),由于Spark Streaming具有連續(xù)7*24小時(shí)不間斷運(yùn)行的特征,
             * 所有需要在Driver重新啟動(dòng)后繼續(xù)上衣系的狀態(tài),此時(shí)的狀態(tài)恢復(fù)需要基于曾經(jīng)的Checkpoint);
             * 2,在一個(gè)Spark Streaming應(yīng)用程序中可以創(chuàng)建若干個(gè)SparkStreamingContext對象,使用下一個(gè)SparkStreamingContext
             * 之前需要把前面正在運(yùn)行的SparkStreamingContext對象關(guān)閉掉,由此,我們獲得一個(gè)重大的啟發(fā)SparkStreaming框架也只是
             * Spark Core上的一個(gè)應(yīng)用程序而已,只不過Spark Streaming框架箱運(yùn)行的話需要Spark工程師寫業(yè)務(wù)邏輯處理代碼;
             */
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
            
            /*
             * 第三步:創(chuàng)建Spark Streaming輸入數(shù)據(jù)來源input Stream:
             * 1,數(shù)據(jù)輸入來源可以基于File、HDFS、Flume、Kafka、Socket等
             * 2, 在這里我們指定數(shù)據(jù)來源于網(wǎng)絡(luò)Socket端口,Spark Streaming連接上該端口并在運(yùn)行的時(shí)候一直監(jiān)聽該端口
             *        的數(shù)據(jù)(當(dāng)然該端口服務(wù)首先必須存在),并且在后續(xù)會(huì)根據(jù)業(yè)務(wù)需要不斷的有數(shù)據(jù)產(chǎn)生(當(dāng)然對于Spark Streaming
             *        應(yīng)用程序的運(yùn)行而言,有無數(shù)據(jù)其處理流程都是一樣的); 
             * 3,如果經(jīng)常在每間隔5秒鐘沒有數(shù)據(jù)的話不斷的啟動(dòng)空的Job其實(shí)是會(huì)造成調(diào)度資源的浪費(fèi),因?yàn)椴]有數(shù)據(jù)需要發(fā)生計(jì)算,所以
             *        實(shí)例的企業(yè)級生成環(huán)境的代碼在具體提交Job前會(huì)判斷是否有數(shù)據(jù),如果沒有的話就不再提交Job;
             * 4,在本案例中具體參數(shù)含義:
             *        第一個(gè)參數(shù)是StreamingContext實(shí)例;
             *        第二個(gè)參數(shù)是ZooKeeper集群信息(接受Kafka數(shù)據(jù)的時(shí)候會(huì)從ZooKeeper中獲得Offset等元數(shù)據(jù)信息)
             *        第三個(gè)參數(shù)是Consumer Group
             *        第四個(gè)參數(shù)是消費(fèi)的Topic以及并發(fā)讀取Topic中Partition的線程數(shù)
             */
      
      
            
            /**
             * 創(chuàng)建Kafka元數(shù)據(jù),來讓Spark Streaming這個(gè)Kafka Consumer利用
             */
            Map<String, String> kafkaParameters = new HashMap<String, String>();
            kafkaParameters.put("metadata.broker.list", 
                  "Master:9092,Worker1:9092,Worker2:9092");
            
            Set<String> topics =  new HashSet<String>();
            topics.add("AdClicked");
            
            JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(jsc, 
                  String.class, String.class, 
                  StringDecoder.class, StringDecoder.class,
                  kafkaParameters, 
                  topics);
            /**
             * 因?yàn)橐獙诿麊芜M(jìn)行在線過濾,而數(shù)據(jù)是在RDD中的,所以必然使用transform這個(gè)函數(shù);
             * 但是在這里我們必須使用transformToPair,原因是讀取進(jìn)來的Kafka的數(shù)據(jù)是Pair<String,String>類型的,另外
             * 一個(gè)原因是過濾后的數(shù)據(jù)要進(jìn)行進(jìn)一步處理,所以必須是讀進(jìn)來的Kafka數(shù)據(jù)的原始類型DStream<String, String>
             *
             * 在此:再次說明每個(gè)Batch Duration中實(shí)際上講輸入的數(shù)據(jù)就是被一個(gè)且僅僅被一個(gè)RDD封裝的,你可以有多個(gè)
             * InputDstream,但是其實(shí)在產(chǎn)生Job的時(shí)候,這些不同的InputDstream在Batch Duration中就相當(dāng)于Spark基于
             * HDFS數(shù)據(jù)操作的不同文件來源而已罷了。
             */
            JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {
      
               @Override
               public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                  /**
                   * 在線黑名單過濾思路步驟:
                   * 1,從數(shù)據(jù)庫中獲取黑名單轉(zhuǎn)換成RDD,即新的RDD實(shí)例封裝黑名單數(shù)據(jù);
                   * 2,然后把代表黑名單的RDD的實(shí)例和Batch Duration產(chǎn)生的rdd進(jìn)行join操作,準(zhǔn)確的說是進(jìn)行
                   * leftOuterJoin操作,也就是說使用Batch Duration產(chǎn)生的rdd和代表黑名單的RDD的實(shí)例進(jìn)行
                   * leftOuterJoin操作,如果兩者都有內(nèi)容的話,就會(huì)是true,否則的話就是false;
                   * 
                   * 我們要留下的是leftOuterJoin操作結(jié)果為false;
                   * 
                   */
                  
                  List<String> blackListNames = new ArrayList<>();
                  JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                  jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack(){
      
                     @Override
                     public void resultCallBack(ResultSet result) throws Exception {
                        
                        while(result.next()){
                           blackListNames.add(result.getString(1));
                        }
                     }
                     
                  });
                  
                  List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String, Boolean>>();
                  
                  for (String name : blackListNames){
                     blackListTuple.add(new Tuple2<String,Boolean>(name, true));
                  }
                  
                  List<Tuple2<String, Boolean>> blackListFromDB = blackListTuple; //數(shù)據(jù)來自于查詢的黑名單表并且映射成為<String, Boolean>
                  
                  JavaSparkContext jsc = new JavaSparkContext(rdd.context());
                  
                  /**
                   * 黑名單的表中只有userID,但是如果要進(jìn)行join操作的話,就必須是Key-Value,所以
                   * 在這里我們需要基于數(shù)據(jù)表中的數(shù)據(jù)產(chǎn)生Key-Value類型的數(shù)據(jù)集合;
                   */
                  JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blackListFromDB);
                  
                     
                  /**
                   * 進(jìn)行操作的時(shí)候肯定是基于userID進(jìn)行join的,所以必須把傳入的rdd進(jìn)行mapToPair操作轉(zhuǎn)化成為符合
                   * 格式的rdd
                   * 
                   * 廣告點(diǎn)擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
                   */
                  
                  JavaPairRDD<String, Tuple2<String, String>>  rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String,String>>() {
      
                     @Override
                     public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {
                        String userID = t._2.split("\t")[2];
                        return new Tuple2<String, Tuple2<String, String>>(userID, t);
                     }
                  });
                  
                   JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);
                  
                   JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,
                         Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {
      
                           @Override
                           public Boolean call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> v1)
                                 throws Exception {
                              Optional<Boolean> optional = v1._2._2;
                              
                              if (optional.isPresent() && optional.get()){
                                 return false;
                              } else {
                                 return true;
                              }
                              
                           }
                  }).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {
      
                     @Override
                     public Tuple2<String, String> call(
                           Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t) throws Exception {
                        // TODO Auto-generated method stub
                        return t._2._1;
                     }
                  });
                    
               
                  return result;
               }
            });
            
            
            /*
             * 第四步:接下來就像對于RDD編程一樣基于DStream進(jìn)行編程!??!原因是DStream是RDD產(chǎn)生的模板(或者說類),在Spark Streaming具體
             * 發(fā)生計(jì)算前,其實(shí)質(zhì)是把每個(gè)Batch的DStream的操作翻譯成為對RDD的操作?。?!
             *對初始的DStream進(jìn)行Transformation級別的處理,例如map、filter等高階函數(shù)等的編程,來進(jìn)行具體的數(shù)據(jù)計(jì)算
              *     廣告點(diǎn)擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
              */
         
            
            JavaPairDStream<String, Long> pairs = adClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
      
               @Override
               public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                  String[] splited = t._2.split("\t");
                  
                  String timestamp = splited[0]; //yyyy-MM-dd
                  String ip = splited[1];
                  String userID = splited[2];
                  String adID = splited[3];
                  String province = splited[4];
                  String city = splited[5];
                  
                  String clickedRecord = timestamp + "_" + ip + "_" + userID + "_" + adID + "_" 
                        + province + "_" + city;
                  
                  return new Tuple2<String, Long>(clickedRecord, 1L);
               }
            });
            
             /*
                * 第四步:對初始的DStream進(jìn)行Transformation級別的處理,例如map、filter等高階函數(shù)等的編程,來進(jìn)行具體的數(shù)據(jù)計(jì)算
                *   計(jì)算每個(gè)Batch Duration中每個(gè)User的廣告點(diǎn)擊量
                */
            JavaPairDStream<String, Long> adClickedUsers = pairs.reduceByKey(new Function2<Long, Long, Long>(){
      
               @Override
               public Long call(Long v1, Long v2) throws Exception {
                  // TODO Auto-generated method stub
                  return v1 + v2;
               }
                  
            });
            
            
            /**
             * 
             * 計(jì)算出什么叫有效的點(diǎn)擊?
             * 1,復(fù)雜化的一般都是采用機(jī)器學(xué)習(xí)訓(xùn)練好模型直接在線進(jìn)行過濾;
             * 2,簡單的?可以通過一個(gè)Batch Duration中的點(diǎn)擊次數(shù)來判斷是不是非法廣告點(diǎn)擊,但是實(shí)際上講非法廣告
             * 點(diǎn)擊程序會(huì)盡可能模擬真實(shí)的廣告點(diǎn)擊行為,所以通過一個(gè)Batch來判斷是 不完整的,我們需要對例如一天(也可以是每一個(gè)小時(shí))
             * 的數(shù)據(jù)進(jìn)行判斷!
             * 3,比在線機(jī)器學(xué)習(xí)退而求次的做法如下:
             *        例如:一段時(shí)間內(nèi),同一個(gè)IP(MAC地址)有多個(gè)用戶的賬號訪問;
             *        例如:可以統(tǒng)一一天內(nèi)一個(gè)用戶點(diǎn)擊廣告的次數(shù),如果一天點(diǎn)擊同樣的廣告操作50次的話,就列入黑名單;
             * 
             * 黑名單有一個(gè)重點(diǎn)的特征:動(dòng)態(tài)生成?。。∷悦恳粋€(gè)Batch Duration都要考慮是否有新的黑名單加入,此時(shí)黑名單需要存儲(chǔ)起來
             * 具體存儲(chǔ)在什么地方呢,存儲(chǔ)在DB/Redis中即可;
             * 
             * 例如郵件系統(tǒng)中的“黑名單”,可以采用Spark Streaming不斷的監(jiān)控每個(gè)用戶的操作,如果用戶發(fā)送郵件的頻率超過了設(shè)定的值,可以
             * 暫時(shí)把用戶列入“黑名單”,從而阻止用戶過度頻繁的發(fā)送郵件。
             */
             
            JavaPairDStream<String, Long>  filteredClickInBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {
               
               @Override
               public Boolean call(Tuple2<String, Long> v1) throws Exception {
                  if ( 1 < v1._2){
                     //更新一下黑名單的數(shù)據(jù)表
                     return false;
                  } else {
                     return true;
                  }
                  
               }
            });
            
            // Todo。。。。
            
            /*
             * 此處的print并不會(huì)直接出發(fā)Job的執(zhí)行,因?yàn)楝F(xiàn)在的一切都是在Spark Streaming框架的控制之下的,對于Spark Streaming
             * 而言具體是否觸發(fā)真正的Job運(yùn)行是基于設(shè)置的Duration時(shí)間間隔的
             * 
             * 諸位一定要注意的是Spark Streaming應(yīng)用程序要想執(zhí)行具體的Job,對Dtream就必須有output Stream操作,
             * output Stream有很多類型的函數(shù)觸發(fā),類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個(gè)
             * 方法是foraeachRDD,因?yàn)镾park Streaming處理的結(jié)果一般都會(huì)放在Redis、DB、DashBoard等上面,foreachRDD
             * 主要就是用用來完成這些功能的,而且可以隨意的自定義具體數(shù)據(jù)到底放在哪里!??!
             *
             */
      //    filteredClickInBatch.print();
            
            filteredClickInBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
      
               @Override
               public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                  rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
                     
                     @Override
                     public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        /**
                         * 在這里我們使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式把數(shù)據(jù)寫入數(shù)據(jù)庫MySQL;
                         * 由于傳入的參數(shù)是一個(gè)Iterator類型的集合,所以為了更加高效的操作我們需要批量處理
                         * 例如說一次性插入1000條Record,使用insertBatch或者updateBatch類型的操作;
                         * 插入的用戶信息可以只包含:timestamp、ip、userID、adID、province、city
                         * 這里面有一個(gè)問題:可能出現(xiàn)兩條記錄的Key是一樣的,此時(shí)就需要更新累加操作
                         */
                        
                        List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
                        
                        while (partition.hasNext()){
                           Tuple2<String, Long> record = partition.next();
                           String[] splited = record._1.split("\t");
                           
                           UserAdClicked userClicked = new UserAdClicked();
                           userClicked.setTimestamp(splited[0]);
                           userClicked.setIp(splited[1]);
                           userClicked.setUserID(splited[2]);
                           userClicked.setAdID(splited[3]);
                           userClicked.setProvince(splited[4]);
                           userClicked.setCity(splited[5]);
                           userAdClickedList.add(userClicked);
                           
                        }
                        
                        List<UserAdClicked> inserting  = new ArrayList<UserAdClicked>();
                        List<UserAdClicked> updating  = new ArrayList<UserAdClicked>();
                        
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                        
                        //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
                        for (UserAdClicked clicked : userAdClickedList){
                           jdbcWrapper.doQuery("SELECT count(1) FROM adclicked WHERE "
                                 + " timestamp = ? AND userID = ? AND adID = ?",
                                 new Object[]{clicked.getTimestamp(), clicked.getUserID(), clicked.getAdID()},
                                 new ExecuteCallBack() {
                                    
                                    @Override
                                    public void resultCallBack(ResultSet result) throws Exception {
                                       if(result.next()){
                                          long count = result.getLong(1);
                                          clicked.setClickedCount(count);
                                          updating.add(clicked);
                                       } else {
                                          inserting.add(clicked);
                                       }
                                       
                                    }
                                 });
                        }
                     //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
                     ArrayList<Object[]> insertParametersList = new ArrayList<Object[]>();
                     for(UserAdClicked inserRecord : inserting){
                        insertParametersList.add(new Object[]{
                              inserRecord.getTimestamp(),
                              inserRecord.getIp(),
                              inserRecord.getUserID(),
                              inserRecord.getAdID(),
                              inserRecord.getProvince(),
                              inserRecord.getCity(),
                              inserRecord.getClickedCount()
                        });
                     }
                     jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?,?,?,?,?,?,?)", insertParametersList);
                     
                     
                     
                     //adclicked 表的字段:timestamp、ip、userID、adID、province、city、clickedCount
                     ArrayList<Object[]> updateParametersList = new ArrayList<Object[]>();
                     for(UserAdClicked updateRecord : updating){
                        updateParametersList.add(new Object[]{
                              updateRecord.getTimestamp(),
                              updateRecord.getIp(),
                              updateRecord.getUserID(),
                              updateRecord.getAdID(),
                              updateRecord.getProvince(),
                              updateRecord.getCity(),
                              updateRecord.getClickedCount()
                        });
                     }
                     jdbcWrapper.doBatch("UPDATE adclicked set clickedCount = clickedCount + 1 WHERE "
                                 + " timestamp = ? AND userID = ? AND adID = ?", updateParametersList);
                     
                     
                     
                     }
                  });
                  return null;
               }
               
            
            });
            
            
            JavaPairDStream<String, Long> blackListBasedOnHistory = filteredClickInBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {
      
               @Override
               public Boolean call(Tuple2<String, Long> v1) throws Exception {
                  //廣告點(diǎn)擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
                  String[] splited = v1._1.split("\t");
                  
                  String date = splited[0];
                  String userID = splited[2];
                  String adID = splited[3];
                  
                  /**
                   * 接下來根據(jù)date、userID、adID等條件去查詢用戶點(diǎn)擊廣告的數(shù)據(jù)表,獲得總的點(diǎn)擊次數(shù)
                   * 這個(gè)時(shí)候基于點(diǎn)擊次數(shù)判斷是否屬于黑名單點(diǎn)擊             * 
                   */
                  
                  int clickedCountTotalToday = 81;
                        
                  if (clickedCountTotalToday > 50)
                  {
                     return true;
                  } else {
                     return false;
                  }
                           
               }
            });
            
            
            /**
             * 必須對黑名單的整個(gè)RDD進(jìn)行去重操作?。?!
             */
            
            
            JavaDStream<String> blackListuserIDtBasedOnHistory = blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {
      
               @Override
               public String call(Tuple2<String, Long> v1) throws Exception {
                  // TODO Auto-generated method stub
                  return v1._1.split("\t")[2];
               }
            });
            
            JavaDStream<String> blackListUniqueuserIDtBasedOnHistory = blackListuserIDtBasedOnHistory.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
      
               @Override
               public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                  // TODO Auto-generated method stub
                  return rdd.distinct();
               }
            });
            
            
            
            //下一步寫入黑名單數(shù)據(jù)表中
            
            blackListUniqueuserIDtBasedOnHistory.foreachRDD(new Function<JavaRDD<String>, Void>() {
      
               @Override
               public Void call(JavaRDD<String> rdd) throws Exception {
                  rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                     
                     @Override
                     public void call(Iterator<String> t) throws Exception {
                        /**
                         * 在這里我們使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式把數(shù)據(jù)寫入數(shù)據(jù)庫MySQL;
                         * 由于傳入的參數(shù)是一個(gè)Iterator類型的集合,所以為了更加高效的操作我們需要批量處理
                         * 例如說一次性插入1000條Record,使用insertBatch或者updateBatch類型的操作;
                         * 插入的用戶信息可以只包含:useID
                         * 此時(shí)直接插入黑名單數(shù)據(jù)表即可。
                         */
                        
                        List<Object[]> blackList = new ArrayList<Object[]>();
                        
                        while(t.hasNext()){
                           blackList.add(new Object[]{(Object)t.next()});
                        }
                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                        jdbcWrapper.doBatch("INSERT INTO blacklisttable VALUES (?) ", blackList);
                     }
                  });
                  return null;
               }
            });
            
            /*
             * Spark Streaming執(zhí)行引擎也就是Driver開始運(yùn)行,Driver啟動(dòng)的時(shí)候是位于一條新的線程中的,當(dāng)然其內(nèi)部有消息循環(huán)體,用于
             * 接受應(yīng)用程序本身或者Executor中的消息;
             */
            jsc.start();
            
            jsc.awaitTermination();
            jsc.close();
      
         }
      
      }
      
      class JDBCWrapper {
         
         private static JDBCWrapper jdbcInstance = null;
         private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection> ();
         
         static {
            try {
               Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
            }
         }
         
         
         public static JDBCWrapper getJDBCInstance(){
            if (jdbcInstance == null){
               
               synchronized(JDBCWrapper.class){
                  if (jdbcInstance == null){
                     jdbcInstance = new JDBCWrapper();
                  }           
               }
               
            }
            
            return jdbcInstance;
         }
         
         private JDBCWrapper(){
            
            for (int i = 0; i < 10; i++){
                  
               
               try {
                  Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root","root");
                  dbConnectionPool.put(conn);
               } catch (Exception e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
               }
                  
            }
               
         }
         
         
         public synchronized Connection getConnection(){
            while (0 == dbConnectionPool.size()){
               try {
                  Thread.sleep(20);
               } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
               }
            }
            
            return dbConnectionPool.poll();
         }
         
         public int[] doBatch(String sqlText, List<Object[]> paramsList) {
            
            Connection conn = getConnection();
            PreparedStatement preparedStatement = null;
            int[] result = null;
            try {
               conn.setAutoCommit(false);
               preparedStatement = conn.prepareStatement(sqlText);
               
               for (Object[] parameters : paramsList){
                  for(int i = 0; i < parameters.length; i++){
                     preparedStatement.setObject(i+1, parameters[i]);
                  }
                  
                  preparedStatement.addBatch();
               }
               
               result = preparedStatement.executeBatch();
                     
               
               conn.commit();
               
            } catch (Exception e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
            } finally {
               if (preparedStatement != null){
                  try {
                     preparedStatement.close();
                  } catch (SQLException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                  }
               }
               
               if (conn != null){
                  try {
                     dbConnectionPool.put(conn);
                  } catch (InterruptedException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                  }
               }
            }
            
            
            
            
            return result;
         }
         
         
      public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callBack) {
            
            Connection conn = getConnection();
            PreparedStatement preparedStatement = null;
            ResultSet result = null;
            try {
               
               preparedStatement = conn.prepareStatement(sqlText);
               
               
                  for(int i = 0; i < paramsList.length; i++){
                     preparedStatement.setObject(i+1, paramsList[i]);
                  }
                  
               
               
               result = preparedStatement.executeQuery();
                     
               callBack.resultCallBack(result);
               
               
            } catch (Exception e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
            } finally {
               if (preparedStatement != null){
                  try {
                     preparedStatement.close();
                  } catch (SQLException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                  }
               }
               
               if (conn != null){
                  try {
                     dbConnectionPool.put(conn);
                  } catch (InterruptedException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                  }
               }
            }
               
         }
      }
      
      interface ExecuteCallBack {
         void resultCallBack(ResultSet result) throws Exception;
      }
      
      class UserAdClicked {
         private String timestamp;
         private String ip;
         private String userID;
         private String adID;
         private String province;
         private String city;
         private Long clickedCount;
         
         
         public Long getClickedCount() {
            return clickedCount;
         }
         public void setClickedCount(Long clickedCount) {
            this.clickedCount = clickedCount;
         }
         public String getTimestamp() {
            return timestamp;
         }
         public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
         }
         public String getIp() {
            return ip;
         }
         public void setIp(String ip) {
            this.ip = ip;
         }
         public String getUserID() {
            return userID;
         }
         public void setUserID(String userID) {
            this.userID = userID;
         }
         public String getAdID() {
            return adID;
         }
         public void setAdID(String adID) {
            this.adID = adID;
         }
         public String getProvince() {
            return province;
         }
         public void setProvince(String province) {
            this.province = province;
         }
         public String getCity() {
            return city;
         }
         public void setCity(String city) {
            this.city = city;
         }
      }
      
      
      
      有興趣想學(xué)習(xí)國內(nèi)整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視頻,性價(jià)比超高!
      

        本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多