乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Flink Sort-Shuffle 實現  原本的基于 hash 的實現對大規(guī)模批作業(yè)不可用

       看見就非常 2022-01-25

      本文介紹 Sort-Shuffle 如何幫助 Flink 在應對大規(guī)模批數據處理任務時更加游刃有余。主要內容包括:

      1. 數據 Shuffle 簡介
      2. 引入 Sort-Shuffle 的意義
      3. Flink Sort-Shuffle 實現
      4. 測試結果
      5. 調優(yōu)參數
      6. 未來展望

      Flink 作為批流一體的大數據計算引擎,大規(guī)模批數據處理也是 Flink 數據處理能力的重要組成部分。隨著 Flink 的版本迭代,其批數據處理能力也在不斷增強,sort-shuffle 的引入,使得 Flink 在應對大規(guī)模批數據處理任務時更加游刃有余。

      一、數據 Shuffle 簡介

      數據 shuffle 是批數據處理作業(yè)的一個重要階段,在這一階段中,上游處理節(jié)點的輸出數據會被持久化到外部存儲中,之后下游的計算節(jié)點會讀取這些數據并進行處理。這些持久化的數據不僅僅是一種計算節(jié)點間的數據交換形式,還在錯誤恢復中發(fā)揮著重要作用。

      目前,有兩種批數據 shuffle 模型被現有的大規(guī)模分布式計算系統(tǒng)采用,分別是基于 hash 的方式以及基于 sort 的方式:

      1. 基于 hash 方式的核心思路是將發(fā)送給下游不同并發(fā)消費任務的數據寫到單獨的文件中,這樣文件本身就成了一個自然的區(qū)分不同數據分區(qū)的邊界;

      2. 基于 sort 方式的核心思路是先將所有分區(qū)的數據寫在一起,然后通過 sort 來區(qū)分不同數據分區(qū)的邊界。

      我們在 Flink 1.12 版本將基于 sort 的批處理 shuffle 實現引入了 Flink 并在后續(xù)進行了持續(xù)的性能與穩(wěn)定性優(yōu)化;到 Flink 1.13 版本,sort-shuffle 已經實現生產可用。

      二、引入 Sort-Shuffle 的意義

      我們之所以要在 Flink 中引入 sort-shuffle 的實現,一個重要的原因是 Flink 原本的基于 hash 的實現對大規(guī)模批作業(yè)不可用。這個也是被現有的其他大規(guī)模分布式計算系統(tǒng)所證明的:

      1. **穩(wěn)定性方面:**對于高并發(fā)批作業(yè),基于 hash 的實現會產生大量的文件,并且會對這些文件進行并發(fā)讀寫,這會消耗很多資源并對文件系統(tǒng)會產生較大的壓力。文件系統(tǒng)需要維護大量的文件元數據,會產生文件句柄以及 inode 耗盡等不穩(wěn)定風險。

      2. **性能方面:**對于高并發(fā)批作業(yè),并發(fā)讀寫大量的文件意味著大量的隨機 IO,并且每次 IO 實際讀寫的數據量可能是非常少的,這對于 IO 性能是一個巨大的挑戰(zhàn),在機械硬盤上,這使得數據 shuffle 很容易成為批處理作業(yè)的性能瓶頸。

      通過引入基于 sort 的批數據 shuffle 實現,并發(fā)讀寫的文件數量可以大大降低,有利于實現更好的數據順序讀寫,從而能夠提高 Flink 大規(guī)模批處理作業(yè)的穩(wěn)定性與性能。除此之外,新的 sort-shuffle 實現還可以減小內存緩沖區(qū)的消耗。對于基于 hash 的實現,每個數據分區(qū)都需要一塊讀寫緩沖區(qū),內存緩沖區(qū)消耗和并發(fā)成正比。而基于 sort 的實現則可以做到內存緩沖區(qū)消耗和作業(yè)并發(fā)解耦(盡管更大的內存可能會帶來更高的性能)。

      更為重要的一點是我們實現了新的存儲結構與讀寫 IO 優(yōu)化,這使得 Flink 的批數據 shuffle 相比于其他的大規(guī)模分布式數據處理系統(tǒng)更具優(yōu)勢。下面的章節(jié)會更為詳細的介紹 Flink 的 sort-shuffle 實現以及所取得的結果。

      三、Flink Sort-Shuffle 實現

      和其他分布式系統(tǒng)的批數據 sort-shuffle 實現類似,Flink 的整個 shuffle 過程分為幾個重要的階段,包括寫數據到內存緩沖區(qū)、對內存緩沖區(qū)進行排序、將排好序的數據寫出到文件以及從文件中讀取 shuffle 數據并發(fā)送給下游。但是,與其他系統(tǒng)相比,Flink 的實現有一些根本性的不同,包括多段數據存儲格式、省掉數據合并流程以及數據讀取 IO 調度等。這些都使得 Flink 的實現有著更優(yōu)秀的表現。

      1. 設計目標

      在 Flink sort-shuffle 的整個實現過程中,我們把下面這些點作為主要的設計目標加以考量:

      1.1 減少文件數量

      正如上面所討論的,基于 hash 的實現會產生大量的文件,而減少文件的數量有利于提高穩(wěn)定性和性能。Sort-Spill-Merge 的方式被分布式計算系統(tǒng)廣泛采納以達到這一目標,首先將數據寫入內存緩沖區(qū),當內存緩沖區(qū)填滿后對數據進行排序,排序后的數據被寫出到一個文件中,這樣總的文件數量是:(總數據量 / 內存緩沖區(qū)大?。?,從而文件數量被減少。當所有數據寫出完成后,將產生的文件合并成一個文件,從而進一步減少文件數量并增大每個數據分區(qū)的大?。ㄓ欣陧樞蜃x?。?。

      相比于其他系統(tǒng)的實現,Flink 的實現有一個重要的不同,即 Flink 始終向同一個文件中不斷追加數據,而不會寫多個文件再進行合并,這樣的好處始終只有一個文件,文件數量實現了最小化。

      1.2 打開更少的文件

      同時打開的文件過多會消耗更多的資源,同時容易導致文件句柄不夠用的問題,導致穩(wěn)定性變差。因此,打開更少的文件有利于提升系統(tǒng)的穩(wěn)定性。對于數據寫出,如上所述,通過始終向同一個文件中追加數據,每個并發(fā)任務始終只打開一個文件。對于數據讀取,雖然每個文件都需要被大量下游的并發(fā)任務讀取,Flink 依然通過只打開文件一次,并在這些并發(fā)讀取任務間共享文件句柄實現了每個文件只打開一次的目標。

      1.3 最大化順序讀寫

      文件的順序讀寫對文件的 IO 性能至關重要。通過減少 shuffle 文件數量,我們已經在一定程度上減少了隨機文件 IO。除此之外,Flink 的批數據 sort-shuffle 還實現了更多 IO 優(yōu)化來最大化文件的順序讀寫。在數據寫階段,通過將要寫出的數據緩沖區(qū)聚合成更大的批并通過 wtitev 系統(tǒng)調用寫出從而實現了更好的順序寫。在數據讀取階段,通過引入讀取 IO 調度,總是按照文件的偏移順序服務數據讀取請求從而最大限度的實現的文件的順序讀。實驗表明這些優(yōu)化極大的提升了批數據 shuffle 的性能。

      1.4 減少讀寫 IO 放大

      傳統(tǒng)的 sort-spill-merge 方式通過將生成的多個文件合并成一個更大的文件從增大讀取數據塊的大小。這種實現方案雖然帶來了好處,但也有一些不足,最終要的一點便是讀寫 IO 放大,對于計算節(jié)點間的數據 shuffle 而言,在不發(fā)生錯誤的情況下,本身只需要寫入和讀取數據一次,但是數據合并使得相同的數據被讀寫多次,從而導致 IO 總量變多,并且存儲空間的消耗也會變大。

      Flink 的實現通過不斷向同一個文件中追加數據以及獨特的存儲結構規(guī)避了文件和并的過程,雖然單個數據塊的大小小于和并后的大小,但由于規(guī)避了文件合并的開銷再結合 Flink 獨有的 IO 調度,最終可以實現比 sort-spill-merge 方案更高的性能。

      1.5 減少內存緩沖區(qū)消耗

      類似于其他分布式計算系統(tǒng)中 sort-shuffle 的實現,Flink 利用一塊固定大小的內存緩沖區(qū)進行數據的緩存與排序。這塊內存緩沖區(qū)的大小是與并發(fā)無關的,從而使得上游 shuffle 數據寫所需要的內存緩沖區(qū)大小與并發(fā)解耦。結合另一個內存管理方面的優(yōu)化 FLINK-16428 可以同時實現下游 shuffle 數據讀取的內存緩沖區(qū)消耗并發(fā)無關化,從而可以減少大規(guī)模批作業(yè)的內存緩沖區(qū)消耗。(注:FLINK-16428 同時適用于批作業(yè)與流作業(yè))

      2. 實現細節(jié)

      2.1 內存數據排序

      在 shuffle 數據的 sort-spill 階段,每條數據被首先序列化并寫入到排序緩沖區(qū)中,當緩沖區(qū)被填滿后,會對緩沖區(qū)中的所有二進制數據按照數據分區(qū)的順序進行排序。此后,排好序的數據會按照數據分區(qū)的順序被寫出到文件中。雖然,目前并沒有對數據本身進行排序,但是排序緩沖區(qū)的接口足夠的泛化,可以實現后續(xù)潛在的更為復雜的排序要求。排序緩沖區(qū)的接口定義如下:

      public interface SortBuffer {
       
         */** Appends data of the specified channel to this SortBuffer. \*/*
         boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;
       
         */** Copies data in this SortBuffer to the target MemorySegment. \*/*
         BufferWithChannel copyIntoSegment(MemorySegment target);
       
         long numRecords();
       
         long numBytes();
       
         boolean hasRemaining();
       
         void finish();
       
         boolean isFinished();
       
         void release();
       
         boolean isReleased();
       }
      復制代碼

      在排序算法上,我們選擇了復雜度較低的 bucket-sort。具體而言,每條序列化后的數據前面都會被插入一個 16 字節(jié)的元數據。包括 4 字節(jié)的長度、4 字節(jié)的數據類型以及 8 字節(jié)的指向同一數據分區(qū)中下一條數據的指針。結構如下圖所示:

      img

      當從緩沖區(qū)中讀取數據時,只需要按照每個數據分區(qū)的鏈式索引結構就可以讀取到屬于這個數據分區(qū)的所有數據,并且這些數據保持了數據寫入時的順序。這樣按照數據分區(qū)的順序讀取所有的數據就可以達到按照數據分區(qū)排序的目標。

      2.2 文件存儲結構

      如前所述,每個并行任務產生的 shuffle 數據會被寫到一個物理文件中。每個物理文件包含多個數據區(qū)塊(data region),每個數據區(qū)塊由數據緩沖區(qū)的一次 sort-spill 生成。在每個數據區(qū)塊中,所有屬于不同數據分區(qū)(data partition,由下游計算節(jié)點不同并行任務消費)的數據按照數據分區(qū)的序號順序進行排序聚合。下圖展示了 shuffle 數據文件的詳細結構。其中(R1,R2,R3)是 3 個不同的數據區(qū)塊,分別對應 3 次數據的 sort-spill 寫出。每個數據塊中有 3 個不同的數據分區(qū),分別將由(C1,C2,C3)3 個不同的并行消費任務進行讀取。也就是說數據 B1.1,B2.1 及 B3.1 將由 C1 處理,數據 B1.2,B2.2 及 B3.2 將由 C2 處理,而數據 B1.3,B2.3 及 B3.3 將由 C3 處理。

      img

      類似于其他的分布式處理系統(tǒng)實現,在 Flink 中,每個數據文件還對應一個索引文件。索引文件用來在讀取時為每個消費者索引屬于它的數據(data partition)。索引文件包含和數據文件相同的 data region,在每個 data region 中有與 data partition 相同數量的索引項,每個索引項包含兩個部分,分別對應到數據文件的偏移量以及數據的長度。作為一個優(yōu)化。Flink 為每個索引文件緩存最多 4M 的索引數據。數據文件與索引文件的對應關系如下:

      img

      2.3 讀取 IO 調度

      為了進一步提高文件 IO 性能,基于上面的存儲結構,Flink 進一步引入了 IO 調度機制,類似于磁盤調度的電梯算法,Flink 的 IO 調度總是按照 IO 請求的文件偏移順序進行調度。更具體來說,如果數據文件有 n 個 data region,每個 data region 有 m 個 data partition,同時有 m 個下游計算任務讀取這一數據文件,那么下面的偽代碼展示了 Flink 的 IO 調度算法的工作流程:

      *// let data_regions as the data region list indexed from 0 to n - 1*
       *// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1*
       for (data_region in data_regions) {
         data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
         if (data_reader == null)
           break;
         reading_buffers = request_reading_buffers();
         if (reading_buffers.isEmpty())
           break;
         read_data(data_region, data_reader, reading_buffers);
       }   
      復制代碼

      2.4 數據廣播優(yōu)化

      數據廣播是指發(fā)送相同的數據給下游計算節(jié)點的所有并行任務,一個常見的應用場景是 broadcast-join。Flink 的 sort-shuffle 實現對這一過程進行了優(yōu)化,使得在包括內存排序緩沖區(qū)和 shuffle 文件中,廣播數據只保存一份,這可以大大提升數據廣播的性能。更具體來說,當寫入一條廣播數據到排序緩沖區(qū)時,這條數據只會被序列化并且拷貝一次,同樣在將數據寫出到 shuffle 文件時,也只會寫一份數據。在索引文件中,對于不同 data partition 的數據索引項,他們均指向數據文件中的同一塊數據。下圖展示了數據廣播優(yōu)化的所有細節(jié):

      img

      2.5 數據壓縮

      數據壓縮是一個簡單而有效的優(yōu)化手段,測試結果顯示數據壓縮可以提高 TPC-DS 總體性能超過 30%。類似于 Flink 的基于 hash 的批處理 shuffle 實現,數據壓縮是以網絡緩沖區(qū)(network buffer)為單位進行的,數據壓縮不跨 data partition,也就是說發(fā)給不同下游并行任務的數據分開壓縮,壓縮發(fā)生在數據排序后寫出前,下游消費任務在收到數據后進行解壓。下圖展示了數據壓縮的整個流程:

      img

      四、測試結果

      1. 穩(wěn)定性

      新的 sort-shuffle 的實現極大的提高 Flink 運行批處理作業(yè)的穩(wěn)定性。除了解決了潛在的文件句柄以及 inode 耗盡的不穩(wěn)定問題外,還解決了一些 Flink 原有 hash-shuffle 存在的已知問題,如 FLINK-21201(創(chuàng)建過多文件導致主線程阻塞),FLINK-19925(在網絡 netty 線程中執(zhí)行 IO 操作導致網絡穩(wěn)定性受到影響)等。

      2. 性能

      我們在 1000 規(guī)模的并發(fā)下運行了 TPC-DS 10T 數據規(guī)模的測試,結果表明,相比于 Flink 原本的批數據 shuffle 實現,新的數據 shuffle 實現可以實現 2-6 倍的性能提升,如果排除計算時間,只統(tǒng)計數據 shuffle 時間可以是先最高 10 倍的性能提升。下表展示了性能提升的詳細數據:

      JobsTime Used for Sort-Shuffle (s)Time Used for Hash-Shuffle (s)Speed up Factor
      q4.sql98653715.45
      q11.sql3487982.29
      q14b.sql88321292.51
      q17.sql2697812.90
      q23a.sql41811992.87
      q23b.sql3768432.24
      q25.sql4138732.11
      q29.sql35410382.93
      q31.sql2234982.23
      q50.sql2155502.56
      q64.sql2174422.04
      q74.sql2709623.56
      q75.sql1667134.30
      q93.sql2045402.65

      在我們的測試集群上,每塊機械硬盤的數據讀取以及寫入帶寬可以達到 160MB/s:

      Disk NameSDISDJSDK
      Writing Speed (MB/s)189173186
      Reading Speed (MB/s)112154158

      注:我們的測試環(huán)境配置如下,由于我們有較大的內存,所以一些 shuffle 數據量小的作業(yè)實際數據 shuffle 僅為讀寫內存,因此上面的表格僅列出了一些 shuffle 數據量大,性能提升明顯的查詢:

      Number of NodesMemory Size Per NodeCores Per NodeDisks Per Node
      12About 400G963

      五、調優(yōu)參數

      在 Flink 中,sort-shuffle 默認是不開啟的,想要開啟需要調小這個參數的配置:taskmanager.network.sort-shuffle.min-parallelism。這個參數的含義是如果數據分區(qū)的個數(一個計算任務并發(fā)需要發(fā)送數據給幾個下游計算節(jié)點)低于這個值,則走 hash-shuffle 的實現,如果高于這個值則啟用 sort-shuffle。實際應用時,在機械硬盤上,可以配置為 1,即使用 sort-shuffle。

      Flink 沒有默認開啟數據壓縮,對于批處理作業(yè),大部分場景下是建議開啟的,除非數據壓縮率低。開啟的參數為 taskmanager.network.blocking-shuffle.compression.enabled。

      對于 shuffle 數據寫和數據讀,都需要占用內存緩沖區(qū)。其中,數據寫緩沖區(qū)的大小由 taskmanager.network.sort-shuffle.min-buffers 控制,數據讀緩沖區(qū)由 taskmanager.memory.framework.off-heap.batch-shuffle.size 控制。數據寫緩沖區(qū)從網絡內存中切分出來,如果要增大數據寫緩沖區(qū)可能還需要增大網絡內存總大小,以避免出現網絡內存不足的錯誤。數據讀緩沖區(qū)從框架的 off-heap 內存中切分出來,如果要增大數據讀緩沖區(qū),可能還需要增大框架的 off-heap 內存,以避免出現 direct 內存 OOM 錯誤。一般而言更大的內存緩沖區(qū)可以帶來更好的性能,對于大規(guī)模批作業(yè),幾百兆的數據寫緩沖區(qū)與讀緩沖區(qū)是足夠的。

      六、未來展望

      還有一些后續(xù)的優(yōu)化工作,包括但不限于:

      1)網絡連接復用,這可以提高網絡的建立的性能與穩(wěn)定性,相關 Jira 包括 FLINK-22643 以及 FLINK-15455;

      2)多磁盤負載均衡,這有利于解決負載不均的問題,相關 Jira 包括 FLINK-21790 以及 FLINK-21789;

      3)實現遠程數據 shuffle 服務,這有利于進一步提升批數據 shuffle 的性能與穩(wěn)定性;

      4)允許用戶選擇磁盤類型,這可以提高易用性,用戶可以根據作業(yè)的優(yōu)先級選擇使用 HDD 或者 SSD。

      英文原文鏈接:

      flink./2021/10/26/…

      flink./2021/10/26/…


      12 月 4-5 日,Flink Forward Asia 2021 重磅開啟,全球 40+ 多行業(yè)一線廠商,80+ 干貨議題,帶來專屬于開發(fā)者的技術盛宴。 /

      另有首屆 Flink Forward Asia Hackathon 正式啟動,10W 獎金等你來! www.aliyun.com/page-source… img

        本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發(fā)現有害或侵權內容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多