無鎖隊列的實現
關于無鎖隊列的實現,網上有很多文章,雖然本文可能和那些文章有所重復,但是我還是想以我自己的方式把這些文章中的重要的知識點串起來和大家講一講這個技術。下面開始正文。
關于 CAS 等原子操作
在開始說無鎖隊列之前,我們需要知道一個很重要的技術就是 CAS 操作——Compare & Set,或是 Compare & Swap,現在幾乎所有的 CPU 指令都支持 CAS 的原子操作,X86下對應的是 CMPXCHG 匯編指令。有了這個原子操作,我們就可以用其來實現各種無鎖(lock free)的數據結構。
這個操作用C語言來描述就是下面這個樣子:(代碼來自 Wikipedia 的 Compare And Swap 詞條)意思就是說,看一看內存*reg 里的值是不是 oldval,如果是的話,則對其賦值 newval。
int compare_and_swap (int* reg, int oldval, int newval) { int old_reg_val = *reg; if (old_reg_val == oldval) *reg = newval; return old_reg_val; }
這個操作可以變種為返回 bool 值的形式(返回 bool 值的好處在于,可以調用者知道有沒有更新成功):
bool compare_and_swap (int *accum, int *dest, int newval) { if ( *accum == *dest ) { *dest = newval; return true; } return false; }
與 CAS 相似的還有下面的原子操作:(這些東西大家自己看 Wikipedia 吧)
- Fetch And Add,一般用來對變量做 +1 的原子操作
- Test-and-set,寫值到某個內存位置并傳回其舊值。匯編指令 BST
- Test and Test-and-set,用來低低 Test-and-Set 的資源爭奪情況
注:在實際的C/C++程序中,CAS 的各種實現版本如下:
1)GCC 的 CAS
GCC4.1+ 版本中支持 CAS 的原子操作(完整的原子操作可參看 GCC Atomic Builtins)
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2)Windows 的 CAS
在 Windows 下,你可以使用下面的 Windows API 來完成 CAS:(完整的 Windows 原子操作可參看 MSDN 的 InterLocked Functions)
InterlockedCompareExchange ( __inout LONG volatile *Target,
__in LONG Exchange,
__in LONG Comperand);
3) C++11 中的 CAS
C++11中的 STL 中的 atomic 類的函數可以讓你跨平臺。(完整的C++11的原子操作可參看 Atomic Operation Library)
template< class T >bool atomic_compare_exchange_weak ( std::atomic* obj, T* expected, T desired ); template< class T >bool atomic_compare_exchange_weak ( volatile std::atomic * obj, T* expected, T desired );
無鎖隊列的鏈表實現
下面的東西主要來自 John D. Valois 1994 年 10 月在拉斯維加斯的并行和分布系統系統國際大會上的一篇論文——《Implementing Lock-Free Queues》。
我們先來看一下進隊列用 CAS 實現的方式:
EnQueue (x) //進隊列 { //準備新加入的結點數據 q = new record (); q->value = x; q->next = NULL; do { p = tail; //取鏈表尾指針的快照 } while( CAS (p->next, NULL, q) != TRUE); //如果沒有把結點鏈上,再試 CAS (tail, p, q); //置尾結點 }
我們可以看到,程序中的那個 do- while 的 Re-Try-Loo。就是說,很有可能我在準備在隊列尾加入結點時,別的線程已經加成功了,于是 tail 指針就變了,于是我的 CAS 返回了 false,于是程序再試,直到試成功為止。這個很像我們的搶電話熱的不停重播的情況。
你會看到,為什么我們的“置尾結點”的操作不判斷是否成功,因為:
- 如果有一個線程 T1,它的 while 中的 CAS 如果成功的話,那么其它所有的隨后線程的 CAS 都會失敗,然后就會再循環,
- 此時,如果 T1 線程還沒有更新 tail 指針,其它的線程繼續失敗,因為 tail->next 不是 NULL 了。
- 直到 T1 線程更新完 tail 指針,于是其它的線程中的某個線程就可以得到新的 tail 指針,繼續往下走了。
這里有一個潛在的問題——如果 T1 線程在用 CAS 更新 tail 指針的之前,線程停掉了,那么其它線程就進入死循環了。下面是改良版的 EnQueue ()
EnQueue (x) //進隊列改良版 { q = new record (); q->value = x; q->next = NULL; p = tail; oldp = p do { while (p->next != NULL) p = p->next; } while( CAS (p.next, NULL, q) != TRUE); //如果沒有把結點鏈上,再試 CAS (tail, oldp, q); //置尾結點 }
我們讓每個線程,自己 fetch 指針 p 到鏈表尾。但是這樣的 fetch 會很影響性能。而通實際情況看下來,99.9% 的情況不會有線程停轉的情況,所以,更好的做法是,你可以接合上述的這兩個版本,如果 retry 的次數超了一個值的話(比如說 3 次),那么,就自己 fetch 指針。
好了,我們解決了 EnQueue,我們再來看看 DeQueue 的代碼:(很簡單,我就不解釋了)
DeQueue () //出隊列 { do{ p = head; if (p->next == NULL){ return ERR_EMPTY_QUEUE; } while( CAS (head, p, p->next); return p->next->value; }
我們可以看到,DeQueue 的代碼操作的是 head->next,而不是 head 本身。這樣考慮是因為一個邊界條件,我們需要一個 dummy 的頭指針來解決鏈表中如果只有一個元素,head 和 tail 都指向同一個結點的問題,這樣 EnQueue 和 DeQueue 要互相排斥了。
注:上圖的 tail 正處于更新之前的裝態。
DAS 的 ABA 問題
所謂 ABA(見維基百科的 ABA 詞條),問題基本是這個樣子:
- 進程 P1 在共享變量中讀到值為A
- P1被搶占了,進程 P2 執行
- P2把共享變量里的值從A改成了B,再改回到A,此時被 P1 搶占。
- P1回來看到共享變量里的值沒有被改變,于是繼續執行。
雖然 P1 以為變量值沒有改變,繼續執行了,但是這個會引發一些潛在的問題。ABA 問題最容易發生在 lock free 的算法中的,DAS 首當其沖,因為 DAS 判斷的是指針的地址。如果這個地址被重用了呢,問題就很大了。
比如上述的 DeQueue ()函數,因為我們要讓 head 和 tail 分開,所以我們引入了一個 dummy 指針給 head,當我們做 CAS 的之前,如果 head 的那塊內存被回收并被重用了,而重用的內存又被 EnQueue ()進來了,這會有很大的問題。(內存管理中重用內存基本上是一種很常見的行為)
這個例子你可能沒有看懂,維基百科上給了一個活生生的例子——
你拿著一個裝滿錢的手提箱在飛機場,此時過來了一個火辣性感的美女,然后她很暖昧地挑逗著你,并趁你不注意的時候,把用一個一模一樣的手提箱和你那裝滿錢的箱子調了個包,然后就離開了,你看到你的手提箱還在那,于是就提著手提箱去趕飛機去了。
這就是 ABA 的問題。
解決 ABA 的問題
維基百科上給了一個解——使用 double-CAS(雙保險的 CAS),例如,在 32 位系統上,我們要檢查 64 位的內容
1)一次用 CAS 檢查雙倍長度的值,前半部是指針,后半部分是一個計數器。
2)只有這兩個都一樣,才算通過檢查,要吧賦新的值。并把計數器累加1。
這樣一來,ABA 發生時,雖然值一樣,但是計數器就不一樣(但是在 32 位的系統上,這個計數器會溢出回來又從 1 開始的,這還是會有 ABA 的問題)
當然,我們這個隊列的問題就是不想讓那個內存重用,這樣明確的業務問題比較好解決,論文《Implementing Lock-Free Queues》給出一這么一個方法——使用結點內存引用計數 refcnt!
SafeRead (q) { loop: p = q->next; if (p == NULL){ return p; } Fetch&Add (p->refcnt, 1); if (p == q->next){ return p; }else{ Release (p); } goto loop }
其中的 Fetch&Add 和 Release 分是是加引用計數和減引用計數,都是原子操作,這樣就可以阻止內存被回收了。
用數組實現無鎖隊列
本實現來自論文《Implementing Lock-Free Queues》
使用數組來實現隊列是很常見的方法,因為沒有內存的分部和釋放,一切都會變得簡單,實現的思路如下:
1)數組隊列應該是一個 ring buffer 形式的數組(環形數組)
2)數組的元素應該有三個可能的值:HEAD,TAIL,EMPTY(當然,還有實際的數據)
3)數組一開始全部初始化成 EMPTY,有兩個相鄰的元素要初始化成 HEAD 和 TAIL,這代表空隊列。
4)EnQueue 操作。假設數據x要入隊列,定位 TAIL 的位置,使用 double-CAS 方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明隊列滿了。
5)DeQueue 操作。定位 HEAD 的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同樣需要注意,如果x是 TAIL,則說明隊列為空。
算法的一個關鍵是——如何定位 HEAD 或 TAIL?
1)我們可以聲明兩個計數器,一個用來計數 EnQueue 的次數,一個用來計數 DeQueue 的次數。
2)這兩個計算器使用使用 Fetch&ADD 來進行原子累加,在 EnQueue 或 DeQueue 完成的時候累加就好了。
3)累加后求個模什么的就可以知道 TAIL 和 HEAD 的位置了。
如下圖所示:
小結
以上基本上就是所有的無鎖隊列的技術細節,這些技術都可以用在其它的無鎖數據結構上。
1)無鎖隊列主要是通過 CAS、FAA 這些原子操作,和 Retry-Loop 實現。
2)對于 Retry-Loop,我個人感覺其實和鎖什么什么兩樣。只是這種“鎖”的粒度變小了,主要是“鎖”HEAD 和 TAIL 這兩個關鍵資源。而不是整個數據結構。
還有一些和 Lock Free 的文章你可以去看看:
- Code Project 上的雄文 《Yet another implementation of a lock-free circular array queue》
- Herb Sutter 的《Writing Lock-Free Code: A Corrected Queue》– 用C++11的 std::atomic 模板。
- IBM developerWorks 的《設計不使用互斥鎖的并發數據結構》 來自: coolshell.cn