摘要:Cloudera和英特爾公司的工程師們正在通力合作,旨在使Spark shuffle階段具有更高的可擴展性和穩定性。本文對相關方法的設計進行了詳細描述。
區別常見的Embarrassingly Parallel系統,類似MapReduce和Apache Spark(Apache Hadoop的下一代數據處理引擎)這樣的計算引擎主要區別在于對“all-to-all” 操作的支持上。和許多分布式引擎一樣,MapReduce和Spark的操作通常針對的是被分片數據集的子分片,很多操作每次只處理單個數據節點,同時這些操作所涉及到的數據往往都只存在于這個數據片內。all-to-all操作必須將數據集看作一個整體,而每個輸出結果都可以總結自不同分片上的記錄。Spark的groupByKey、sortByKey,還有reduceByKey這些shuffle功能都屬于這方面常見的操作。
在這些分布式計算引擎中,shuffle指的是在一個all-to-all操作中將數據再分割和聚合的操作。顯而易見,在實踐生產中,我們在Spark部署時所發現的大多性能、可擴展性及穩定性問題都是在shuffle過程中產生的。
Cloudera和英特爾的工程師們正通力合作以擴展Spark的shuffle,使得shuffle可以更加快速與穩定地處理大量的數據集。Spark在很多方面相較MapReduce有更多優勢,同時又在穩定性與可擴展性上相差無幾。在此,我們從久經考驗的MapReduce shuffle部署中吸取經驗,以提高排序數據輸出的shuffle性能。
在本文中,我們將會逐層解析——介紹目前Spark shuffle的運作實現模式,提出修改建議,并對性能的提高方式進行分析。更多的工作進展可以于正在進行中的SPARK-2926發現。
Spark目前的運作實現模式
一個shuffle包含兩組任務:1. 產生shuffle數據的階段;2.使用shuffle數據的階段。鑒于歷史原因,寫入數據的任務被稱做“map task”,而讀取數據的任務被稱做“reduce tasks”,但是以上角色分配只局限于單個job的某個具體shuffle過程中。在一個shuffle中扮演reduce的task,在另一個shuffle中可能就是map了,因為它在前者里面執行的是讀取操作,而在后者中執行的是數據寫入任務,并在隨后的階段中被消費。
MapReduce和Spark的shuffle都使用到了“pull”模式。在每個map任務中,數據被寫入本地磁盤,然后在reduce任務中會遠程請求讀取這些數據。由于shuffle使用的是all-to-all模式,任何map任務輸出的記錄組都可能用于任意reduce。一個job在map時的shuffle操作基于以下原則:所有用于同一個reduce操作的結果都會被寫入到相鄰的組別中,以便獲取數據時更為簡單。
Spark默認的shuffle實現(即hash-based shuffle)是map階段為每個reduce任務單獨打開一個文件,這種操作勝在簡單,但實際中卻有一些問題,比如說實現時Spark必須維持大量的內存消耗,或者造成大量的隨機磁盤I/O。此外,如果M和R分別代表著一個shuffle操作中的map和reduce數量,則hash-based shuffle需要產生總共M*R個數量的臨時文件,Shuffle consolidation將這個數量減至C*R個(這里的C代表的是同時能夠運行的map任務數量),但即便是經過這樣的修改之后,在運行的reducer數量過多時還是經常會出現“文件打開過多”的限制。
Hash-based shuffle中單個map任務
Sort-based shuffle中單個map任務
為了進一步提高shuffle的穩定性與性能,從1.1版本開始,Spark引入了“sort-based shuffle”實現,其功能與MapReduce使用的map方式十分類似。在部署時,每個任務的map輸出結果都會被儲存在內存里(直到可用內存耗盡),然后在reduce任務中進行排序,之后再spill到一個單獨的文件。如果在單個任務中該操作發生了多次,那么這個任務的輸出將被合并。
在reduced的過程中,一組線程負責抓取遠程的map輸出blocks。當數據進入后,它們會被反序列化,再轉化成一個適用于執行all-to-all操作的數據結構。在類似groupByKey、reduceByKey,還有aggregateByKey之類的聚合操作中,其結果會變成一個ExternalAppendOnlyMap(本質上是一個內存溢出時會spill到硬盤的哈希map)。在類似sortByKey的排序操作中,輸出結果會變成一個ExternalSorter(將結果分類后可能會spill到硬盤,并在對結果進行排序后返回一個迭代程序)。
完全Sort-based Shuffle
上文所描述的方式有兩個弊端:
每個Spark reduce的任務都需要同時打開大量的反序列化記錄,從而導致內存的大量消耗,而大量的Java對象對JVM的垃圾收集(garbage collection)產生壓力,會造成系統變慢和卡頓,同時由于這個版本較之序列化的版本內存消耗更為巨大,因而Spark必須更早更頻繁的spill,造成硬盤I/O也更為頻繁。此外,由于判斷反序列化對象的內存占用情況時難以達到100%的準確率,因此保持大量的反序列化對象會加劇內存不足的可能性。
在引導需要在分片內的排序操作時,我們需要進行兩次排序:mapper時按分片排序,reducer時按Key排序。
我們修改了map時在分片內按Key對結果進行排序,這樣在reduce時我們只要合并每個map任務排序后的吧blocks即可。我們可以按照序列化的模式將每個block存到內存中,然后在合并時逐一地將結果反序列化。這樣任何時候,內存中反序列化記錄的大數量就是已經合并的blocks總量。
完全sort-based shuffle中的單個map任務
單個reduce任務可以接收來自數以千計map任務的blocks,為了使得這個多路歸并更加高效,尤其是在數據超過可用內存的情況下,我們引入了分層合并( tiered merger)的概念。如果需要合并許多保存在磁盤上的blocks,這樣做可以小化磁盤尋道數量。分層合并同樣適用于ExternalAppendOnlyMap以及ExternalSorter的內部合并步驟,但是暫時我們還沒有進行修改。
高性能合并
每個任務中有一組線程是負責同步抓取shuffle數據的,每個任務對應的內存池有48MB,用來存放相應的數據。
我們引入了SortShuffleReader,先從內存池中獲取到blocks,然后[key, value]的方式向用戶代碼中返回迭代器對象。
Spark有一個所有任務共享的shuffle內存區域,默認大小是完整executor heap的20%。當blocks進入時,SortShuffleReader會嘗試從該主區域中調用shuffle所需的內存,直至內存塞滿調用失敗為止,然后我們需要將數據spill到硬盤上以釋放內存。SortShuffleReader將所有(好吧,并非所有的,有時候只會spill一小部分)內存中的數據塊寫入一個單獨的文件中并存入硬盤。隨著blocks被存入硬盤,一個后臺線程會對其進行監視,并在必要時將這些文件合并為更大一些的磁盤blogs。“final merge”會將所有終硬盤與內存中的blocks全部合并起來。
如何確定是時候進行一個臨時的“磁盤到磁盤”合并?
spark.shuffle.maxMergeFactor(默認為100)控制著一次可以合并的硬盤blocks數量的大值,當硬盤blocks的數量超過限制時,后臺線程會運行一次合并以降低這個數量(但是不會馬上奏效,詳情請查看代碼)。在確定需要合并多少blocks時,線程首先會將需要執行合并的blocks數量設定為小值,并將這個值作為合并數量的上限,以期盡可能減少blocks的合并次數。因此,如果spark.shuffle.maxMergeFactor是100,而磁盤blocks的終數量為110,這樣只需總共進行11個blocks的合并,就可將終磁盤blocks的數量保持在恰好100。想要再合并哪怕一個blocks,都會需要再一次的額外合并,而可能導致不必要的磁盤I/O。
maxMergeWidth為4的分層合并。每個矩形代表一個segment,其中三個合并為一個,然后終有四個segment被合并到一個迭代器中,以備下一次操作使用。
與sortByKey的性能對比
我們測試了使用SparkPerf進行sortbykey時,在相應的修改后,性能有何變化。在其中我們選擇了兩個不同大小的數據集,以比較我們的改動在內存足以支持所有shuffle數據時,和不足以支持的情況下對于性能的增益情況。
Spark的sortByKey變化導致兩個job和三個stage。
Sample stage:進行數據取樣以創建一個分區范圍,分區大小相等。
Map階段:寫入為reduce階段準備的shuffle bucket。
Reduce階段:得到相關的shuffle結果,按特定的數據集分區進行合并/分類。
引入一個6節點集群的基準,每個executor包含24個core和36GB的內存,大數據集有200億條記錄,壓縮后在HDFS上占409.8GB。小數據集有20億條記錄,壓縮后在HDFS上占15.9GB。每條記錄都包含一對10個字符串的鍵值對,在兩個case中,我們在超過1000個分片中測試了排序,每個stage的運行時間表以及總共的job如下圖顯示:
大數據集(越低則越好)
小數據集(越低則越好)
取樣階段耗時相同,因為此階段并不涉及shuffle過程;在map階段,在我們的改進下,每個分片中按Key對數據進行排序,導致這個階段的運行時間增加了(大數據集增加了37%,小數據集則是27%)。但是增加的時間在reduce階段得到了更大的補償,由于現在只需合并排序后的數據,Reduce階段的兩個數據集的耗時共減少了66%,從而使得大數據集加速27%,小數據集加速17%。
下面還有什么?
SPARK-2926是Spark shuffle的幾個改進計劃的成果之一,在這個版本中很多方面上shuffle可以更好地管理內存:
SPARK-4550 用內存緩沖中的map輸出數據作為原始數據,取代Java對象。map輸出數據的空間消耗更少,從而使得spill更少,在原始數據的對比上更快。
SPARK-4452 更詳細地追蹤不同shuffle數據結構的內存分配,同時將無需消耗的內存盡早返還。
SPARK-3461 追蹤agroupBy后出現的特定Key值相應字符串或者節點,而不是一次將其全部loading入內存。
作者簡介:Sandy Ryza是Cloudera公司的數據科學家、Hadoop提交者,同時也是Spark的貢獻者之一。他還是Advanced Analytics with Spark一書的作者之一。
Saisai(Jerry)Shao是一名英特爾公司的軟件工程師,同時也是Spark的貢獻者之一。
原文鏈接:Improving Sort Performance in Apache Spark: It’s a Double
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。