源碼解析Mxnet Dependency Engine
Var
var可以看做是一個tag,用來標示每一個對象的,這樣Op對對象的依賴可以簡化成對var的 依賴,這樣就可以構建出一個不依賴于具體的對象的通用的依賴引擎。Var是依賴引擎的關鍵。
類圖
聲明:下文說到執行時,意思是Op的當前var的依賴已經就緒,因為一個op可以依賴多個 var,如果其他的Var沒有就緒,那么這時op可能并沒有實際運行
Var只是一個基類,用來統一類型系統的,主要的工作在 ThreadedVar 中,每一個對象都 會有一個由 VersionedVarBlock 所組成的鏈表,這個鏈表就是一個FIFO隊列。 head_ 指向的是隊列的尾部, 實際是一個哨兵(空對象), head_ 這個命名有誤導性, pending_write_ 指向的是最"老"的寫依賴,如果沒有寫依賴,那么就指向 nullptr , 根據依賴引擎的特點,它實際上指向的是隊列的頭部, ThreadedVar 的那四個方法就是 來操作這個隊列的。
- num_pending_reads_: 代表當前正在執行(還沒有執行完)的讀依賴的個數
- pending_write_: 代表隊列中最“老”的寫依賴, 它一直指向隊列的頭部。
- head_: 隊列的尾部。
需要注意的是,正在執行的讀依賴是不在隊列中的,但是正在執行的寫依賴是在隊列中的。
理解Var的隊列
var的隊列是依賴引擎的核心,下面我們來分析下各種情況下,如何修改隊列的狀態。
- 添加讀依賴: 如果前面沒有寫依賴,那么直接運行, 否則就插入隊列的尾部(head_那一端)
- 添加寫依賴: 直接將依賴插入隊列的尾部,并檢查是不是寫就緒(既沒有讀依賴也沒有 寫依賴在運行),如果是寫就緒,那么就運行該依賴。
- 讀依賴完成
- 寫依賴完成
上圖中w1寫依賴正在執行。
寫依賴w1完成將自己移出隊列,并執行寫依賴w2
寫依賴w2完成后將自己移出隊列,接著并行的執行讀依賴r1,r2,記住正在執行的讀依賴是被移出隊列的, 它們的數目使用 num_pending_reads_ 跟蹤的
每一個讀依賴完成都會將 num_pending_reads_ 減一,如果減為了0,那么就意味著所有 的讀依賴都完成了,當r1,r2都完成后,接著執行w3寫依賴。
添加讀依賴
代碼主要在 src/engine/Threaded_engine.cc 的 AppendReadDependency 中。
inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) { std::lock_guard<std::mutex> lock{m_}; if (pending_write_ == nullptr) { // invariant: is_ready_to_read() CHECK_GE(num_pending_reads_, 0); // STATE CHANGE ++num_pending_reads_; // decrease wait counter opr_block->decr_wait(); } else { auto&& new_var_block = VersionedVarBlock::New(); assert(head_->next == nullptr); assert(head_->trigger == nullptr); assert(head_->write == false); // append things to next. head_->next = new_var_block; head_->trigger = opr_block; head_ = new_var_block; } }
代碼的基本思路是這樣的:檢查隊列中有沒有寫依賴,這分兩種情況:
- 如果沒有寫依賴,那么意味著,目前該Var沒有依賴在執行,或者說只有讀依賴在執行, 所以這個新的讀依賴可以直接執行,那么它沒有必要添加到隊列中,只需要更新 num_pending_reads_ 就好,當然因為該op可能還依賴別的var,所以你只能調用 decr_wait ,只有當wait減為0的時候,才能開始運行。這部分代碼在engine的push中。
- 如果有寫依賴,那么讀依賴必須在寫依賴的后面執行,所以需要把讀依賴添加到隊列的 尾部。記住 head_ 永遠指向一個空的哨兵對象。
添加寫依賴
代碼主要在 src/engine/Threaded_engine.cc 的 AppendWriteDependency 中。
inline void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) { auto&& new_var_block = VersionedVarBlock::New(); std::lock_guard<std::mutex> lock{m_}; // invariant. assert(head_->next == nullptr); assert(head_->trigger == nullptr); assert(head_->write == false); // attach to head. head_->next = new_var_block; head_->trigger = opr_block; head_->write = true; // check if it is ready to write if (pending_write_ == nullptr) { // invariant: is_ready_to_read() pending_write_ = head_; CHECK_GE(num_pending_reads_, 0); if (num_pending_reads_ == 0) { // STATE CHANGE opr_block->decr_wait(); num_pending_reads_ = kWriteTriggered; } } else { CHECK_NE(num_pending_reads_, 0); } head_ = new_var_block; }
代碼的基本思路是這樣的: 將該Op放入隊列的尾部,接著檢查該Op的依賴有沒有就緒,這 要檢查Var有沒有寫依賴(pending_read_==nullptr)和讀依賴(num_pending_read_==0)的Op 正在執行,只有二者都沒有時,才能開始運行,當然你依然要檢查該Op對其他的Var的依賴 有沒有就緒。需要注意的一點是,即便Op的Var寫依賴就緒,該Op也不會從隊列中移除,只 有該Op執行完成后才會被移除,這在CompleteWriteDependency中實現。
讀依賴完成
代碼主要在 src/engine/Threaded_engine.cc 的 CompleteReadDependency 中。
template <typename Dispatcher> inline void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) { OprBlock *trigger = nullptr; { // this is lock scope std::lock_guard<std::mutex> lock{m_}; CHECK_GT(num_pending_reads_, 0); if (--num_pending_reads_ == 0) { if (pending_write_ != nullptr) { // STATE CHANGE trigger = pending_write_->trigger; num_pending_reads_ = kWriteTriggered; } } } if (trigger != nullptr && trigger->decr_wait() == 0) { dispatcher(trigger); } }
該部分代碼會在一個op運算完成后調用,代碼邏輯是比較簡單的,先更新 num_pending_read_ , 更新后如果該值為0,那么就意味著,所有的讀依賴都已經執行完成, 這樣就檢查隊列,若是存在寫依賴,那么該寫依賴就就緒了,那么Op就可以執行了(前提是 依賴的其他var也都就緒了, wait為0)。上面的dispatcher實際就是用來將Op丟入執行引擎 的,它一般是PushToExecute,這個后文會看到。
寫依賴完成
代碼主要在 src/engine/Threaded_engine.cc 的 CompleteWriteDependency 中。
template <typename Dispatcher> inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { // this is lock scope VersionedVarBlock *old_pending_write, *end_of_read_chain; OprBlock* trigger_write = nullptr; { std::lock_guard<std::mutex> lock{m_}; // invariants assert(head_->next == nullptr); assert(pending_write_ != nullptr); CHECK_EQ(num_pending_reads_, kWriteTriggered); // really delete if (to_delete_) { VersionedVarBlock *head = pending_write_->next; VersionedVarBlock::Delete(pending_write_); assert(head_ == head); VersionedVarBlock::Delete(head); return true; } // detach pending write old_pending_write = pending_write_; // search for chains to trigger end_of_read_chain = old_pending_write->next; // reset to 0 pending reads num_pending_reads_ = 0; while (end_of_read_chain != head_ && end_of_read_chain->write == false) { ++num_pending_reads_; end_of_read_chain = end_of_read_chain->next; } if (end_of_read_chain == head_) { pending_write_ = nullptr; } else { // check if there is pending reads, if not trigger write assert(end_of_read_chain->write == true); pending_write_ = end_of_read_chain; if (num_pending_reads_ == 0) { // mark write as already actived in this var num_pending_reads_ = kWriteTriggered; trigger_write = end_of_read_chain->trigger; } } } // This is outside of lock scope // Be very carful, pending_write_ and num_pending_reads_ // can change now, do not reply ont the two variables. // The linked list \in [old_pending_write, end_of_read_chain) // is already detached from this Var. // So it is safe to modify these VersionedVarBlock *cur_head = old_pending_write->next; VersionedVarBlock::Delete(old_pending_write); // dispatch all the events while (cur_head != end_of_read_chain) { if (cur_head->trigger->decr_wait() == 0) { dispatcher(cur_head->trigger); } auto prev = cur_head; cur_head = cur_head->next; assert(cur_head != nullptr); VersionedVarBlock::Delete(prev); } if (trigger_write != nullptr && trigger_write->decr_wait() == 0) { dispatcher(trigger_write); } return false; }
和讀依賴完成類似,只是寫依賴的后面可能跟著多個讀依賴,所以需要遍歷鏈表直到發現下 一個寫依賴,找到之后如果是讀依賴,那么直接并行的運行,如果是寫依賴直接運行就好。
來自: http://yuyang0.github.io/notes/mxnet-engine.html