百度開源高性能RPC框架 sofa-pbrpc
簡介
sofa-pbrpc 是基于Google Protocol Buffers 實現的RPC網絡通信庫,在百度公司各部門得到廣泛使用,每天支撐上億次內部調用。sofa-pbrpc基于百度大搜索高并發高負載的業務場景不斷打磨,成為一套簡單易用的輕量級高性能RPC框架。2014年sofa-pbrpc正式對外開源受到廣大開發人員的關注,目前sofa-pbrpc已經在浪潮、金山、樂視等各大互聯網公司產品中使用。
目標
-
輕量
-
易用
-
高性能
特性
-
接口簡單,容易使用
-
實現高效,性能優異(高吞吐、低延遲、高并發連接數)
-
測試完善,運行穩定
-
支持同步和異步調用,滿足不同類型需求
-
支持多級超時設定,靈活控制請求超時時間
-
支持精準的網絡流量控制,對應用層透明
-
支持透明壓縮傳輸,節省帶寬
-
提供服務和方法級別的服務調用統計信息,方便監控
-
支持自動建立連接和自動重連,用戶無需感知連接
-
遠程地址相同的Client Stub共享一個連接通道,節省資源
-
空閑連接自動關閉,及時釋放資源
-
支持Mock測試
-
支持多Server負載均衡與容錯
-
原生支持HTTP協議訪問
-
提供內建的Web監控頁面
-
提供Python客戶端庫
-
支持webservice,用戶快速定義web server處理邏輯
-
支持profiling,實時查看程序的資源消耗,方便問題追查
快速使用
使用sofa-pbrpc只需要三步:
-
定義通訊協議
-
實現Server
-
實現Client
樣例代碼參見“sample/echo”。
定義通訊協議
定義協議只需要編寫一個proto文件即可。 范例:echo_service.proto
c++
package sofa.pbrpc.test;
option cc_generic_services = true;
message EchoRequest {
required string message = 1;
}
message EchoResponse {
required string message = 1;
}
service EchoServer {
rpc Echo(EchoRequest) returns(EchoResponse);
}
使用protoc編譯'echo_service.proto',生成接口文件'echo_service.pb.h'和'echo_service.pb.cc'。
注意:
-
package會被映射到C++中的namespace,為了避免沖突建議使用package;
-
需要設置“cc_generic_services”,以通知protoc工具生成RPC框架代碼;
-
這里EchoRequest和EchoResponse的成員完全相同,在實際應用中可以設置不同的成員;
實現Server
頭文件
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc頭文件
include "echo_service.pb.h" // service接口定義頭文件</code></pre>
實現服務
Impl() {}
private:
virtual void Echo(google::protobuf::RpcController* controller,
const sofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
google::protobuf::Closure* done)
{
sofa::pbrpc::RpcController* cntl =
static_cast<sofa::pbrpc::RpcController*>(controller);
SLOG(NOTICE, "Echo(): request message from %s: %s",
cntl->RemoteAddress().c_str(), request->message().c_str());
response->set_message("echo message: " + request->message());
done->Run();
}
};
注意:
-
服務完成后必須調用done->Run(),通知RPC系統服務完成,觸發發送Response;
-
在調了done->Run()之后,Echo的所有四個參數都不再能訪問; done-Run()可以分派到其他線程中執行,以實現了真正的異步處理;
注冊和啟動服務
int main()
{
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcServerOptions options;
options.work_thread_num = 8;
sofa::pbrpc::RpcServer rpc_server(options);
if (!rpc_server.Start("0.0.0.0:12321")) {
SLOG(ERROR, "start server failed");
return EXIT_FAILURE;
}
sofa::pbrpc::test::EchoServer* echo_service = new EchoServerImpl();
if (!rpc_server.RegisterService(echo_service)) {
SLOG(ERROR, "register service failed");
return EXIT_FAILURE;
}
rpc_server.Run();
rpc_server.Stop();
return EXIT_SUCCESS;
}
實現Client
Client支持同步和異步兩種調用方式:
-
同步調用時,調用線程會被阻塞,直到收到回復或者超時;
-
異步調用時,調用線程不會被阻塞,收到回復或者超時會調用用戶提供的回調函數;
頭文件
#include <sofa/pbrpc/pbrpc.h> // sofa-pbrpc頭文件
#include "echo_service.pb.h" // service接口定義頭文件
同步調用
int main()
{
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcClientOptions client_options;
client_options.work_thread_num = 8;
sofa::pbrpc::RpcClient rpc_client(client_options);
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
sofa::pbrpc::test::EchoRequest request;
request.set_message("Hello world!");
sofa::pbrpc::test::EchoResponse response;
sofa::pbrpc::RpcController controller;
controller.SetTimeout(3000);
stub.Echo(&controller, &request, &response, NULL);
if (controller.Failed()) {
SLOG(ERROR, "request failed: %s", controller.ErrorText().c_str());
}
return EXIT_SUCCESS;
}
異步調用
{
SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
}
else {
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
}
delete cntl;
delete request;
delete response;
*callbacked = true;
}
int main()
{
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);
sofa::pbrpc::RpcClientOptions client_options;
sofa::pbrpc::RpcClient rpc_client(client_options);
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321");
sofa::pbrpc::test::EchoServer_Stub stub(&rpc_channel);
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
bool callbacked = false;
google::protobuf::Closure* done = sofa::pbrpc::NewClosure(
&EchoCallback, cntl, request, response, &callbacked);
stub.Echo(cntl, request, response, done);
while (!callbacked) {
usleep(100000);
}
return EXIT_SUCCESS;
}
注意:
-
異步調用傳入的controller、request、response參數,在回調函數執行之前需一直保持有效;
-
回調函數的執行會分配到專門的回調線程中運行,可以通過設置RpcClientOptions的callback thread num來配置回調線程數;
實現
系統結構

