Spark是一個快速的、通用的分布式計算系統,而分布式的特性就意味著,必然存在節點間的通信。本文主要介紹不同的Spark組件之間是如何通過RPC(Remote Procedure Call) 進行點對點通信的,分為三個章節:
Spark的RPC主要在兩個模塊中:
為了更好的了解Spark RPC的內部實現細節,我基于Spark 2.1版本抽離了RPC通信的部分,單獨啟了一個項目,放到了github以及發布到Maven中央倉庫做學習使用,提供了比較好的上手文檔、參數設置和性能評估。下面就通過這個模塊對Spark RPC先做一個感性的認識。
以下的代碼均可以在kraps-rpc找到。
假設我們要開發一個Hello服務,客戶端可以傳輸string,服務端響應hi或者bye,并echo回去輸入的string。
步,定義一個HelloEndpoint繼承自RpcEndpoint表明可以并發的調用該服務,如果繼承自ThreadSafeRpcEndpoint則表明該Endpoint不允許并發。
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint { override def onStart(): Unit = {
println("start hello endpoint")
} override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case SayHi(msg) => {
println(s"receive $msg")
context.reply(s"hi, $msg")
} case SayBye(msg) => {
println(s"receive $msg")
context.reply(s"bye, $msg")
}
} override def onStop(): Unit = {
println("stop hello endpoint")
}
} case class SayHi(msg: String) case class SayBye(msg: String)
和Java傳統的RPC解決方案對比,可以看出這里不用定義接口或者方法標示(比如通常的id或者name),使用scala的模式匹配進行方法的路由。雖然點對點通信的契約交換受制于語言,這里就是SayHi和SayBye兩個case class,但是Spark RPC定位于內部組件通信,所以無傷大雅。
第二步,把剛剛開發好的Endpoint交給Spark RPC管理其生命周期,用于響應外部請求。RpcEnvServerConfig可以定義一些參數、server名稱(僅僅是一個標識)、bind地址和端口。通過NettyRpcEnvFactory這個工廠方法,生成RpcEnv,RpcEnv是整個Spark RPC的核心所在,后文會詳細展開,通過setupEndpoint將”hello-service”這個名字和步定義的Endpoint綁定,后續client調用路由到這個Endpoint就需要”hello-service”這個名字。調用awaitTermination來阻塞服務端監聽請求并且處理。
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", "localhost", 52345) val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()
第三步,開發一個client調用剛剛啟動的server,首先RpcEnvClientConfig和RpcEnv都是必須的,然后通過剛剛提到的”hello-service”名字新建一個遠程Endpoint的引用(Ref),可以看做是stub,用于調用,這里首先展示通過異步的方式來做請求。
val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hell-service")
val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") }
Await.result(future, Duration.apply("30s"))
也可以通過同步的方式,在新的Spark中askWithRetry實際已更名為askSync。
val result = endPointRef.askWithRetry[String](SayBye("neo"))
這就是Spark RPC的通信過程,使用起來易用性可想而知,非常簡單,RPC框架屏蔽了Socket I/O模型、線程模型、序列化/反序列化過程、使用netty做了包識別,長連接,網絡重連重試等機制。
在Spark內部,很多的Endpoint以及EndpointRef與之通信都是通過這種形式的,舉例來說比如driver和executor之間的交互用到了心跳機制,使用HeartbeatReceiver來實現,這也是一個Endpoint,它的注冊在SparkContext初始化的時候做的,代碼如下:
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
而它的調用在Executor內的方式如下:
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
首先說明下,自Spark 2.0后已經把Akka這個RPC框架剝離出去了(詳細見SPARK-5293),原因很簡單,因為很多用戶會使用Akka做消息傳遞,那么就會和Spark內嵌的版本產生沖突,而Spark也僅僅用了Akka做RPC,所以2.0之后,基于底層的org.apache.spark.spark-network-common模塊實現了一個類似Akka Actor消息傳遞模式的scala模塊,封裝在了core里面,kraps-rpc也就是把這個部分從core里面剝離出來獨立了一個項目。
雖然剝離了Akka,但是還是沿襲了Actor模式中的一些概念,在現在的Spark RPC中有如下映射關系。
RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem
底層通信全部使用netty進行了替換,使用的是org.apache.spark.spark-network-common這個內部lib。
這里先上一個UML圖展示了Spark RPC模塊內的類關系,白色的是Spark-core中的scala類,黃色的是org.apache.spark.spark-network-common中的java類。
不要被這張圖所嚇倒,經過下面的解釋分析,相信讀者可以領會其內涵,不用細究其設計的合理度,Spark是一個發展很快、不斷演進的項目,代碼不是一成不變的,持續變化是一定的。
RpcEndpoint和RpcCallContext
先看左側的RpcEndpoint,RpcEndpoint是一個可以響應請求的服務,和Akka中的Actor類似,從它的提供的方法簽名(如下)可以看出,receive方法是單向方式的,可以比作UDP,而receiveAndReply是應答方式的,可以比作TCP。它的子類實現可以選擇性的覆蓋這兩個函數,我們章實現的HelloEndpoint以及Spark中的HeartbeatReceiver都是它的子類。
def receive: PartialFunction[Any, Unit] = {
case _ => throw new RpcException(self + " does not implement 'receive'")
} def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}
其中RpcCallContext是用于分離核心業務邏輯和底層傳輸的橋接方法,這也可以看出Spark RPC多用組合,聚合以及回調callback的設計模式來做OO抽象,這樣可以剝離業務邏輯->RPC封裝(Spark-core模塊內)->底層通信(spark-network-common)三者。RpcCallContext可以用于回復正常的響應以及錯誤異常,例如:
reply(response: Any) // 回復一個message,可以是一個case class。 sendFailure(e: Throwable) // 回復一個異常,可以是Exception的子類,由于Spark RPC默認采用Java序列化方式,所以異常可以完整的在客戶端還原并且作為cause re-throw出去。
RpcCallContext也分為了兩個子類,分別是LocalNettyRpcCallContext和RemoteNettyRpcCallContext,這個主要是框架內部使用,如果是本地就走LocalNettyRpcCallContext直接調用Endpoint即可,否則就走RemoteNettyRpcCallContext需要通過RPC和遠程交互,這點也體現了RPC的核心概念,就是如何執行另外一個地址空間上的函數、方法,就仿佛在本地調用一樣。
另外,RpcEndpoint還提供了一系列回調函數覆蓋。
另外需要注意下,它的一個子類是ThreadSafeRpcEndpoint,很多Spark中的Endpoint繼承了這個類,Spark RPC框架對這種Endpoint不做并發處理,也就是同一時間只允許一個線程在做調用。
還有一個默認的RpcEndpoint叫做RpcEndpointVerifier,每一個RpcEnv初始化的時候都會注冊上這個Endpoint,因為客戶端的調用每次都需要先詢問服務端是否存在某一個Endpoint。
RpcEndpointRef
RpcEndpointRef類似于Akka中ActorRef,顧名思義,它是RpcEndpoint的引用,提供的方法send等同于!, ask方法等同于?,send用于單向發送請求(RpcEndpoint中的receive響應它),提供fire-and-forget語義,而ask提供請求響應的語義(RpcEndpoint中的receiveAndReply響應它),默認是需要返回response的,帶有超時機制,可以同步阻塞等待,也可以返回一個Future句柄,不阻塞發起請求的工作線程。
RpcEndpointRef是客戶端發起請求的入口,它可以從RpcEnv中獲取,并且聰明的做本地調用或者RPC。
RpcEnv和NettyRpcEnv
類庫中核心的就是RpcEnv,剛剛提到了這就是ActorSystem,服務端和客戶端都可以使用它來做通信。
對于server side來說,RpcEnv是RpcEndpoint的運行環境,負責RpcEndpoint的整個生命周期管理,它可以注冊或者銷毀Endpoint,解析TCP層的數據包并反序列化,封裝成RpcMessage,并且路由請求到指定的Endpoint,調用業務邏輯代碼,如果Endpoint需要響應,把返回的對象序列化后通過TCP層再傳輸到遠程對端,如果Endpoint發生異常,那么調用RpcCallContext.sendFailure來把異常發送回去。
對client side來說,通過RpcEnv可以獲取RpcEndpoint引用,也就是RpcEndpointRef的。
RpcEnv是和具體的底層通信模塊交互的負責人,它的伴生對象包含創建RpcEnv的方法,簽名如下:
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}
RpcEnv的創建由RpcEnvFactory負責,RpcEnvFactory目前只有一個子類是NettyRpcEnvFactory,原來還有AkkaRpcEnvFactory。NettyRpcEnvFactory.create方法一旦調用就會立即在bind的address和port上啟動server。
它依賴的RpcEnvConfig就是一個包含了SparkConf以及一些參數(kraps-rpc中更名為RpcConf)。RpcEnv的參數都需要從RpcEnvConfig中拿,基本的hostname和port,還有高級些的連接超時、重試次數、Reactor線程池大小等等。
下面看看RpcEnv常用的兩個方法:
// 注冊endpoint,必須指定名稱,客戶端路由就靠這個名稱來找endpoint def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
// 拿到一個endpoint的引用 def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
NettyRpcEnv由NettyRpcEnvFactory.create創建,這是整個Spark core和org.apache.spark.spark-network-common的橋梁,內部leverage底層提供的通信能力,同時包裝了一個類Actor的語義。上面兩個核心的方法,setupEndpoint會在Dispatcher中注冊Endpoint,setupEndpointRef會先去調用RpcEndpointVerifier嘗試驗證本地或者遠程是否存在某個endpoint,然后再創建RpcEndpointRef。更多關于服務端、客戶端調用的細節將在時序圖中闡述,這里不再展開。
Dispatcher和Inbox
NettyRpcEnv中包含Dispatcher,主要針對服務端,幫助路由到正確的RpcEndpoint,并且調用其業務邏輯。
這里需要先闡述下Reactor模型,Spark RPC的Socket I/O一個典型的Reactor模型的,但是結合了Actor pattern中的mailbox,可謂是一種混合的實現方式。
使用Reactor模型,由底層netty創建的EventLoop做I/O多路復用,這里使用Multiple Reactors這種形式,如下圖所示,從netty的角度而言,Main Reactor和Sub Reactor對應BossGroup和WorkerGroup的概念,前者負責監聽TCP連接、建立和斷開,后者負責真正的I/O讀寫,而圖中的ThreadPool就是的Dispatcher中的線程池,它來解耦開來耗時的業務邏輯和I/O操作,這樣就可以更scalabe,只需要少數的線程就可以處理成千上萬的連接,這種思想是標準的分治策略,offload非I/O操作到另外的線程池。
真正處理RpcEndpoint的業務邏輯在ThreadPool里面,中間靠Reactor線程中的handler處理decode成RpcMessage,然后投遞到Inbox中,所以compute的過程在另外的下面介紹的Dispatcher線程池里面做。
剛剛還提到了Actor pattern中mailbox模式,Spark RPC早起源于Akka,所以進化到現在,仍然了使用了這個模式。這里就介紹Inbox,每個Endpoint都有一個Inbox,Inbox里面有一個InboxMessage的鏈表,InboxMessage有很多子類,可以是遠程調用過來的RpcMessage,可以是遠程調用過來的fire-and-forget的單向消息OneWayMessage,還可以是各種服務啟動,鏈路建立斷開等Message,這些Message都會在Inbox內部的方法內做模式匹配,調用相應的RpcEndpoint的函數(都是一一對應的)。
Dispatcher中包含一個MessageLoop,它讀取LinkedBlockingQueue中的投遞RpcMessage,根據客戶端指定的Endpoint標識,找到Endpoint的Inbox,然后投遞進去,由于是阻塞隊列,當沒有消息的時候自然阻塞,一旦有消息,就開始工作。Dispatcher的ThreadPool負責消費這些Message。
Dispatcher的ThreadPool它使用參數spark.rpc.netty.dispatcher.numThreads來控制數量,如果kill -3 每個Spark driver或者executor進程,都會看到N個dispatcher線程:
"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]
那么另外的問題是誰會調用Dispatcher分發Message的方法呢?答案是RpcHandler的子類NettyRpcHandler,這就是Reactor中的線程做的事情。RpcHandler是底層org.apache.spark.spark-network-common提供的handler,當遠程的數據包解析成功后,會調用這個handler做處理。
這樣就完成了一個完全異步的流程,Network IO通信由底層負責,然后由Dispatcher分發,只要Dispatcher中的InboxMessage的鏈表足夠大,那么就可以讓Dispatcher中的ThreadPool慢慢消化消息,和底層的IO解耦開來,完全在獨立的線程中完成,一旦完成Endpoint內部業務邏輯,利用RpcCallContext回調來做消息的返回。
Outbox
NettyRpcEnv中包含一個ConcurrentHashMap[RpcAddress, Outbox],每個遠程Endpoint都對應一個Outbox,這和上面Inbox遙相呼應,是一個mailbox似的實現方式。
和Inbox類似,Outbox內部包含一個OutboxMessage的鏈表,OutboxMessage有兩個子類,OneWayOutboxMessage和RpcOutboxMessage,分別對應調用RpcEndpoint的receive和receiveAndReply方法。
NettyRpcEnv中的send和ask方法會調用指定地址Outbox中的send方法,當遠程連接未建立時,會先建立連接,然后去消化OutboxMessage。
同樣,一個問題是Outbox中的send方法如何將消息通過Network IO發送出去,如果是ask方法又是如何讀取遠程響應的呢?答案是send方法通過org.apache.spark.spark-network-common創建的TransportClient發送出去消息,由Reactor線程負責序列化并且發送出去,每個Message都會返回一個UUID,由底層來維護一個發送出去消息與其Callback的HashMap,當Netty收到完整的遠程RpcResponse時候,回調響應的Callback,做反序列化,進而回調Spark core中的業務邏輯,做Promise/Future的done,上層退出阻塞。
這也是一個異步的過程,發送消息到Outbox后,直接返回,Network IO通信由底層負責,一旦RPC調用成功或者失敗,都會回調上層的函數,做相應的處理。
spark-network-common中的類
這里暫不做過多的展開,都是基于Netty的封裝,有興趣的讀者可以自行閱讀源碼,當然還可以參考我之前開源的Navi-pbrpc框架的代碼,其原理是基本相同的。
服務啟動
話不多述,直接上圖。
服務端響應
階段,IO接收。TransportRequestHandler是netty的回調handler,它會根據wire format(下文會介紹)解析好一個完整的數據包,交給NettyRpcEnv做反序列化,如果是RPC調用會構造RpcMessage,然后回調RpcHandler的方法處理RpcMessage,內部會調用Dispatcher做RpcMessage的投遞,放到Inbox中,到此結束。
第二階段,IO響應。MessageLoop獲取帶處理的RpcMessage,交給Dispatcher中的ThreadPool做處理,實際就是調用RpcEndpoint的業務邏輯,通過RpcCallContext將消息序列化,通過回調函數,告訴TransportRequestHandler這有一個消息處理完畢,響應回去。
這里請重點體會異步處理帶來的便利,使用Reactor和Actor mailbox的結合的模式,解耦了消息的獲取以及處理邏輯。
客戶端請求
客戶端一般需要先建立RpcEnv,然后獲取RpcEndpointRef。
階段,IO發送。利用RpcEndpointRef做send或者ask動作,這里以send為例,send會先進行消息的序列化,然后投遞到指定地址的Outbox中,Outbox如果發現連接未建立則先嘗試建立連接,然后調用底層的TransportClient發送數據,直接通過該netty的API完成,完成后即可返回,這里返回了UUID作為消息的標識,用于下一個階段的回調,使用的角度來說可以返回一個Future,客戶端可以阻塞或者繼續做其他操作。
第二,IO接收。TransportResponseHandler接收到遠程的響應后,會先做反序列號,然后回調階段的Future,完成調用,這個過程全部在Reactor線程中完成的,通過Future做線程間的通知。
Spark RPC作為RPC傳輸層選擇TCP協議,做可靠的、全雙工的binary stream通道。
做一個高性能/scalable的RPC,需要能夠滿足,服務端盡可能多的處理并發請求,第二,同時盡可能短的處理完畢。CPU和I/O之前天然存在著差異,網絡傳輸的延時不可控,CPU資源寶貴,系統進程/線程資源寶貴,為了盡可能避免Socket I/O阻塞服務端和客戶端調用,有一些模式(pattern)是可以應用的。Spark RPC的I/O Model由于采用了Netty,因此使用的底層的I/O多路復用(I/O Multiplexing)機制,這里可以通過spark.rpc.io.mode參數設置,不同的平臺使用的技術不同,例如linux使用epoll。
線程模型采用Multi-Reactors + mailbox的異步方式來處理,在上文中已經介紹過。
Schema Declaration和序列化方面,Spark RPC默認采用Java native serialization方案,主要從兼容性和JVM平臺內部組件通信,以及scala語言的融合考慮,所以不具備跨語言通信的能力,性能上也不是追求極致,目前還沒有使用Kyro等更好序列化性能和數據大小的方案。
協議結構,Spark RPC采用私有的wire format如下,采用headr+payload的組織方式,header中包括整個frame的長度,message的類型,請求UUID。為解決TCP粘包和半包問題,以及組織成完整的Message的邏輯都在org.apache.spark.network.protocol.MessageEncoder中。
使用wireshake具體分析一下。
首先看一個RPC請求,就是調用章說的HelloEndpoint,客戶端調用分兩個TCP Segment傳輸,這是因為Spark使用netty的時候header和body分別writeAndFlush出去。
下圖是個TCP segment:
例子中藍色的部分是header,頭中的字節解析如下:
00 00 00 00 00 00 05 d2 // 十進制1490,是整個frame的長度
03一個字節表示的是RpcRequest,枚舉定義如下:
RpcRequest(3) RpcResponse(4) RpcFailure(5) StreamRequest(6) StreamResponse(7) StreamFailure(8), OneWayMessage(9) User(-1)
每個字節的意義如下:
4b ac a6 9f 83 5d 17 a9 // 8個字節是UUID 05 bd // 十進制1469,payload長度
具體的Payload就長下面這個樣子,可以看出使用Java native serialization,一個簡單的Echo請求就有1469個字節,還是很大的,序列化的效率不高。但是Spark RPC定位內部通信,不是一個通用的RPC框架,并且使用的量非常小,所以這點消耗也就可以忽略了,還有Spark Structured Streaming使用該序列化方式,其性能還是可以滿足要求的。
另外,作者在kraps-rpc中還給Spark-rpc做了一次性能測試,具體可以參考github。
作者從好奇的角度來深度挖掘了下Spark RPC的內幕,并且從2.1版本的Spark core中獨立出了一個專門的項目Kraps-rpc,放到了github以及發布到Maven中央倉庫做學習使用,提供了比較好的上手文檔、參數設置和性能評估,在整合kraps-rpc還發現了一個小的改進點,給Spark提了一個PR——[SPARK-21701],已經被merge到了主干,算是contribute社區了(10086個開心)。
接著深入剖析了Spark RPC模塊內的類組織關系,使用UML類圖和時序圖幫助讀者更好的理解一些核心的概念,包括RpcEnv,RpcEndpoint,RpcEndpointRef等,以及I/O的設計模式,包括I/O多路復用,Reactor和Actor mailbox等,這里還是重點提下Spark RPC的設計哲學,利用netty強大的Socket I/O能力,構建一個異步的通信框架。后,從TCP層的segment二進制角度分析了wire protocol。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。