乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      大數(shù)據(jù)開發(fā)-Flume-頻繁產(chǎn)生小文件原因和處理

       路人甲Java 2022-09-19 發(fā)布于北京

      1.問題背景

      通過flume直接上傳實時數(shù)據(jù)到hdfs,會常遇到的一個問題就是小文件,需要調參數(shù)來設置,往往在生產(chǎn)環(huán)境參數(shù)大小也不同

      1.flume滾動配置為何不起作用?

      2.通過源碼分析得出什么原因?

      3.該如何解決flume小文件?

      2. 過程分析

      接著上一篇,https://blog.csdn.net/hu_lichao/article/details/110358689

      本人在測試hdfs的sink,發(fā)現(xiàn)sink端的文件滾動配置項起不到任何作用,配置如下:

      a1.sinks.k1.type=hdfs  
      a1.sinks.k1.channel=c1  
      a1.sinks.k1.hdfs.useLocalTimeStamp=true  
      a1.sinks.k1.hdfs.path=hdfs://linux121:9000/user/data/logs/%Y-%m-%d
      a1.sinks.k1.hdfs.filePrefix=XXX  
      a1.sinks.k1.hdfs.rollInterval=60  
      a1.sinks.k1.hdfs.rollSize=0  
      a1.sinks.k1.hdfs.rollCount=0  
      a1.sinks.k1.hdfs.idleTimeout=0   
      

      這里配置的是60秒,文件滾動一次,也就每隔60秒,會新產(chǎn)生一個文件【前提,flume的source端有數(shù)據(jù)來】這里注意 useLocalTimeStamp 是使用本地時間戳來對hdfs上的目錄來命名,這個屬性的目的就是相當于時間戳的攔截器,否則%Y 等等這些東西都識別不了

      要么用上面這個屬性,要么用時間戳攔截器。但是當我啟動flume的時候,運行十幾秒,不斷寫入數(shù)據(jù),發(fā)現(xiàn)hdfs端頻繁的產(chǎn)生文件,每隔幾秒就有新文件產(chǎn)生而且在flume的日志輸出可以頻繁看到這句:

      [WARN] Block Under-replication detected. Rotating file.
      

      只要有這句,就會產(chǎn)生一個新的文件,意思就是檢測到復制塊正在滾動文件,結合源碼看下:

      private boolean shouldRotate() {  
      
          boolean doRotate = false;    
          if (writer.isUnderReplicated()) {    
              this.isUnderReplicated = true;    
              doRotate = true;  
          
          } else {  
          
              this.isUnderReplicated = false;  
          
          }  
          
          if ((rollCount > 0) && (rollCount <= eventCounter)) {  
              LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter); 
              doRotate = true;  
          
          }  
          
          if ((rollSize > 0) && (rollSize <= processSize)) {  
              LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);   
              doRotate = true;  
          }     
          return doRotate;  
      
      }   
      

      這是判斷是否滾動文件,但是這里面的第一判斷條件是判斷是否當前的HDFSWriter正在復制塊

      public boolean isUnderReplicated() {  
          try {  
          
              int numBlocks = getNumCurrentReplicas();  
              if (numBlocks == -1) {  
                  return false;  
              }  
          
              int desiredBlocks;  
              if (configuredMinReplicas != null) {    
                  desiredBlocks = configuredMinReplicas;  
              } else {   
                  desiredBlocks = getFsDesiredReplication();     
              }  
          
              return numBlocks < desiredBlocks;  
          
          } catch (IllegalAccessException e) {  
              logger.error("Unexpected error while checking replication factor", e);   
          } catch (InvocationTargetException e) {   
              logger.error("Unexpected error while checking replication factor", e);      
          } catch (IllegalArgumentException e) {     
              logger.error("Unexpected error while checking replication factor", e);     
          }  
          
          return false;  
      
      } 
      

      通過讀取的配置復制塊數(shù)量和當前正在復制的塊比較,判斷是否正在被復制

      if (shouldRotate()) {  
          boolean doRotate = true;  
          
          if (isUnderReplicated) {  
          
              if (maxConsecUnderReplRotations > 0 &&  
                  consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {  
                  doRotate = false;  
          
                  if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {   
                      LOG.error("Hit max consecutive under-replication rotations ({}); " +  
                          "will not continue rolling files under this path due to " +  
                          "under-replication", maxConsecUnderReplRotations);  
                  }  
          
              } else {    
                  LOG.warn("Block Under-replication detected. Rotating file.");  
              }  
          
              consecutiveUnderReplRotateCount++;  
          
          } else {  
          
              consecutiveUnderReplRotateCount = 0;  
      
      }  
      

      以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大于0,會按照你的配置來滾動的,但是在入口進來后,發(fā)現(xiàn),又去判斷了是否有塊在復制;里面就讀取了一個固定變量maxConsecUnderReplRotations=30,也就是正在復制的塊,最多之能滾動出30個文件,如果超過了30次,該數(shù)據(jù)塊如果還在復制中,那么數(shù)據(jù)也不會滾動了,doRotate=false,不會滾動了,所以有的人發(fā)現(xiàn)自己一旦運行一段時間,會出現(xiàn)30個文件,再結合上面的源碼看一下:如果你配置了10秒滾動一次,寫了2秒,恰好這時候該文件內(nèi)容所在的塊在復制中,那么雖然沒到10秒,依然會給你滾動文件的,文件大小,事件數(shù)量的配置同理了。

      為了解決上述問題,我們只要讓程序感知不到寫的文件所在塊正在復制就行了,怎么做呢??只要讓isUnderReplicated()方法始終返回false就行了,該方法是通過當前正在被復制的塊和配置中讀取的復制塊數(shù)量比較的,我們能改的就只有配置項中復制塊的數(shù)量,而官方給出的flume配置項中有該項

      hdfs.minBlockReplicas
      Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. 
      

      默認讀的是hadoop中的dfs.replication屬性,該屬性默認值是3,這里我們也不去該hadoop中的配置,在flume中添加上述屬性為1即可

      完整配置如下:

      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      # taildir source
      a1.sources.r1.type = TAILDIR
      a1.sources.r1.positionFile =
      /data/lagoudw/conf/startlog_position.json
      a1.sources.r1.filegroups = f1
      a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder
      # memorychannel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 100000
      a1.channels.c1.transactionCapacity = 2000
      # hdfs sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
      a1.sinks.k1.hdfs.filePrefix = startlog.
      # 配置文件滾動方式(文件大小32M)
      a1.sinks.k1.hdfs.rollSize = 33554432
      a1.sinks.k1.hdfs.rollCount = 0
      a1.sinks.k1.hdfs.rollInterval = 0
      a1.sinks.k1.hdfs.idleTimeout = 0
      a1.sinks.k1.hdfs.minBlockReplicas = 1
      # 向hdfs上刷新的event的個數(shù)
      a1.sinks.k1.hdfs.batchSize = 1000
      # 使用本地時間
      # a1.sinks.k1.hdfs.useLocalTimeStamp = true
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
       
      

      這樣程序就永遠不會因為文件所在塊的復制而滾動文件了,只會根據(jù)你的配置項來滾動文件了。。。。

      3.總結

      設置minBlockReplicas=1 的時候可以保證會按你設置的幾個參數(shù)來達到不會產(chǎn)生過多的小文件,因為這個參數(shù)在讀取時候優(yōu)先級較高,會首先判斷到有沒有Hdfs的副本復制,導致滾動文件的異常,另外flume接入數(shù)據(jù)時候可以通過過濾器盡可能把一些完全用不到的數(shù)據(jù)進行過濾,清洗時候就 省事一些了。
      吳邪,小三爺,混跡于后臺,大數(shù)據(jù),人工智能領域的小菜鳥。

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約