RPC原理與實現
簡介
RPC 的主要功能 目標是讓構建分布式計算(應用)更容易 ,在提供強大的遠程調用能力時不損失本地調用的語義簡潔性。為實現該目標,RPC 框架需提供一種透明調用機制讓使用者 不必顯式的區分本地調用和遠程調用 。
調用分類
RPC 調用分以下兩種:
- 同步調用
客戶方等待調用執行完成并返回結果。 - 異步調用
客戶方調用后不用等待執行結果返回,但依然可以通過回調通知等方式獲取返回結果。 若客戶方不關心調用返回結果,則變成單向異步調用,單向調用不用返回結果。
異步和同步的區分在于是否等待服務端執行完成并返回結果 。
結構拆解
RPC 服務方通過 RpcServer 去導出(export)遠程接口方法 ,而 客戶方通過 RpcClient 去引入(import)遠程接口方法 。 客戶方像調用本地方法一樣去調用遠程接口方法, RPC 框架提供接口的代理實現,實際的調用將委托給代理 RpcProxy 。 代理封裝調用信息并將調用轉交給 RpcInvoker 去實際執行 。 在客戶端的 RpcInvoker 通過連接器 RpcConnector 去維持與服務端的通道 RpcChannel, 并使用 RpcProtocol 執行協議編碼(encode)并將編碼后的請求消息通過通道發送給服務方。
RPC 服務端接收器 RpcAcceptor 接收客戶端的調用請求 ,同樣使用 RpcProtocol 執行協議解碼(decode)。 解碼后的調用信息傳遞給 RpcProcessor 去控制處理調用過程, 最后再委托調用給 RpcInvoker 去實際執行并返回調用結果 。
組件職責
上面我們進一步拆解了 RPC 實現結構的各個組件組成部分,下面我們詳細說明下每個組件的職責劃分。
1、 RpcServer
負責導出(export)遠程接口
2、 RpcClient
負責導入(import)遠程接口的代理實現
3、 RpcProxy
遠程接口的代理實現
4、 RpcInvoker
客戶方實現:負責編碼調用信息和發送調用請求到服務方并 等待調用結果返回
服務方實現:負責調用服務端接口的具體實現并返回調用結 果
5、 RpcProtocol
負責協議編/解碼
6、 RpcConnector
負責維持客戶方和服務方的連接通道和發送數據到服務方
7、 RpcAcceptor
負責接收客戶方請求并返回請求結果
8、 RpcProcessor
負責在服務方控制調用過程,包括管理調用線程池、超時時 間等
9、 RpcChannel
數據傳輸通道
實現分析
在進一步拆解了組件并劃分了職責之后,這里以在 java 平臺實現該 RPC 框架概念模型為例,詳細分析下實現中需要考慮的因素。
5.1導出遠程接口
導出遠程接口的意思是指 只有導出的接口可以供遠程調用,而未導出的接口則不能 。 在 java 中導出接口的代碼片段可能如下:
DemoService demo = new ...;
RpcServer server = new ...;
server.export(DemoService.class, demo, options);
我們可以導出整個接口,也可以更細粒度一點只導出接口中的某些方法,如: java 中還有一種比較特殊的調用就是多態, 也就是一個接口可能有多個實現,
// 只導出 DemoService 中簽名為 hi(String s) 的方法server.export(DemoService.class, demo, "hi", new Class<?>[] { String.class }, options);
那么遠程調用時到底調用哪個? 這個本地調用的語義是通過 jvm 提供的引用多態性隱式實現的,那么對于 RPC 來說跨進程的調用就沒法隱式實現了。 如果前面 DemoService 接口有 2 個實現,那么在導出接口時就需要特殊標記不同的實現,如:
DemoService demo = new ...;
DemoService demo2 = new ...;
RpcServer server = new ...;
server.export(DemoService.class, demo, options);
server.export("demo2", DemoService.class, demo2, options);
上面 demo2 是另一個實現, 我們標記為 demo2 來導出, 那么遠程調用時也需要傳遞該標記才能調用到正確的實現類,這樣就解決了多態調用的語義 。
5.2導入遠程接口與客戶端代理
導入相對于導出遠程接口,客戶端代碼為了能夠發起調用必須要獲得遠程接口的方法或過程定義 。目前, 大部分跨語言平臺 RPC 框架采用根據 IDL 定義通過 code generator 去生成 stub 代碼,這種方式下實際導入的過程就是通過代碼生成器在編譯期完成的 。 我所使用過的一些跨語言平臺 RPC 框架如 CORBAR、WebService、ICE、Thrift 均是此類方式。
代碼生成的方式對跨語言平臺 RPC 框架而言是必然的選擇,而對于同一語言平臺的 RPC 則可以通過共享接口定義來實現 。 在 java 中導入接口的代碼片段可能如下:
RpcClient client = new ...;
DemoService demo = client.refer(DemoService.class);
demo.hi("how are you?");
5.3協議編解碼
客戶端代理在發起調用前需要對調用信息進行編碼,這就要 考慮需要編碼些什么信息并以什么格式傳輸到服務端才能讓服務端完成調用 。 出于效率考慮,編碼的信息越少越好(傳輸數據少),編碼的規則越簡單越好(執行效率高)。 我們先看下需要編碼些什么信息:
- 調用編碼
接口方法:包括接口名、方法名
方法參數:包括參數類型、參數值
調用屬性:包括調用屬性信息,例如調用附件隱式參數、調用超時時間等
- 返回編碼
返回結果:接口方法中定義的返回值
返回碼:異常返回碼
返回異常信息:調用異常信息
除了以上這些必須的調用信息, 我們可能還需要一些元信息以方便程序編解碼以及未來可能的擴展 。 這樣我們的編碼消息里面就分成了兩部分, 一部分是元信息、另一部分是調用的必要信息 。 如果設計一種 RPC 協議消息的話, 元信息我們把它放在協議消息頭中,而必要信息放在協議消息體中 。 下面給出一種概念上的 RPC 協議消息設計格式:
消息頭
magic : 協議魔數,為解碼設計
header size: 協議頭長度,為擴展設計
version : 協議版本,為兼容設計
st : 消息體序列化類型
hb : 心跳消息標記,為長連接傳輸層心跳設計
ow : 單向消息標記,
rp : 響應消息標記,不置位默認是請求消息
status code: 響應消息狀態碼
reserved : 為字節對齊保留
message id : 消息 id
body size : 消息體長度
消息體
采用序列化編碼,常見有以下格式
xml : 如 webservie SOAP
json : 如 JSON-RPC
binary: 如 thrift; hession; kryo 等
格式確定后編解碼就簡單了,由于頭長度一定所以我們比較關心的就是消息體的序列化方式。 序列化我們關心三個方面:
序列化和反序列化的效率,越快越好。
序列化后的字節長度,越小越好。
序列化和反序列化的兼容性,接口參數對象若增加了字段,是否兼容。
5.4 傳輸服務
協議編碼之后,自然就是需要將編碼后的 RPC 請求消息傳輸到服務方,服務方執行后返回結果消息或確認消息給客戶方。 RPC 的應用場景實質是一種可靠的請求應答消息流,和 HTTP 類似 。 因此 選擇長連接方式的 TCP 協議會更高效,與 HTTP 不同的是在協議層面我們定義了每個消息的唯一 id,因此可以更容易的復用連接 。
既然使用長連接,那么第一個問題是到底 client 和 server 之間需要多少根連接? 實際上單連接和多連接在使用上沒有區別,對于數據傳輸量較小的應用類型,單連接基本足夠 。 單連接和多連接最大的區別在于,每根連接都有自己私有的發送和接收緩沖區, 因此大數據量傳輸時分散在不同的連接緩沖區會得到更好的吞吐效率 。 所以,如果你的數據傳輸量不足以讓單連接的緩沖區一直處于飽和狀態的話,那么使用多連接并不會產生任何明顯的提升,反而會增加連接管理的開銷。
連接是由 client 端發起建立并維持。 如果 client 和 server 之間是直連的,那么連接一般不會中斷(當然物理鏈路故障除外)。 如果 client 和 server 連接經過一些負載中轉設備,有可能連接一段時間不活躍時會被這些中間設備中斷。 為了保持連接有必要定時為每個連接發送心跳數據以維持連接不中斷 。 心跳消息是 RPC 框架庫使用的內部消息,在前文協議頭結構中也有一個專門的心跳位, 就是用來標記心跳消息的,它對業務應用透明。
5.5 執行調用
client stub 所做的事情僅僅是編碼消息并傳輸給服務方,而真正調用過程發生在服務方 。 server stub 從前文的結構拆解中我們細分了 RpcProcessor 和 RpcInvoker 兩個組件, 一個負責控制調用過程,一個負責真正調用 。這里我們還是以 java 中實現這兩個組件為例來分析下它們到底需要做什么?
java 中實現代碼的動態接口調用目前一般通過反射調用。 除了原生的 jdk 自帶的反射,一些第三方庫也提供了性能更優的反射調用, 因此 RpcInvoker 就是封裝了反射調用的實現細節 。
調用過程的控制需要考慮哪些因素,RpcProcessor 需要提供什么樣地調用控制服務呢? 下面提出幾點以啟發思考:
1、效率提升
每個請求應該盡快被執行,因此我們不能每請求來再創建線程去執行,需要提供線程池服務。
2、資源隔離
當我們導出多個遠程接口時,如何避免單一接口調用占據所有線程資源,而引發其他接口執行阻塞。
3、超時控制
當某個接口執行緩慢,而 client 端已經超時放棄等待后,server 端的線程繼續執行此時顯得毫無意義。
5.6異常處理
無論 RPC 怎樣努力把遠程調用偽裝的像本地調用,但它們依然有很大的不同點,而且有一些異常情況是在本地調用時絕對不會碰到的。在說異常處理之前,我們先比較下本地調用和 RPC調用的一些差異:
1、 本地調用一定會執行,而遠程調用則不一定,調用消息可能因為網絡原因并未發送到服務方。
2、 本地調用只會拋出接口聲明的異常,而遠程調用還會拋出 RPC 框架運行時的其他異常。
3、 本地調用和遠程調用的性能可能差距很大,這取決于 RPC 固有消耗所占的比重。
正是這些區別決定了使用 RPC 時需要更多考量。 當調用遠程接口拋出異常時,異常可能是一個業務異常, 也可能是 RPC 框架拋出的運行時異常(如:網絡中斷等) 。 業務異常表明服務方已經執行了調用,可能因為某些原因導致未能正常執行, 而 RPC 運行時異常則有可能服務方根本沒有執行,對調用方而言的異常處理策略自然需要區分。
由于 RPC 固有的消耗相對本地調用高出幾個數量級, 本地調用的固有消耗是納秒級,而 RPC 的固有消耗是在毫秒級 。 那么對于過于輕量的計算任務就并不合適導出遠程接口由獨立的進程提供服務, 只有花在計算任務上時間遠遠高于 RPC 的固有消耗才值得導出為遠程接口提供服務 。
如何調用他人的遠程服務
由于各服務部署在不同機器,服務間的調用免不了網絡通信過程,服務消費方每調用一個服務都要寫一坨網絡通信相關的代碼,不僅復雜而且極易出錯。
如果有一種方式能讓我們像調用本地服務一樣調用遠程服務,而讓調用者對網絡通信這些細節透明 ,那么將大大提高生產力,比如服務消費方在執行helloWorldService.sayHello(“test”)時,實質上調用的是遠端的服務。 這種方式其實就是RPC(Remote Procedure Call Protocol) ,在各大互聯網公司中被廣泛使用,如阿里巴巴的hsf、dubbo(開源)、非死book的thrift(開源)、Google grpc(開源)、推ter的finagle等。
要讓網絡通信細節對使用者透明,我們自然需要對通信細節進行封裝 ,我們先看下一個RPC調用的流程:
1)服務消費方(client)調用以本地調用方式調用服務;
2)client stub接收到調用后負責將方法、參數等組裝成能夠進行網絡傳輸的消息體;
3)client stub找到服務地址,并將消息發送到服務端;
4)server stub收到消息后進行解碼;
5)server stub根據解碼結果調用本地的服務;
6)本地服務執行并將結果返回給server stub;
7)server stub將返回結果打包成消息并發送至消費方;
8)client stub接收到消息,并進行解碼;
9)服務消費方得到最終結果。
RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節透明。
6.1怎么做到透明化遠程服務調用
怎么封裝通信細節才能讓用戶像以本地調用方式調用遠程服務呢?對java來說就是使用代理! java代理有兩種方式:1) jdk 動態代理;2)字節碼生成 。盡管字節碼生成方式實現的代理更為強大和高效,但代碼不易維護,大部分公司實現RPC框架時還是選擇動態代理方式。
下面簡單介紹下動態代理怎么實現我們的需求。我們需要實現RPCProxyClient代理類, 代理類的invoke方法中封裝了與遠端服務通信的細節 ,消費方首先從RPCProxyClient獲得服務提供方的接口,當執行helloWorldService.sayHello(“test”)方法時就會調用invoke方法。
public class RPCProxyClient implements java.lang.reflect.InvocationHandler{
private Object obj;
public RPCProxyClient(Object obj){
this.obj=obj;
}
/**
* 得到被代理對象;
*/
public static Object getProxy(Object obj){ return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(), obj.getClass().getInterfaces(), new RPCProxyClient(obj));
}
/**
* 調用此方法執行
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//結果參數; Object result = new Object();
// ...執行通信相關邏輯
// ... return result;
}
}
public class Test {
public static void main(String[] args) { HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class);
helloWorldService.sayHello("test");
}
}</code></pre>
6.2怎么對消息進行編碼和解碼
1、 確定消息數據結構
上節講了invoke里需要封裝通信細節,而 通信的第一步就是要確定客戶端和服務端相互通信的消息結構 。客戶端的請求消息結構一般需要包括以下內容:
1)接口名稱
在我們的例子里接口名是“HelloWorldService”,如果不傳,服務端就不知道調用哪個接口了;
2)方法名
一個接口內可能有很多方法,如果不傳方法名服務端也就不知道調用哪個方法;
3)參數類型&參數值
參數類型有很多,比如有bool、int、long、double、string、map、list,甚至如struct(class);以及相應的參數值;
4)超時時間
5)requestID
標識唯一請求id,在下面一節會詳細描述requestID的用處。
同理,服務端返回的消息結構一般包括以下內容。
1)返回值
2)狀態code
3)requestID
2、 序列化
一旦確定了消息的數據結構后,下一步就是要考慮序列化與反序列化了。
什么是序列化? 序列化就是將數據結構或對象轉換成二進制串的過程,也就是編碼的過程。
什么是反序列化? 將在序列化過程中所生成的二進制串轉換成數據結構或者對象的過程。
為什么需要序列化? 轉換為二進制串后才好進行網絡傳輸嘛!為什么需要反序列化?將二進制轉換為對象才好進行后續處理!
現如今序列化的方案越來越多,每種序列化方案都有優點和缺點,它們在設計之初有自己獨特的應用場景,那到底選擇哪種呢?從RPC的角度上看,主要看三點:
1)通用性 ,比如是否能支持Map等復雜的數據結構;
2)性能 ,包括時間復雜度和空間復雜度,由于RPC框架將會被公司幾乎所有服務使用,如果序列化上能節約一點時間,對整個公司的收益都將非常可觀,同理如果序列化上能節約一點內存,網絡帶寬也能省下不少;
3)可擴展性 ,對互聯網公司而言,業務變化快,如果序列化協議具有良好的可擴展性,支持自動增加新的業務字段,刪除老的字段,而不影響老的服務,這將大大提供系統的健壯性。
目前國內各大互聯網公司廣泛使用hessian、protobuf、thrift、avro等成熟的序列化解決方案來搭建RPC框架,這些都是久經考驗的解決方案。
6.3通信
消息數據結構被序列化為二進制串后,下一步就要進行網絡通信了。 目前有兩種IO通信模型:1)BIO;2)NIO 。一般RPC框架需要支持這兩種IO模型。
如何實現RPC的IO通信框架? 1)使用java nio方式自研 ,這種方式較為復雜,而且很有可能出現隱藏bug,見過一些互聯網公司使用這種方式; 2)基于mina ,mina在早幾年比較火熱,不過這些年版本更新緩慢; 3)基于netty ,現在很多RPC框架都直接基于netty這一IO通信框架,比如阿里巴巴的HSF、dubbo,推ter的finagle等。
6.4消息里為什么要帶有requestID
如果使用netty的話, 一般會用channel.writeAndFlush()方法來發送消息二進制串,這個方法調用后對于整個遠程調用(從發出請求到接收到結果)來說是一個異步的,即對于當前線程來說,將請求發送出來后,線程就可以往后執行了,至于服務端的結果,是服務端處理完成后,再以消息的形式發送給客戶端的 。于是這里出現以下兩個問題:
1)怎么讓當前線程“暫停”,等結果回來后,再向后執行?
2)如果有多個線程同時進行遠程方法調用,這時建立在client server之間的socket連接上會有很多雙方發送的消息傳遞,前后順序也可能是隨機的,server處理完結果后,將結果消息發送給client,client收到很多消息,怎么知道哪個消息結果是原先哪個線程調用的?
如下圖所示,線程A和線程B同時向client socket發送請求requestA和requestB,socket先后將requestB和requestA發送至server,而server可能將responseA先返回,盡管requestA請求到達時間更晚。我們需要 一種機制保證responseA丟給ThreadA,responseB丟給ThreadB 。

怎么解決呢?
1)client線程每次通過socket調用一次遠程接口前,生成一個唯一的ID,即requestID(requestID必需保證在一個Socket連接里面是唯一的), 一般常常使用AtomicLong從0開始累計數字生成唯一ID ;
2)將處理結果的回調對象callback, 存放到全局ConcurrentHashMap里面put(requestID, callback) ;
3)當線程調用channel.writeAndFlush()發送消息后,緊接著執行callback的get()方法試圖獲取遠程返回的結果。 在get()內部,則使用synchronized獲取回調對象callback的鎖,再先檢測是否已經獲取到結果,如果沒有,然后調用callback的wait()方法,釋放callback上的鎖,讓當前線程處于等待狀態 。
4)服務端接收到請求并處理后, 將response結果(此結果中包含了前面的requestID)發送給客戶端,客戶端socket連接上專門監聽消息的線程收到消息,分析結果,取到requestID,再從前面的ConcurrentHashMap里面get(requestID),從而找到callback對象,再用synchronized獲取callback上的鎖,將方法調用結果設置到callback對象里,再調用callback.notifyAll()喚醒前面處于等待狀態的線程 。
public Object get() {
synchronized (this) { // 旋鎖
while (!isDone) { // 是否有結果了
wait(); // 沒結果是釋放鎖,讓當前線程處于等待狀態
}
}
}
private void setDone(Response res) {
this.res = res;
isDone = true;
synchronized (this) { // 獲取鎖,因為前面wait()已經釋放了callback的鎖了
notifyAll(); // 喚醒處于等待的線程
}
} </code></pre>
如何發布自己的服務
如何讓別人使用我們的服務呢?有同學說很簡單嘛,告訴使用者服務的IP以及端口就可以了啊。確實是這樣,這里問題的關鍵在于是自動告知還是人肉告知。
人肉告知的方式:如果你發現你的服務一臺機器不夠,要再添加一臺,這個時候就要告訴調用者我現在有兩個ip了,你們要輪詢調用來實現負載均衡;調用者咬咬牙改了,結果某天一臺機器掛了,調用者發現服務有一半不可用,他又只能手動修改代碼來刪除掛掉那臺機器的ip。現實生產環境當然不會使用人肉方式。
有沒有一種方法能實現自動告知,即機器的增添、剔除對調用方透明,調用者不再需要寫死服務提供方地址? 當然可以,現如今zookeeper被廣泛用于實現服務自動注冊與發現功能!
簡單來講, zookeeper可以充當一個服務注冊表(Service Registry),讓多個服務提供者形成一個集群,讓服務消費者通過服務注冊表獲取具體的服務訪問地址(ip+端口)去訪問具體的服務提供者 。如下圖所示:

具體來說, zookeeper就是個分布式文件系統,每當一個服務提供者部署后都要將自己的服務注冊到zookeeper的某一路徑上 : /{service}/{version}/{ip:port}, 比如我們的HelloWorldService部署到兩臺機器,那么zookeeper上就會創建兩條目錄:分別為/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888。
zookeeper提供了“心跳檢測”功能,它會定時向各個服務提供者發送一個請求(實際上建立的是一個 socket 長連接),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,并將其剔除 ,比如100.19.20.02這臺機器如果宕機了,那么zookeeper上的路徑就會只剩/HelloWorldService/1.0.0/100.19.20.01:16888。
服務消費者會去監聽相應路徑(/HelloWorldService/1.0.0),一旦路徑上的數據有任務變化(增加或減少),zookeeper都會通知服務消費方服務提供者地址列表已經發生改變,從而進行更新 。
更為重要的是zookeeper 與生俱來的容錯容災能力(比如leader選舉),可以確保服務注冊表的高可用性。
分布式RPC流程圖


來自:http://www.tuicool.com//articles/bUnMZze