Ray,一個在集群和大型多核機器上高效運行Python代碼的框架。可以查看相關代碼和文檔。
許多人工智能算法在計算上都非常密集,并且顯示出復雜的通信模式。為此許多研究人員將大部分時間花在構建定制系統上,以高效地在集群中分發代碼。
然而,定制的系統通常是基于特定的單一算法或算法類。因此我們構建了Ray來幫助消除一堆冗余的工程任務,這些任務目前在每個新算法中反復出現。我們希望能夠重用一些基本的基礎元素來實現并高效地執行各種算法和應用程序。
Ray允許通過少的修改來遠程執行Python函數。
使用常規的Python時,在函數未執行完畢之前,如果調用函數,那么會被阻塞。下面這個例子的執行時間為8秒:
def f(): time.sleep(1) # Calls to f executed serially. results = [] for _ in range(8):
result = f()
results.append(result)
對于Ray,當調用遠程函數時會立即返回一個future(稱之為對象IDs)。接著創建一個任務,然后調度它,并在集群的某個地方去執行。下面的例子只需要1秒即可完成。
@ray.remote def f(): time.sleep(1) # Tasks executed in parallel. results = [] for _ in range(8):
result = f.remote()
results.append(result)
results = ray.get(results)
注意,惟一的變化是將@ray.remote裝飾器添加到了函數定義中,通過f.remote調用該函數,并且在對象IDs列表中調用ray.get(記住對象IDs是未來),以便阻塞進程直到相應的任務執行完成。

這是描述任務和對象的圖表。圓圈表示任務,而方框表示對象。8個單獨的任務之間沒有箭頭,表示所有的任務可以并行執行。
與MapReduce或Apache Spark這樣的批量同步并行框架相比,Ray的設計目的是支持需要細粒度任務依賴的人工智能應用程序。它與整個數據集的總統計量的計算不同,訓練過程可以對一小部分數據進行操作,也可以對少數任務的輸出進行操作。
依賴項可以通過將對象IDs(任務的輸出)傳遞到其他任務來進行編碼。
import numpy as np @ray.remote def aggregate_data(x, y): return x + y
data = [np.random.normal(size=1000) for i in range(4)] while len(data) > 1:
intermediate_result = aggregate_data.remote(data[0], data[1])
data = data[2:] + [intermediate_result]
result = ray.get(data[0])
通過將一些調用的輸出傳遞給aggregate_data,隨后調用aggregate_data,然后對這些任務之間的依賴進行編碼,這些任務可以由系統使用,從而制定調度決策,并協調對象的傳輸。注意,當將對象IDs傳遞到遠程函數調用時,實際的值將在函數執行之前被解壓,因此當執行aggregate_data函數時,x和y將是numpy數組。

這是描述任務和對象的圖表。圓圈表示任務,而方框表示對象。箭頭表明從任務到它們產生的對象或者從對象到依賴于它們的任務。
Ray使用參與者在任務之間共享可變狀態。下面的例子中多個任務共享Atari模擬器狀態。每個任務通過運行模擬器完成前面任務所遺留下的幾個步驟。
import gym @ray.remote class Simulator(object): def __init__(self): self.env = gym.make("Pong-v0")
self.env.reset() def step(self, action): return self.env.step(action) # Create a simulator, this will start a new worker that will run all # methods for this actor. simulator = Simulator.remote()
observations = [] for _ in range(4): # Take action 0 in the simulator. observations.append(simulator.step.remote(0))
每次調用simulator.step.remote生成一個在參與者上調度的任務。這些任務會改變模擬器對象的狀態,并且每次執行一個。
與遠程函數一樣,參與者的方法返回對象IDs(也就是future),這些對象可以被傳遞到其他任務中,并且可以用ray.get來檢索它們的值。

這是描述任務和對象的圖表。圓圈表示任務,而方框表示對象。個任務是參與者的構造函數。粗箭頭用于顯示在這個參與者上調用的方法共享參與者的底層狀態。
有時,當運行帶有變量持續時間的任務時,不希望等待所有的任務完成。相反,可能希望等待一半的任務完成,或者使用完一秒后完成的任務。
@ray.remote def f(): time.sleep(np.random.uniform(0, 5)) # Launch 10 tasks with variable durations. results = [f.remote() for _ in range(10)] # Wait until either five tasks have completed or two seconds have passed and # return a list of the object IDs whose tasks have finished. ready_ids, remaining_ids = ray.wait(results, num_returns=5, timeout=2000)
在本例中,ready_ids是一個對象IDs列表,它對應的任務已完成執行,而remaining_ids是剩余的對象IDs列表。
這段原始代碼使實現其他行為變得很容易,例如希望按照完成的順序來處理某些任務。
# Launch 10 tasks with variable durations.
remaining_ids = [f.remote() for _ in range(10)]
# Process the tasks in the order that they complete.
results = [] while len(remaining_ids) > 0:
ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
results.append(ray.get(ready_ids[0]))
注意,可以簡單的修改上面的示例,以便上一個任務完成時自適應地啟動新任務。
序列化和反序列化數據常常是分布式計算的瓶頸。Ray讓同一機器上的工作進程通過共享內存訪問相同的對象。為了達到這一目的,Ray把內存中的對象存儲在每臺機器上為對象服務。
為了說明問題,假設我們創建了一些神經網絡權重,并希望將它們從一個Python進程傳送到另一個Python進程。
import numpy as np
weights = {"Variable{}".format(i): np.random.normal(size=5000000) for i in range(10)} # 2.68s
為了將神經網絡的權重傳輸到周圍,首先需要將它們序列化成一個連續的字節塊。這可以通過像pickle這樣的標準序列化庫來完成。
import pickle # Serialize the weights with pickle. Then deserialize them. pickled_weights = pickle.dumps(weights) # 0.986s new_weights = pickle.loads(pickled_weights) # 0.241s
反序列化所需的時間尤為重要,因為機器學習常見的一種模式是在單一的過程中聚集大量的值(例如神經網絡權重,轉出,或其他值),因此連續的反序列化步驟可能發生數百次。
為了減少在共享內存中對象進行反序列化所需要的時間,我們使用Apache Arrow數據布局。這使我們能夠在不掃描整個blob的情況下,計算對序列化的blob的偏移量。在實踐中還可以轉換成反序列化,達到幾個數量級的速度。
# Serialize the weights and copy them into the object store. Then deserialize # them. weights_id = ray.put(weights) # 0.525s new_weights = ray.get(weights_id) # 0.000622s
使用Arrow調用ray.put序列化權重,并將結果復制到對象存儲的內存中。然后調用ray.get對序列化的對象進行反序列化,并構造一個新的numpy字典數組。然而支持numpy數組的底層數組仍然存在于共享內存中,并且不會被復制到Python進程的堆中。
注意如果一臺不同的機器調用ray.get,那么相關的序列化對象將從一臺常用機器上復制到另一臺需要的機器上。
這個例子中,我們指明調用ray.put。然而通常情況下,只有當將Python對象傳遞到遠程函數或從遠程函數返回時,這個調用才會發生。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。