非死book 的 C++ 11 組件庫 Folly Futures
非死book 的 C++ 11 組件庫 Folly Futures
Futures 是一種通過自然的、可組合的方式表達異步計算的模式。這篇博文介紹了我們在 非死book 中使用的一種適用于 C++11 的 futures 實現:Folly Futures。
為什么要使用異步?
想象一個服務 A 正在與服務 B 交互的場景。如果 A 被鎖定到 B 回復后才能繼續進行其他操作,則 A 是同步的。此時 A 所在的線程是空閑的,它不能為其他的請求提供服務。線程會變得非常笨重-切換線程是低效的,因為這需要耗費可觀的內存,如果你進行了大量這樣的操作,操作系統會因此陷入困境。這樣做的結果就是白白浪費了資源,降低了生產力,增加了等待時間(因為請求都在隊列中等待服務)。
如果將服務 A 做成異步的,會變得更有效率,這意味著當 B 在忙著運算的時候,A 可以轉進去處理其他請求。當 B 計算完畢得出結果后,A 獲取這個結果并結束請求。
同步代碼與異步代碼的比較
讓我們考慮一個函數 fooSync,這個函數使用完全同步的方式完成基本計算 foo,同時用另一個函數 fooAsync 異步地在做同樣的工作。fooAsync 需要提供一個輸入和一個能在結果可用時調用的回調函數。
template <typename T> using Callback = std::function<void(T)>; Output fooSync(Input); void fooAsync(Input, Callback<Output>);
這是一種傳統的異步計算表達方式。(老版本的 C/C++ 異步庫會提供一個函數指針和一個 void* 類型的上下文參數,但現在 C++11 支持隱蔽功能,已經不再需要顯式提供上下文參數)
傳統的異步代碼比同步代碼更為有效,但它的可讀性不高。對比同一個函數的同步和異步版本,它們都實現了一個 multiFoo 運算,這個運算為輸入向量(vector)中的每一個元素執行 foo 操作:
using std::vector;vector<Output> multiFooSync(vector<Input> inputs) { vector<Output> outputs; for (auto input : inputs) { outputs.push_back(fooSync(input)); } return outputs;}
void multiFooAsync( vector<Input> inputs, Callback<vector<Output>> callback){ struct Context { vector<Output> outputs; std::mutex lock; size_t remaining; }; auto context = std::make_shared<Context>(); context->remaining = inputs.size(); for (auto input : inputs) { fooAsync( input, [=](Output output) { std::lock_guard<std::mutex> guard(context->lock); context->outputs->push_back(output); if (--context->remaining == 0) { callback(context->outputs); } }); }}
異步的版本要復雜得多。它需要關注很多方面,如設置一個共享的上下文對象、線程的安全性以及簿記工作,因此它必須要指定全部的計算在什么時候完成。更糟糕的是(盡管在這個例子中體現得并不明顯)這使得代碼執行的次序關系(computation graph)變得復雜,跟蹤執行路徑變得極為困難。程序員需要對整個服務的狀態機和這個狀態機接收不同輸入時的不同行為建立一套思維模式,并且當代碼中的某一處不能體現流程時要找到應該去檢查的地方。這種狀況也被親切地稱為“回調地獄”。
Futures
Future 是一個用來表示異步計算結果(未必可用)的對象。當計算完成,future 會持有一個值或是一個異常。例如:
#include <folly/futures/Future.h> using folly::Future; // Do foo asynchronously; immediately return a Future for the output Future<Output> fooFuture(Input); Future<Output> f = fooFuture(input); // f may not have a value (or exception) yet. But eventually it will. f.isReady(); // Maybe, maybe not. f.wait(); // You can synchronously wait for futures to become ready. f.isReady(); // Now this is guaranteed to be true. Output o = f.value(); // If f holds an exception, this will throw that exception.
到目前為止,我們還沒有做任何 std::future 不能做的事情。但是 future 模式中的一個強有力的方面就是可以做到連鎖回調,std::future 目前尚不支持此功能。我們通過方法 Future::then 來表達這個功能:
Future<double> f = fooFuture(input) .then([](Output o) { return o * M_PI; }) .onError([](std::exception const& e) { cerr << "Oh bother, " << e.what() << ". Returning pi instead." << endl; return M_PI; });// get() first waits, and then returns the valuecout << "Result: " << f.get() << endl;
在這里我們像使用 onError 一樣使用連接起來的 then 去接住可能引發的任何異常。可以將 future 連接起來是一個重要的能力,它允許我們編寫串行和并行的計算,并將它們表達在同一個地方,并為之提供明晰的錯誤處理。
串行功能組成
如果你想要按順序異步計算 a、b、c 和 d,使用傳統的回調方式編程就會陷入“回調地獄”- 或者,你使用的語言具備一流的匿名函數(如 C++11),結果可能是“回調金字塔”:
// the callback pyramid is syntactically annoying void asyncA(Output, Callback<OutputA>); void asyncB(OutputA, Callback<OutputB>); void asyncC(OutputB, Callback<OutputC>); void asyncD(OutputC, Callback<OutputD>); auto result = std::make_shared<double>(); fooAsync(input, [=](Output output) { // ... asyncA(output, [=](OutputA outputA) { // ... asyncB(outputA, [=](OutputB outputB) { // ... asyncC(outputB, [=](OutputC outputC) { // ... asyncD(outputC, [=](OutputD outputD) { *result = outputD * M_PI; }); }); }); }); }); // As an exercise for the masochistic reader, express the same thing without // lambdas. The result is called callback hell.
有了 futures,順序地使用then組合它們,代碼就會變得干凈整潔:
Future<OutputA> futureA(Output); Future<OutputB> futureB(OutputA); Future<OutputC> futureC(OutputB); // then() automatically lifts values (and exceptions) into a Future. OutputD d(OutputC) { if (somethingExceptional) throw anException; return OutputD();}Future<double> fut = fooFuture(input) .then(futureA) .then(futureB) .then(futureC) .then(d) .then([](OutputD outputD) { // lambdas are ok too return outputD * M_PI; });
并行功能組成
再回到我們那個 multiFoo 的例子。下面是它在 future 中的樣子:
using folly::futures::collect; Future<vector<Output>> multiFooFuture(vector<Input> inputs) { vector<Future<Output>> futures; for (auto input : inputs) { futures.push_back(fooFuture(input)); } return collect(futures);}
collect 是一種我們提供的構建塊(compositional building block),它以 Future<T> 為輸入并返回一個 Future<vector<T>>,這會在所有的 futures 完成后完成。(collect 的實現依賴于-你猜得到-then)有很多其他的構建塊,包括:collectAny、collectN、map 和 reduce。
請注意這個代碼為什么會看上去與同步版本的 multiFooSync 非常相似,我們不需要擔心上下文或線程安全的問題。這些問題都由框架解決,它們對我們而言是透明的。
執行上下文
其他一些語言里的 futures 框架提供了一個線程池用于執行回調函數,你除了要知道上下文在另外一個線程中執行,不需要關注任何多余的細節。但是 C++ 的開發者們傾向于編寫 C++ 代碼,因為他們需要控制底層細節來實現性能優化,非死book 也不例外。因此我們使用簡單的 Executor接口提供了一個靈活的機制來明確控制回調上下文的執行:
struct Executor { using Func = std::function<void()>; virtual void add(Func) = 0;};
你可以向 then 函數傳入一個 executor 來命令它的回調會通過 executor 執行。
a(input).then(executor, b);
在這段代碼中,b 將會通過 executor 執行,b 可能是一個特定的線程、一個線程池、或是一些更有趣的東西。本方法的一個常見的用例是將 CPU 從 I/O 線程中解放出來,以避免隊列中其他請求的排隊時間。
Futures 意味著你再也不用忘記說對不起
傳統的回調代碼有一個普遍的問題,即不易對錯誤或異常情況的調用進行跟蹤。程序員在檢查錯誤和采取恰當措施上必須做到嚴于律己(即使是超人也要這樣),更不要說當一場被意外拋出的情況了。Futures 使用包含一個值和一個異常的方式來解決這個問題,這些異常就像你希望的那樣與 futures融合在了一起,除非它留在 future 單元里直到被 onErorr 接住,或是被同步地,例如,賦值或取值。這使得我們很難(但不是不可能)跟丟一個應該被接住的錯誤。
使用 Promise
我們已經大致看過了 futures 的使用方法,下面來說說我們該如何制作它們。如果你需要將一個值傳入到 Future,使用 makeFuture:
using folly::makeFuture; std::runtime_error greatScott("Great Scott!"); Future<double> future = makeFuture(1.21e9); Future<double> future = makeFuture<double>(greatScott);
但如果你要包裝一個異步操作,你需要使用 Promise:
using folly::Promise; Promise<double> promise; Future<double> future = promise.getFuture();
當你準備好為 promise 賦值的時候,使用 setValue、setException 或是 setWith:
promise.setValue(1.21e9); promise.setException(greatScott); promise.setWith([]{ if (year == 1955 || year == 1885) throw greatScott; return 1.21e9; });
總之,我們通過生成另一個線程,將一個長期運行的同步操作轉換為異步操作,如下面代碼所示:
double getEnergySync(int year) { auto reactor = ReactorFactory::getReactor(year); if (!reactor) // It must be 1955 or 1885 throw greatScott; return reactor->getGigawatts(1.21); } Future<double> getEnergy(int year) { auto promise = make_shared<Promise<double>>(); std::thread([=]{ promise->setWith(std::bind(getEnergySync, year)); }).detach(); return promise->getFuture(); }
通常你不需要 promise,即使乍一看這像是你做的。舉例來說,如果你的線程池中已經有了一個 executor 或是可以很輕易地獲取它,那么這樣做會更簡單:
Future<double> future = folly::via(executor, std::bind(getEnergySync, year));
用例學習
我們提供了兩個案例來解釋如何在 非死book 和 Instagram 中使用 future 來改善延遲、魯棒性與代碼的可讀性。
Instagram 使用 futures 將他們推薦服務的基礎結構由同步轉換為異步,以此改善他們的系統。其結果是尾延遲(tail latency)得以顯著下降,并僅用十分之一不到的服務器就實現了相同的吞吐量。他們把這些改動及相關改動帶來的益處進行了記錄,更多細節可以參考他們的博客。
下一個案例是一個真正的服務,它是 非死book 新聞遞送(News Feed)的一個組成部分。這個服務有一個兩階段的葉聚合模式(leaf-aggregate pattern),請求(request)會被分解成多個葉請求將碎片分配到不同的葉服務器,我們在做同樣的事情,但根據第一次聚合的結果分配的碎片會變得不同。最終,我們獲取兩組結果集并將它們簡化為一個單一的響應(response)。
下面是相關代碼的簡化版本:
Future<vector<LeafResponse>> fanout( const map<Leaf, LeafReq>& leafToReqMap, chrono::milliseconds timeout) { vector<Future<LeafResponse>> leafFutures; for (const auto& kv : leafToReqMap) { const auto& leaf = kv.first; const auto& leafReq = kv.second; leafFutures.push_back( // Get the client for this leaf and do the async RPC getClient(leaf)->futureLeafRPC(leafReq) // If the request times out, use an empty response and move on. .onTimeout(timeout, [=] { return LeafResponse(); }) // If there's an error (e.g. RPC exception), // use an empty response and move on. .onError([=](const exception& e) { return LeafResponse(); })); } // Collect all the individual leaf requests into one Future return collect(leafFutures); } // Some sharding function; possibly dependent on previous responses. map<Leaf, LeafReq> buildLeafToReqMap( const Request& request, const vector<LeafResponse>& responses); // This function assembles our final response. Response assembleResponse( const Request& request, const vector<LeafResponse>& firstFanoutResponses, const vector<LeafResponse>& secondFanoutResponses); Future<Response> twoStageFanout(shared_ptr<Request> request) { // Stage 1: first fanout return fanout(buildLeafToReqMap(*request, {}), FIRST_FANOUT_TIMEOUT_MS) // Stage 2: With the first fanout completed, initiate the second fanout. .then([=](vector<LeafResponse>& responses) { auto firstFanoutResponses = std::make_shared<vector<LeafResponse>>(std::move(responses)); // This time, sharding is dependent on the first fanout. return fanout(buildLeafToReqMap(*request, *firstFanoutResponses), SECOND_FANOUT_TIMEOUT_MS) // Stage 3: Assemble and return the final response. .then([=](const vector<LeafResponse>& secondFanoutResponses) { return assembleResponse(*request, *firstFanoutResponses, secondFanoutResponses); }); }); }
該服務的歷史版本中曾使用只允許整體超時的異步框架,同時使用了傳統的“回調地獄”模式。是 Futures 讓這個服務自然地表達了異步計算,并使用有粒度的超時以便在某些部分運行過慢時采取更積極的行動。其結果是,服務的平均延遲減少了三分之二,尾延遲減少到原來的十分之一,總體超時錯誤明顯減少。代碼變得更加易讀和推理,作為結果,代碼還變得更易維護。
當開發人員擁有了幫助他們更好理解和表達異步操作的工具時,他們可以寫出更易于維護的低延遲服務。
結論
Folly Futures 為 C++11 帶來了健壯的、強大的、高性能的 futures。我們希望你會喜歡上它(就像我們一樣)。如果你想了解更多信息,可以查閱相關文檔、文檔塊以及 GitHub 上的代碼。
致謝
Folly Futures 制作團隊的成員包括 Hans Fugal,Dave Watson,James Sedgwick,Hannes Roth 和 Blake Mantheny,還有許多其他志同道合的貢獻者。我們要感謝 推ter,特別是 Marius,他在 非死book 關于 Finagle 和 Futures 的技術講座,激發了這個項目的創作靈感。
本文地址:http://www.oschina.net/translate/futures-for-c-11-at-非死book
原文地址:https://code.非死book.com/posts/1661982097368498/futures-for-c-11-at-非死book/