乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      大數(shù)據(jù)IMF傳奇行動(dòng)絕密課程第100

       看風(fēng)景D人 2019-02-24

      使用Spark Streaming+Spark SQL+Kafka+FileSystem綜合案例

      1、項(xiàng)目分析流程圖
      2、項(xiàng)目代碼實(shí)戰(zhàn)

      圖100-1 SparkStreaming案例分析架構(gòu)圖

      Flume sink到Kafka需要一個(gè)jar包支持
      https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin

      編輯flume-conf.properties

      #配置sink
      agent1.sinks.sink1.type=org.apache.flume.plugins.KafkaSink
      agent1.sinks.sink1.metadata.broker.list=Master:9092,Worker1:9092,Worker2.9092
      agent1.sinks.sink1.partition.key=0
      agent1.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
      agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
      agent1.sinks.sink1.request.requiredacks=0
      agent1.sinks.sink1.max.message.size=1000000
      agent1.sinks.sink1.producer.type=sync
      agent1.sinks.sink1.custom.encoding=UTF-8
      agent1.sinks.sink1.custom.topic.name=HelloKafka
      agent1.sinks.sink1.channel= channel1

      Kafka也可以監(jiān)控文件夾,但為什么要用Flume?Kafka只能接收json格式的文件
      數(shù)據(jù)來源?
      互聯(lián)網(wǎng):電商、社交網(wǎng)絡(luò)等的網(wǎng)站和App程序
      傳統(tǒng)行業(yè):金融、電信、醫(yī)療、農(nóng)業(yè)、生產(chǎn)制造行業(yè);
      例如說:在京東上進(jìn)行廣告的推送,當(dāng)我們點(diǎn)擊廣告的時(shí)候,此時(shí)肯定有日志記錄Log發(fā)送回到Server中,或者說我們使用Android,iOS等中的App,都會(huì)設(shè)置有數(shù)據(jù)記錄的關(guān)鍵點(diǎn)(埋點(diǎn))
      如果是網(wǎng)站,經(jīng)典的方式是通過JS透過Ajax把日志穿回到服務(wù)器上,如果是移動(dòng)App等一般是通過Socket,其他的傳感器或者工業(yè)設(shè)備可以通過自己的通信協(xié)議把數(shù)據(jù)傳回到服務(wù)器端

      為了應(yīng)對高并發(fā)訪問,一般采用Nginx等作為Server前段,Server的分布式集群來做負(fù)載均衡

      Tomcat、Apache、WebLogic作為Server后端

      Server中接收到請求路由后一般都會(huì)對每個(gè)請求在文件中寫一條Log

      Logs Cluster可以專門設(shè)置日志服務(wù)器集群,所有的Server和J2EE類型的業(yè)務(wù)邏輯在執(zhí)行過程中產(chǎn)生的日志信息都可以在后臺同步到日志服務(wù)器集群中

      Server中接收到請求路由后一般都會(huì)對每個(gè)請求在文件中寫一條Log,可以自動(dòng)配置Server寫日志

      企業(yè)中一般都會(huì)有Crontab等定時(shí)工具來通過日志整理工具來把當(dāng)天的日志采集、合并和初步的處理形成一份日志文件,然后發(fā)送到Flume監(jiān)控目錄中

      當(dāng)Flume發(fā)現(xiàn)有新的日志文件進(jìn)來的時(shí)候會(huì)按照配置把數(shù)據(jù)通過Channel來Sink到目的地,這里是Sink到Kafka集群中

      HDFS:
      1、使用MapReduce作業(yè)對數(shù)據(jù)進(jìn)行出不清洗,并寫入新的HDFS文件中。
      2、清洗后的數(shù)據(jù)一般導(dǎo)入到Hive數(shù)據(jù)倉庫中,可以采用分區(qū)表
      3、通過Hive中的SQL,在數(shù)據(jù)倉庫的基礎(chǔ)上,進(jìn)行ETL,此時(shí)的ETL會(huì)把原始的數(shù)據(jù)生成很多張目標(biāo)的table

      企業(yè)生產(chǎn)環(huán)境下,Spark數(shù)據(jù)都是來自Hive

      一個(gè)小例子

      package com.tom.spark.sparkstreaming
      
      import org.apache.commons.codec.StringDecoder
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.hive.HiveContext
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
      
      /**
        * 使用Scala開發(fā)集群運(yùn)行的Spark來實(shí)現(xiàn)在線熱搜詞
        */
      case class MessageItem(name: String, age: Int)
      
      object SparkStreamingFromKafkaFlume2Hive {
        def main(args: Array[String]): Unit = {
      
          if(args.length < 2) {
            System.err.println("Please input your kafka broker list and topics to consume")
            System.exit(1)
          }
          val conf = new SparkConf().setAppName("SparkStreamingFromKafkaFlume2Hive").setMaster("local[2]")
      
          val ssc = new StreamingContext(conf, Durations.seconds(5))
      
          val Array(brokers, topics) = args
          val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
          val topicsParams = topics.split(",").toSet
      
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsParams)
            .map(_._2.split(",")).foreachRDD(rdd => {
            val hiveContext = new HiveContext(rdd.sparkContext)
      
            import hiveContext.implicits._
            rdd.map(record => MessageItem(record(0).trim,record(1).trim.toInt)).toDF().registerTempTable("temp")
            hiveContext.sql("SELECT count(*) FROM temp").show()
          })
      
          // Flume會(huì)作為Kafka的Producer把數(shù)據(jù)寫入到Kafka供本程序消費(fèi)
          ssc.start()
          ssc.awaitTermination()
        }
      }

        本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多