-
RpcClientStream/RpcServerStream:代表client和server之間的連接,用于client和server的網絡通信。
-
ThreadGroup:client和server內部線程池,用于io操作和執行回調。
-
TimeoutManager:采用訂閱者模型,對rpc請求進行超時管理。
-
RpCListenser:接受來自client的連接請求,創建與client之間的連接。
-
ServicePool:server端服務管理與路由。

整個RPC調用經過以下階段:
-
Stub調用RPC函數發起RPC請求.
-
RpcChannel調用CallMethod執行RPC調用。
-
RpcClient選取RpcClientStream異步發送請求,并添加至超時隊列。
-
server端RpcListener接收到client的請求,創建對應RpcServerStream。
-
RpcServerStream接收數據,根據meta信息在ServerPool中選取對應Service.Method執行。
-
server通過RpcServerStream發送執行結果,回復過程與請求過程類似。
技術特點
協議棧方式的網絡模型

在sofa-pbrpc中網絡數據自上而下流劃分為RpcClientStream/RpcServerStream、RpcMessageStream、RpcByteStream三層。消息流層主要負責網絡通信相關的操作,操作對象為序列化之后的二機制字節流;消息流層處理的對象是由header、meta和data組裝的消息,負責消息級別的控制與統計;協議層負責異步發送接受請求和響應數據。三層結構每一層是下一層的封裝和擴展,采用這樣協議棧方式的層次劃分更加有利于數據協議的擴展。
ZeroCopy方式管理緩沖區。


sofa-pbrpc將內存劃分為固定大小的buffer作為緩沖區,對buffer采用引用計數進行管理,減少不必要的內存拷貝。
支持HTTP協議
除了使用原生client訪問server外,sofa-pbrpc也支持使用http協議訪問server上的服務。同時,用戶可以通過使用server端的WebService工具類,快速實現server的對于http請求的處理邏輯。
支持json格式數據傳輸
sofa-pbrpc支持用戶使用http客戶端向server發送json格式的數據請求,并返回json格式的響應。
提供豐富的工具類
sofa-pbrpc提供常用工具類給開發者,包括:

性能
測試環境
- cpu 16core
- memory 64G
- kernel 2.6.32_1-15-0-0
吞吐

延遲

來自:https://my.oschina.net/u/1050511/blog/751841