Java多線程之自旋鎖與隊列鎖
編寫高效的并發程序,需要對互斥問題重新研究,設計出適用于多線程的互斥協議。那么問題來了,如果不能獲得鎖,應該怎么做?
-
旋轉:繼續進行嘗試,如自旋鎖,延遲較短;
-
阻塞:掛起自己,請求調度器切換到另一個線程,代價較大。
綜合來看,先旋轉一小段時間再阻塞,是種不錯的選擇。
java.util.concurrent.locks.Lock 接口提供了 lock() 和 unlock() 兩個重要的方法,用于解決實際互斥問題。
Lock mutex = new MyLock();
mutex.lock();
try {
do something
} finally {
mutex.unlock();
}
測試-設置鎖
測試-設置來源于 getAndSet() 操作,通過一個原子布爾型狀態變量的值判斷當前鎖的狀態。若為 true 表示鎖忙,若為 false 表示鎖空閑。
public class TASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (state.getAndSet(true)) {
;
}
}
public void unlock() {
state.set(false);
}
}
測試-測試-設置鎖
TTASLock是升級版的TASLock算法,沒有直接調用 getAndSet() 方法,而是在鎖看起來空閑( state.get() 返回 false )時才調用。
public class TTASLock {
AtomicBoolean state = new AtomicBoolean(false);
public void lock() {
while (true) {
while (state.get()) {
;
}
if (! state.getAndSet(true)) {
return;
}
}
}
public void unlock() {
state.set(false);
}
}
這兩個算法都能保證無死鎖的互斥,但是TTASLock的性能會比TASLock高許多。
可以從計算機系統結構的高速緩存和局部性來解釋這個問題,每個處理器都有一個cache,cache的訪問速度比內存快好幾個數量級。當cache命中時,會立即加載這個值;當cache缺失時,會在內存或兩一個處理器的cache中尋找這個數據。尋找的過程比較漫長,處理器在總線上廣播這個地址,其他處理器監聽總線。若其他處理器在自己的cache中發現這個地址,則廣播該地址及其值來做出響應。若所有處理器都沒發現這個地址,則以內存地址及其所對應的值進行響應。
getAndSet() 的直接調用讓TASLock性能損失許多:
-
getAndSet() 的調用實質是總線上的一個廣播,這個調用將會延遲所有的線程,因為所有線程都要通過監聽總線通信。
-
getAndSet() 的調用會更新 state 的值,即使值仍為 true ,但是其他處理器cache中的鎖副本將會被丟棄,從而導致cache缺失。
-
當持有鎖的線程試圖釋放鎖時可能被延遲,因為總線被正在自旋的線程獨占。
與此相反,對于TTASLock算法采用的是本地旋轉(線程反復地重讀被緩存的值而不是反復地使用總線),線程A持有鎖時,線程B嘗試獲得鎖,但線程B只會在第一次讀鎖是cache缺失,之后每次cache命中不產生總線流量。
那么缺點來了,TTASLock釋放鎖時,會使各自旋線程處理器中的cache副本立即失效,這些線程會重新讀取這個值,造成總線流量風暴。
指數后退鎖
對于TTASLock算法,當鎖看似空閑( state.get() 返回 false )時,存在高爭用(多個線程試圖同時獲取一個鎖)的可能。高爭用意味著獲取鎖的可能性小,并且會造成總線流量增加。線程在重試之前回退一段時間是種不錯的選擇。
這里實現的指數回退算法的回退準則是,不成功嘗試的次數越多,發生爭用的可能性就越高,線程需要后退的時間就應越長。
public class BackoffLock {
private AtomicBoolean state = new AtomicBoolean(false);
private static final int MIN_DELAY = 10;
private static final int MAX_DELAY = 100;
public void lock() {
Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
while (true) {
while (state.get()) {
;
}
if (! state.getAndSet(true)) {
return;
} else {
backoff.backoff();
}
}
}
public void unlock() {
state.set(false);
}
}
class Backoff {
private final int minDelay, maxDelay;
int limit;
final Random random;
public Backoff(int min, int max) {
minDelay = min;
maxDelay = max;
limit = minDelay;
random = new Random();
}
public void backoff() {
int delay = random.nextInt(limit);
limit = Math.min(maxDelay, 2 * limit);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
;
}
}
}
指數后退算法解決了TTASLock釋放鎖時的高爭用問題,但是它的性能與 minDelay 和 maxDelay 的選取密切相關,并且很難找到一個通用兼容的值。
另外,BackoffLock算法還有兩個問題:
-
cache一致性流量:所有線程都在同一個共享存儲單元上旋轉;
-
臨界區利用率低:后退時間無法確定,線程延遲可能過長。
基于數組的鎖
下面的這些是隊列鎖,名字看上去奇形怪狀的,其實是發明者名字的首字母。隊列鎖就是將線程組織成一個隊列,讓每個線程在不同的存儲單元上旋轉,從而降低cache一致性流量。
基于循環數組實現隊列鎖ALock,每個線程檢測自己的slot對應的 flag[] 域來判斷是否輪到自己。
一個線程想獲得鎖,就要調用 lock() 方法,獲得自增 tail 獲得分配的slot號,然后等待這個slot空閑;當釋放鎖時,就要阻塞當前slot,然后讓下一個slot可運行。
當 flag[i] 為 true 時,那么這個線程就有權獲得鎖。任意時刻的 flag[] 數組中,應該只有一個slot的值為 true 。
public class ALock {
ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();
AtomicInteger tail;
volatile boolean [] flag;
int size;
public ALock() {
size = 100;
tail = new AtomicInteger(0);
flag = new boolean[size];
flag[0] = true;
}
public void lock() {
int slot = tail.getAndIncrement() % size;
mySlotIndex.set(slot);
while (! flag[slot]) {
;
}
}
public void unlock() {
int slot = mySlotIndex.get();
flag[slot] = false;
flag[(slot + 1) % size] = true;
}
}
-
mySlotIndex 是線程的局部變量,只能被一個線程訪問,每個線程都有自己獨立初始化的副本。不需要保存在共享存儲器,不需要同步,不會產生一致性流量。使用 get() 和 set() 方法來訪問局部變量的值。
-
tail 是常規變量,域被所有的線程共享,支持原子操作。
-
數組 flag[] 也是被多個線程共享的,但是每個線程都是在一個數組元素的本地cache副本上旋轉。
ALock對BackoffLock的改進:在多個共享存儲單元上旋轉,將cache無效性降到最低;把一個線程釋放鎖和另一個線程獲得該鎖之間的時間間隔最小化;先來先服務的公平性。但是,數組的大小至少與最大的并發線程數相同,并不是空間有效的,當并發線程最大個數為n時,同步L個不同對象就需要O(Ln)大小的空間。
CLH隊列鎖
CLH隊列鎖表示為 QNode 對象的鏈表,每個線程通過一個線程局部變量 pred 指向其前驅。每個線程通過檢測前驅結點的 locked 域來判斷是否輪到自己。如果該域為 true ,則前驅線程要么已經獲得鎖要么正在等待鎖;如果該域為 false ,則前驅進程已釋放鎖,輪到自己了。正常情況下,隊列鏈中只有一個結點的 locked 域為 false 。
當一個線程調用 lock() 方法想獲得鎖時,將自己的 locked 域置為 true ,表示該線程不準備釋放鎖,然后并將自己的結點加入到隊列鏈尾部。最后就是在前驅的 locked 域上旋轉,等待前驅釋放鎖。當這個線程調用 unlock() 方法要釋放鎖時,線程要將自己的 locked 域置為 false ,表示已經釋放鎖,然后將前驅結點作為自己的新結點以便日后訪問。
那么問題來了,為什么要在釋放鎖時做 myNode.set(myPred.get()) 這個處理呢?假設線程A釋放鎖,A的后繼結點為B,如果不使用這種處理方式,A在釋放鎖后馬上申請鎖將自己的 locked 域置為 true ,整個動作在B檢測到前驅A釋放鎖之前,那么將發生死鎖。
public class CLHLock {
AtomicReference<QNode> tail;
ThreadLocal<QNode> myPred;
ThreadLocal<QNode> myNode;
public CLHLock() {
tail = new AtomicReference<QNode>(new QNode());
myPred = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return null;
}
};
myNode = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return new QNode();
}
};
}
public void lock() {
QNode qnode = myNode.get();
qnode.locked = true;
QNode pred = tail.getAndSet(qnode);
myPred.set(pred);
while (pred.locked) {
;
}
}
public void unlock() {
QNode qnode = myNode.get();
qnode.locked = false;
myNode.set(myPred.get());
}
class QNode {
boolean locked = false;
}
}
如果最大線程數為n,有L個不同對象,那么CLHLock只需要O(L+n)空間。比ALock所需空間少,并且不需要知道可能使用鎖的最大線程數量。但是,在無cache的系統上性能較差,因為一次要訪問兩個結點,若這兩個結點內存位置較遠,性能損失會很大。
MCS隊列鎖
MCS隊列鎖通過檢測自己所在結點的 locked 的值來判斷是否輪到自己,等待這個域被前驅釋放鎖時改變。
線程若要獲得鎖,需把自己結點添加到鏈表的尾部。若隊列鏈表原先為空,則獲得鎖。否則,將前驅結點的 next 域指向自己,在自己的 locked 域上自旋等待,直到前驅將該域置為 false 。線程若要釋放鎖,判斷是否在隊尾,如果是只需將 tail 置為 null ,如果不是需將后繼的 locked 域置為 false 且將自己結點的 next 域置為默認的 null 。注意在隊尾的情況,可能有個線程正在獲得鎖,要等一下變為后一種情況。
public class MCSLock {
AtomicReference<QNode> tail;
ThreadLocal<QNode> myNode;
public MCSLock() {
tail = new AtomicReference<QNode>(null);
myNode = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return new QNode();
}
};
}
public void lock() {
QNode qnode = myNode.get();
QNode pred = tail.getAndSet(qnode);
if (pred != null) {
qnode.locked = true;
pred.next = qnode;
while (qnode.locked) {
;
}
}
}
public void unlock() {
QNode qnode = myNode.get();
if (qnode.next == null) {
if (tail.compareAndSet(qnode, null)) {
return;
}
while (qnode.next == null) {
;
}
}
qnode.next.locked = false;
qnode.next = null;
}
class QNode {
boolean locked = false;
QNode next = null;
}
}
結點能被重復使用,該鎖的空間復雜度為O(L+n)。MCSLock算法適合無cache的系統結構,因為每個線程只需控制自己自旋的存儲單元。但是,釋放鎖也需要旋轉,另外讀寫比較次數比CLHLock多。
時限隊列鎖
Lock接口中有個一個 tryLock() 方法,可以指定一個時限(獲得鎖可等待的最大時間),若獲得鎖超時則放棄嘗試。最后返回一個布爾值說明鎖是否申請成功。
時限隊列鎖TOLock是基于CLHLock類的,鎖是一個結點的虛擬隊列,每個結點在它的前驅結點上自旋等待鎖釋放。但是當這個線程超時時,不能簡單的拋棄它的隊列結點,而是將這個結點標記為已廢棄,這樣它的后繼們(如果有)就會注意到這個結點已放棄,轉而在放棄結點的前驅上自旋。為了保證隊列鏈表的連續性,每次申請鎖都會 new 一個 QNode 。
時限隊列鎖結點的域 pred 會特殊一點,它有3種類型的取值:
-
null :初始狀態,未獲得鎖或已釋放鎖;
-
AVAILABLE :一個靜態結點,表示對應結點已釋放鎖,申請鎖成功;
-
QNode : pred 域為 null 的前驅結點,表示對應結點因超時放棄鎖請求,在放棄請求時才會設置這個值。
申請鎖時,創建一個 pred 域為 null 的新結點并加入到鏈表尾部,若原先鏈表為空或前驅結點已釋放鎖,則獲得鎖。否則,在嘗試時間內,找到 pred 域為 null 的前驅結點,等待它釋放鎖。若在等待前驅結點釋放鎖的過程中超時,就嘗試從鏈表中刪除這個結點,要分這個結點是否有后繼兩種情況。
釋放鎖時,檢查該結點是否有后繼,若無后繼可直接把 tail 設置為 null ,否則將該結點的 pred 域指向 AVAILABLE 。
public class TOLock {
static QNode AVAILABLE = new QNode();
AtomicReference<QNode> tail;
ThreadLocal<QNode> myNode;
public TOLock() {
tail = new AtomicReference<QNode>(null);
myNode = new ThreadLocal<QNode>();
}
public boolean tryLock(long time, TimeUnit unit) {
long startTime = System.currentTimeMillis();
long patience = TimeUnit.MILLISECONDS.convert(time, unit);
QNode qnode = new QNode();
myNode.set(qnode);
qnode.pred = null;
QNode myPred = tail.getAndSet(qnode);
if (myPred == null || myPred.pred == AVAILABLE) {
return true;
}
while (System.currentTimeMillis() - startTime < patience) {
QNode predPred = myPred.pred;
if (predPred == AVAILABLE) {
return true;
} else {
if (predPred != null) {
myPred = predPred;
}
}
}
if (! tail.compareAndSet(qnode, myPred)) {
qnode.pred = myPred;
}
return false;
}
public void unlock() {
QNode qnode = myNode.get();
if (! tail.compareAndSet(qnode, null)) {
qnode.pred = AVAILABLE;
}
}
static class QNode {
public QNode pred = null;
}
}
優點:同CLHLock。缺點:每次申請鎖都要分配一個新結點,在鎖上旋轉的線程可能要回溯一個超時結點鏈。
上面實現的這些鎖算法不支持重入。我們可以使用銀行轉賬的例子來測試一下鎖的效果,任意賬戶間可以隨意轉賬,鎖生效時所有賬戶的總金額是不變的。完整的算法實現和測試代碼在 這里 。