在《Hadoop 1.x中fsimage和edits合并實現》文章中,我們談到了Hadoop 1.x上的fsimage和edits合并實現,里面也提到了Hadoop 2.x版本的fsimage和edits合并實現和Hadoop 1.x完全不一樣,今天就來談談Hadoop 2.x中fsimage和edits合并的實現。
我們知道,在Hadoop 2.x中解決了NameNode的單點故障問題;同時SecondaryName已經不用了,而之前的Hadoop 1.x中是通過SecondaryName來合并fsimage和edits以此來減小edits文件的大小,從而減少NameNode重啟的時間。而在Hadoop 2.x中已經不用SecondaryName,那它是怎么來實現fsimage和edits合并的呢?首先我們得知道,在Hadoop 2.x中提供了HA機制(解決NameNode單點故障),可以通過配置奇數個JournalNode來實現HA,如何配置今天就不談了!HA機制通過在同一個集群中運行兩個NN(active NN & standby NN)來解決NameNode的單點故障,在任何時間,只有一臺機器處于Active狀態;另一臺機器是處于Standby狀態。Active NN負責集群中所有客戶端的操作;而Standby NN主要用于備用,它主要維持足夠的狀態,如果必要,可以提供快速的故障恢復。
為了讓Standby NN的狀態和Active NN保持同步,即元數據保持一致,它們都將會和JournalNodes守護進程通信。當Active NN執行任何有關命名空間的修改,它需要持久化到一半以上的JournalNodes上(通過edits log持久化存儲),而Standby NN負責觀察edits log的變化,它能夠讀取從JNs中讀取edits信息,并更新其內部的命名空間。一旦Active NN出現故障,Standby NN將會保證從JNs中讀出了全部的Edits,然后切換成Active狀態。Standby NN讀取全部的edits可確保發生故障轉移之前,是和Active NN擁有完全同步的命名空間狀態(更多的關于Hadoop 2.x的HA相關知識,可以參考本博客的《Hadoop2.2.0中HDFS的高可用性實現原理》)。
那么這種機制是如何實現fsimage和edits的合并?在standby NameNode節點上會一直運行一個叫做CheckpointerThread的線程,這個線程調用StandbyCheckpointer類的doWork()函數,而doWork函數會每隔Math.min(checkpointCheckPeriod, checkpointPeriod)秒來坐一次合并操作,相關代碼如下:
01try {
02 Thread.sleep(1000 * checkpointConf.getCheckPeriod());
03 } catch (InterruptedException ie) {
04}
05
06public long getCheckPeriod() {
07 return Math.min(checkpointCheckPeriod, checkpointPeriod);
08}
09
10checkpointCheckPeriod = conf.getLong(
11 DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
12 DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
13
14checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
15 DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
上面的checkpointCheckPeriod和checkpointPeriod變量是通過獲取hdfs-site.xml以下兩個屬性的值得到:
01<property>
02 <name>dfs.namenode.checkpoint.period</name>
03 <value>3600</value>
04 <description>The number of seconds between two periodic checkpoints.
05 </description>
06</property>
07
08<property>
09 <name>dfs.namenode.checkpoint.check.period</name>
10 <value>60</value>
11 <description>The SecondaryNameNode and CheckpointNode will poll the NameNode
12 every 'dfs.namenode.checkpoint.check.period' seconds to query the number
13 of uncheckpointed transactions.
14 </description>
15</property>
當達到下面兩個條件的情況下,將會執行一次checkpoint:
01boolean needCheckpoint = false;
02if (uncheckpointed >= checkpointConf.getTxnCount()) {
03 LOG.info("Triggering checkpoint because there have been " +
04 uncheckpointed + " txns since the last checkpoint, which " +
05 "exceeds the configured threshold " +
06 checkpointConf.getTxnCount());
07 needCheckpoint = true;
08} else if (secsSinceLast >= checkpointConf.getPeriod()) {
09 LOG.info("Triggering checkpoint because it has been " +
10 secsSinceLast + " seconds since the last checkpoint, which " +
11 "exceeds the configured interval " + checkpointConf.getPeriod());
12 needCheckpoint = true;
13}
當上述needCheckpoint被設置成true的時候,StandbyCheckpointer類的doWork()函數將會調用doCheckpoint()函數正式處理checkpoint。當fsimage和edits的合并完成之后,它將會把合并后的fsimage上傳到Active NameNode節點上,Active NameNode節點下載完合并后的fsimage,再將舊的fsimage刪掉(Active NameNode上的)同時清除舊的edits文件。步驟可以歸類如下:
(1)、配置好HA后,客戶端所有的更新操作將會寫到JournalNodes節點的共享目錄中,可以通過下面配置
1<property>
2 <name>dfs.namenode.shared.edits.dir</name>
3 <value>qjournal://XXXX/mycluster</value>
4</property>
5
6<property>
7 <name>dfs.journalnode.edits.dir</name>
8 <value>/export1/hadoop2x/dfs/journal</value>
9</property>
(2)、Active Namenode和Standby NameNode從JournalNodes的edits共享目錄中同步edits到自己edits目錄中;
(3)、Standby NameNode中的StandbyCheckpointer類會定期的檢查合并的條件是否成立,如果成立會合并fsimage和edits文件;
(4)、Standby NameNode中的StandbyCheckpointer類合并完之后,將合并之后的fsimage上傳到Active NameNode相應目錄中;
(5)、Active NameNode接到新的fsimage文件之后,將舊的fsimage和edits文件清理掉;
(6)、通過上面的幾步,fsimage和edits文件就完成了合并,由于HA機制,會使得Standby NameNode和Active NameNode都擁有新的fsimage和edits文件(之前Hadoop 1.x的SecondaryNameNode中的fsimage和edits不是新的)
本博客文章除特別聲明,全部都是原創!
尊重原創,轉載請注明: 轉載自過往記憶(http://www.iteblog.com/)
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。