摘要:騰訊分布式數(shù)據(jù)倉庫基于開源軟件Hadoop和Hive進(jìn)行構(gòu)建,TDW計算引擎包括兩部分:MapReduce和Spark,兩者內(nèi)部都包含了一個重要的過程—Shuffle。本文對Shuffle過程進(jìn)行解析,并對兩個計算引擎的Shuffle過程進(jìn)行比較。
騰訊分布式數(shù)據(jù)倉庫(Tencent distributed Data Warehouse, 簡稱TDW)基于開源軟件Hadoop和Hive進(jìn)行構(gòu)建,并且根據(jù)公司數(shù)據(jù)量大、計算復(fù)雜等特定情況進(jìn)行了大量優(yōu)化和改造,目前單集群大規(guī)模達(dá)到5600臺,每日作業(yè)數(shù)達(dá)到100多萬,已經(jīng)成為公司大的離線數(shù)據(jù)處理平臺。為了滿足用戶更加多樣的計算需求,TDW也在向?qū)崟r化方向發(fā)展,為用戶提供更加高效、穩(wěn)定、豐富的服務(wù)。
TDW計算引擎包括兩部分:一個是偏離線的MapReduce,一個是偏實時的Spark,兩者內(nèi)部都包含了一個重要的過程——Shuffle。本文對Shuffle過程進(jìn)行解析,并對兩個計算引擎的Shuffle過程進(jìn)行比較,對后續(xù)的優(yōu)化方向進(jìn)行思考和探索,期待經(jīng)過我們不斷的努力,TDW計算引擎運行地更好。
Shuffle過程介紹
MapReduce的Shuffle過程介紹
Shuffle的本義是洗牌、混洗,把一組有一定規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組無規(guī)則的數(shù)據(jù),越隨機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組具有一定規(guī)則的數(shù)據(jù)。
為什么MapReduce計算模型需要Shuffle過程?我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負(fù)責(zé)數(shù)據(jù)的過濾分發(fā);Reduce是規(guī)約,負(fù)責(zé)數(shù)據(jù)的計算歸并。Reduce的數(shù)據(jù)來源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來獲取數(shù)據(jù)。
從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:
Spill過程
Spill過程包括輸出、排序、溢寫、合并等步驟,如圖所示:
Collect
每個Map任務(wù)不斷地以<key, value>對的形式把數(shù)據(jù)輸出到在內(nèi)存中構(gòu)造的一個環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)。
這個數(shù)據(jù)結(jié)構(gòu)其實就是個字節(jié)數(shù)組,叫Kvbuffer,名如其義,但是這里面不光放置了<key, value>數(shù)據(jù),還放置了一些索引數(shù)據(jù),給放置索引數(shù)據(jù)的區(qū)域起了一個Kvmeta的別名,在Kvbuffer的一塊區(qū)域上穿了一個IntBuffer(字節(jié)序采用的是平臺自身的字節(jié)序)的馬甲。<key, value>數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在Kvbuffer中是相鄰不重疊的兩個區(qū)域,用一個分界點來劃分兩者,分界點不是亙古不變的,而是每次Spill之后都會更新一次。初始的分界點是0,<key, value>數(shù)據(jù)的存儲方向是向上增長,索引數(shù)據(jù)的存儲方向是向下增長,如圖所示:
Kvbuffer的存放指針bufindex是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
索引是對<key, value>在kvbuffer中的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如Kvindex初始位置是-4,當(dāng)個<key, value>寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然后Kvindex跳到-8位置,等第二個<key, value>和索引寫完之后,Kvindex跳到-32位置。
Kvbuffer的大小雖然可以通過參數(shù)設(shè)置,但是總共就那么大,<key, value>和索引不斷地增加,加著加著,Kvbuffer總有不夠用的那天,那怎么辦?把數(shù)據(jù)從內(nèi)存刷到磁盤上再接著往內(nèi)存寫數(shù)據(jù),把Kvbuffer中的數(shù)據(jù)刷到磁盤上的過程就叫Spill,多么明了的叫法,內(nèi)存中的數(shù)據(jù)滿了就自動地spill到具有更大空間的磁盤。
關(guān)于Spill觸發(fā)的條件,也就是Kvbuffer用到什么程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務(wù)就需要等Spill完成騰出空間之后才能繼續(xù)寫數(shù)據(jù);如果Kvbuffer只是滿到一定程度,比如80%的時候就開始Spill,那在Spill的同時,Map任務(wù)還能繼續(xù)寫數(shù)據(jù),如果Spill夠快,Map可能都不需要為空閑空間而發(fā)愁。兩利相衡取其大,一般選擇后者。
Spill這個重要的過程是由Spill線程承擔(dān),Spill線程從Map任務(wù)接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。
Sort
先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個關(guān)鍵字升序排序,移動的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。
Spill
Spill線程為這次Spill過程創(chuàng)建一個磁盤文件:從所有的本地目錄中輪訓(xùn)查找能存儲這么大空間的目錄,找到之后在其中創(chuàng)建一個類似于“spill12.out”的文件。Spill線程根據(jù)排過序的Kvmeta挨個partition的把<key, value>數(shù)據(jù)吐到這個文件中,一個partition對應(yīng)的數(shù)據(jù)吐完之后順序地吐下個partition,直到把所有的partition遍歷完。一個partition在文件中對應(yīng)的數(shù)據(jù)也叫段(segment)。
所有的partition對應(yīng)的數(shù)據(jù)都放在這個文件里,雖然是順序存放的,但是怎么直接知道某個partition在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應(yīng)的數(shù)據(jù)在這個文件中的索引:起始位置、原始數(shù)據(jù)長度、壓縮之后的數(shù)據(jù)長度,一個partition對應(yīng)一個三元組。然后把這些索引信息存放在內(nèi)存中,如果內(nèi)存中放不下了,后續(xù)的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓(xùn)查找能存儲這么大空間的目錄,找到之后在其中創(chuàng)建一個類似于“spill12.out.index”的文件,文件中不光存儲了索引數(shù)據(jù),還存儲了crc32的校驗數(shù)據(jù)。(spill12.out.index不一定在磁盤上創(chuàng)建,如果內(nèi)存(默認(rèn)1M空間)中能放得下就放在內(nèi)存中,即使在磁盤上創(chuàng)建了,和spill12.out文件也不一定在同一個目錄下。)
每一次Spill過程就會少生成一個out文件,有時還會生成index文件,Spill的次數(shù)也烙印在文件名中。索引文件和數(shù)據(jù)文件的對應(yīng)關(guān)系如下圖所示:
話分兩端,在Spill線程如火如荼的進(jìn)行SortAndSpill工作的同時,Map任務(wù)不會因此而停歇,而是一無既往地進(jìn)行著數(shù)據(jù)輸出。Map還是把數(shù)據(jù)寫到kvbuffer中,那問題就來了:<key, value>只顧著悶頭按照bufindex指針向上增長,kvmeta只顧著按照Kvindex向下增長,是保持指針起始位置不變繼續(xù)跑呢,還是另謀它路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之后再重新開始或者移動內(nèi)存都比較麻煩,不可取。Map取kvbuffer中剩余空間的中間位置,用這個位置設(shè)置為新的分界點,bufindex指針移動到這個分界點,Kvindex移動到這個分界點的-16位置,然后兩者就可以和諧地按照自己既定的軌跡放置數(shù)據(jù)了,當(dāng)Spill完成,空間騰出之后,不需要做任何改動繼續(xù)前進(jìn)。分界點的轉(zhuǎn)換如下圖所示:
Map任務(wù)總要把輸出的數(shù)據(jù)寫到磁盤上,即使輸出數(shù)據(jù)量很小在內(nèi)存中全部能裝得下,在后也會把數(shù)據(jù)刷到磁盤上。
Merge
Map任務(wù)如果輸出數(shù)據(jù)量很大,可能會進(jìn)行好幾次Spill,out文件和Index文件會產(chǎn)生很多,分布在不同的磁盤上。后把這些文件進(jìn)行合并的merge過程閃亮登場。
Merge過程怎么知道產(chǎn)生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產(chǎn)生的Spill文件,然后把路徑存儲在一個數(shù)組里。Merge過程又怎么知道Spill的索引信息呢?沒錯,也是從所有的本地目錄上掃描得到Index文件,然后把索引信息存儲在一個列表里。到這里,又遇到了一個值得納悶的地方。在之前Spill過程中的時候為什么不直接把這些信息存儲在內(nèi)存中呢,何必又多了這步掃描的操作?特別是Spill的索引數(shù)據(jù),之前當(dāng)內(nèi)存超限之后就把數(shù)據(jù)寫到磁盤,現(xiàn)在又要從磁盤把這些數(shù)據(jù)讀出來,還是需要裝到更多的內(nèi)存中。之所以多此一舉,是因為這時kvbuffer這個內(nèi)存大戶已經(jīng)不再使用可以回收,有內(nèi)存空間來裝這些數(shù)據(jù)了。(對于內(nèi)存空間較大的土豪來說,用內(nèi)存來省卻這兩個io步驟還是值得考慮的。)
然后為merge過程創(chuàng)建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲終的輸出和索引。
一個partition一個partition的進(jìn)行合并輸出。對于某個partition來說,從索引列表中查詢這個partition對應(yīng)的所有索引信息,每個對應(yīng)一個段插入到段列表中。也就是這個partition對應(yīng)一個段列表,記錄所有的Spill文件中對應(yīng)的這個partition那段數(shù)據(jù)的文件名、起始位置、長度等等。
然后對這個partition對應(yīng)的所有的segment進(jìn)行合并,目標(biāo)是合并成一個segment。當(dāng)這個partition對應(yīng)很多個segment時,會分批地進(jìn)行合并:先從segment列表中把批取出來,以key為關(guān)鍵字放置成小堆,然后從小堆中每次取出小的<key, value>輸出到一個臨時文件中,這樣就把這一批段合并成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合并輸出到一個臨時segment,把其加入到列表中;這樣往復(fù)執(zhí)行,直到剩下的段是一批,輸出到終的文件中。
終的索引數(shù)據(jù)仍然輸出到Index文件中。
Map端的Shuffle過程到此結(jié)束。
Copy
Reduce任務(wù)通過HTTP向各個Map任務(wù)拖取它所需要的數(shù)據(jù)。每個節(jié)點都會啟動一個常駐的HTTP server,其中一項服務(wù)就是響應(yīng)Reduce拖取Map數(shù)據(jù)。當(dāng)有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應(yīng)的Map輸出文件中對應(yīng)這個Reduce部分的數(shù)據(jù)通過網(wǎng)絡(luò)流輸出給Reduce。
Reduce任務(wù)拖取某個Map對應(yīng)的數(shù)據(jù),如果在內(nèi)存中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中。Reduce要向每個Map去拖取數(shù)據(jù),在內(nèi)存中每個Map對應(yīng)一塊數(shù)據(jù),當(dāng)內(nèi)存中存儲的Map數(shù)據(jù)占用空間達(dá)到一定程度的時候,開始啟動內(nèi)存中merge,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個文件中。
如果在內(nèi)存中不能放得下這個Map的數(shù)據(jù)的話,直接把Map數(shù)據(jù)寫到磁盤上,在本地目錄創(chuàng)建一個文件,從HTTP流中讀取數(shù)據(jù)然后寫到磁盤,使用的緩存區(qū)大小是64K。拖一個Map數(shù)據(jù)過來就會創(chuàng)建一個文件,當(dāng)文件數(shù)量達(dá)到一定閾值時,開始啟動磁盤文件merge,把這些文件合并輸出到一個文件。
有些Map的數(shù)據(jù)較小是可以放在內(nèi)存中的,有些Map的數(shù)據(jù)較大需要放在磁盤上,這樣后Reduce任務(wù)拖過來的數(shù)據(jù)有些放在內(nèi)存中了有些放在磁盤上,后會對這些來一個全局合并。
Merge Sort
這里使用的Merge和Map端使用的Merge過程一樣。Map的輸出數(shù)據(jù)已經(jīng)是有序的,Merge進(jìn)行一次合并排序,所謂Reduce端的sort過程就是這個合并的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。
本站文章版權(quán)歸原作者及原出處所有 。內(nèi)容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負(fù)責(zé),本站只提供參考并不構(gòu)成任何投資及應(yīng)用建議。本站是一個個人學(xué)習(xí)交流的平臺,網(wǎng)站上部分文章為轉(zhuǎn)載,并不用于任何商業(yè)目的,我們已經(jīng)盡可能的對作者和來源進(jìn)行了通告,但是能力有限或疏忽,造成漏登,請及時聯(lián)系我們,我們將根據(jù)著作權(quán)人的要求,立即更正或者刪除有關(guān)內(nèi)容。本站擁有對此聲明的最終解釋權(quán)。