Java多線程之自旋鎖與隊列鎖

phxv5212 8年前發布 | 20K 次閱讀 多線程 Java Java開發

編寫高效的并發程序,需要對互斥問題重新研究,設計出適用于多線程的互斥協議。那么問題來了,如果不能獲得鎖,應該怎么做?

  • 旋轉:繼續進行嘗試,如自旋鎖,延遲較短;

  • 阻塞:掛起自己,請求調度器切換到另一個線程,代價較大。

綜合來看,先旋轉一小段時間再阻塞,是種不錯的選擇。

java.util.concurrent.locks.Lock 接口提供了 lock() 和 unlock() 兩個重要的方法,用于解決實際互斥問題。

Lock mutex = new MyLock();
mutex.lock();
try {
    do something
} finally {
    mutex.unlock();
}

測試-設置鎖

測試-設置來源于 getAndSet() 操作,通過一個原子布爾型狀態變量的值判斷當前鎖的狀態。若為 true 表示鎖忙,若為 false 表示鎖空閑。

public class TASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (state.getAndSet(true)) {
            ;
        }
    }

    public void unlock() {
        state.set(false);
    }
}

測試-測試-設置鎖

TTASLock是升級版的TASLock算法,沒有直接調用 getAndSet() 方法,而是在鎖看起來空閑( state.get() 返回 false )時才調用。

public class TTASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

這兩個算法都能保證無死鎖的互斥,但是TTASLock的性能會比TASLock高許多。

可以從計算機系統結構的高速緩存和局部性來解釋這個問題,每個處理器都有一個cache,cache的訪問速度比內存快好幾個數量級。當cache命中時,會立即加載這個值;當cache缺失時,會在內存或兩一個處理器的cache中尋找這個數據。尋找的過程比較漫長,處理器在總線上廣播這個地址,其他處理器監聽總線。若其他處理器在自己的cache中發現這個地址,則廣播該地址及其值來做出響應。若所有處理器都沒發現這個地址,則以內存地址及其所對應的值進行響應。

getAndSet() 的直接調用讓TASLock性能損失許多:

  • getAndSet() 的調用實質是總線上的一個廣播,這個調用將會延遲所有的線程,因為所有線程都要通過監聽總線通信。

  • getAndSet() 的調用會更新 state 的值,即使值仍為 true ,但是其他處理器cache中的鎖副本將會被丟棄,從而導致cache缺失。

  • 當持有鎖的線程試圖釋放鎖時可能被延遲,因為總線被正在自旋的線程獨占。

與此相反,對于TTASLock算法采用的是本地旋轉(線程反復地重讀被緩存的值而不是反復地使用總線),線程A持有鎖時,線程B嘗試獲得鎖,但線程B只會在第一次讀鎖是cache缺失,之后每次cache命中不產生總線流量。

那么缺點來了,TTASLock釋放鎖時,會使各自旋線程處理器中的cache副本立即失效,這些線程會重新讀取這個值,造成總線流量風暴。

指數后退鎖

對于TTASLock算法,當鎖看似空閑( state.get() 返回 false )時,存在高爭用(多個線程試圖同時獲取一個鎖)的可能。高爭用意味著獲取鎖的可能性小,并且會造成總線流量增加。線程在重試之前回退一段時間是種不錯的選擇。

這里實現的指數回退算法的回退準則是,不成功嘗試的次數越多,發生爭用的可能性就越高,線程需要后退的時間就應越長。

public class BackoffLock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 10;
    private static final int MAX_DELAY = 100;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            } else {
                backoff.backoff();
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

class Backoff {
    private final int minDelay, maxDelay;
    int limit;
    final Random random;

    public Backoff(int min, int max) {
        minDelay = min;
        maxDelay = max;
        limit = minDelay;
        random = new Random();
    }

    public void backoff() {
        int delay = random.nextInt(limit);
        limit = Math.min(maxDelay, 2 * limit);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            ;
        }
    }
}

指數后退算法解決了TTASLock釋放鎖時的高爭用問題,但是它的性能與 minDelay 和 maxDelay 的選取密切相關,并且很難找到一個通用兼容的值。

另外,BackoffLock算法還有兩個問題:

  • cache一致性流量:所有線程都在同一個共享存儲單元上旋轉;

  • 臨界區利用率低:后退時間無法確定,線程延遲可能過長。

基于數組的鎖

下面的這些是隊列鎖,名字看上去奇形怪狀的,其實是發明者名字的首字母。隊列鎖就是將線程組織成一個隊列,讓每個線程在不同的存儲單元上旋轉,從而降低cache一致性流量。

基于循環數組實現隊列鎖ALock,每個線程檢測自己的slot對應的 flag[] 域來判斷是否輪到自己。

一個線程想獲得鎖,就要調用 lock() 方法,獲得自增 tail 獲得分配的slot號,然后等待這個slot空閑;當釋放鎖時,就要阻塞當前slot,然后讓下一個slot可運行。

當 flag[i] 為 true 時,那么這個線程就有權獲得鎖。任意時刻的 flag[] 數組中,應該只有一個slot的值為 true 。

public class ALock {
    ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();
    AtomicInteger tail;
    volatile boolean [] flag;
    int size;

    public ALock() {
        size = 100;
        tail = new AtomicInteger(0);
        flag = new boolean[size];
        flag[0] = true;
    }

    public void lock() {
        int slot = tail.getAndIncrement() % size;
        mySlotIndex.set(slot);
        while (! flag[slot]) {
            ;
        }
    }

    public void unlock() {
        int slot = mySlotIndex.get();
        flag[slot] = false;
        flag[(slot + 1) % size] = true;
    }
}
  • mySlotIndex 是線程的局部變量,只能被一個線程訪問,每個線程都有自己獨立初始化的副本。不需要保存在共享存儲器,不需要同步,不會產生一致性流量。使用 get() 和 set() 方法來訪問局部變量的值。

  • tail 是常規變量,域被所有的線程共享,支持原子操作。

  • 數組 flag[] 也是被多個線程共享的,但是每個線程都是在一個數組元素的本地cache副本上旋轉。

ALock對BackoffLock的改進:在多個共享存儲單元上旋轉,將cache無效性降到最低;把一個線程釋放鎖和另一個線程獲得該鎖之間的時間間隔最小化;先來先服務的公平性。但是,數組的大小至少與最大的并發線程數相同,并不是空間有效的,當并發線程最大個數為n時,同步L個不同對象就需要O(Ln)大小的空間。

CLH隊列鎖

CLH隊列鎖表示為 QNode 對象的鏈表,每個線程通過一個線程局部變量 pred 指向其前驅。每個線程通過檢測前驅結點的 locked 域來判斷是否輪到自己。如果該域為 true ,則前驅線程要么已經獲得鎖要么正在等待鎖;如果該域為 false ,則前驅進程已釋放鎖,輪到自己了。正常情況下,隊列鏈中只有一個結點的 locked 域為 false 。

當一個線程調用 lock() 方法想獲得鎖時,將自己的 locked 域置為 true ,表示該線程不準備釋放鎖,然后并將自己的結點加入到隊列鏈尾部。最后就是在前驅的 locked 域上旋轉,等待前驅釋放鎖。當這個線程調用 unlock() 方法要釋放鎖時,線程要將自己的 locked 域置為 false ,表示已經釋放鎖,然后將前驅結點作為自己的新結點以便日后訪問。

那么問題來了,為什么要在釋放鎖時做 myNode.set(myPred.get()) 這個處理呢?假設線程A釋放鎖,A的后繼結點為B,如果不使用這種處理方式,A在釋放鎖后馬上申請鎖將自己的 locked 域置為 true ,整個動作在B檢測到前驅A釋放鎖之前,那么將發生死鎖。

public class CLHLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myPred;
    ThreadLocal<QNode> myNode;

    public CLHLock() {
        tail = new AtomicReference<QNode>(new QNode());
        myPred = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return null;
            }
        };
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        qnode.locked = true;
        QNode pred = tail.getAndSet(qnode);
        myPred.set(pred);
        while (pred.locked) {
            ;
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        qnode.locked = false;
        myNode.set(myPred.get());
    }

    class QNode {
        boolean locked = false;
    }
}

如果最大線程數為n,有L個不同對象,那么CLHLock只需要O(L+n)空間。比ALock所需空間少,并且不需要知道可能使用鎖的最大線程數量。但是,在無cache的系統上性能較差,因為一次要訪問兩個結點,若這兩個結點內存位置較遠,性能損失會很大。

MCS隊列鎖

MCS隊列鎖通過檢測自己所在結點的 locked 的值來判斷是否輪到自己,等待這個域被前驅釋放鎖時改變。

線程若要獲得鎖,需把自己結點添加到鏈表的尾部。若隊列鏈表原先為空,則獲得鎖。否則,將前驅結點的 next 域指向自己,在自己的 locked 域上自旋等待,直到前驅將該域置為 false 。線程若要釋放鎖,判斷是否在隊尾,如果是只需將 tail 置為 null ,如果不是需將后繼的 locked 域置為 false 且將自己結點的 next 域置為默認的 null 。注意在隊尾的情況,可能有個線程正在獲得鎖,要等一下變為后一種情況。

public class MCSLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public MCSLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        QNode pred = tail.getAndSet(qnode);
        if (pred != null) {
            qnode.locked = true;
            pred.next = qnode;
            while (qnode.locked) {
                ;
            }
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (qnode.next == null) {
            if (tail.compareAndSet(qnode, null)) {
                return;
            }
            while (qnode.next == null) {
                ;
            }
        }
        qnode.next.locked = false;
        qnode.next = null;
    }

    class QNode {
        boolean locked = false;
        QNode next = null;
    }
}

結點能被重復使用,該鎖的空間復雜度為O(L+n)。MCSLock算法適合無cache的系統結構,因為每個線程只需控制自己自旋的存儲單元。但是,釋放鎖也需要旋轉,另外讀寫比較次數比CLHLock多。

時限隊列鎖

Lock接口中有個一個 tryLock() 方法,可以指定一個時限(獲得鎖可等待的最大時間),若獲得鎖超時則放棄嘗試。最后返回一個布爾值說明鎖是否申請成功。

時限隊列鎖TOLock是基于CLHLock類的,鎖是一個結點的虛擬隊列,每個結點在它的前驅結點上自旋等待鎖釋放。但是當這個線程超時時,不能簡單的拋棄它的隊列結點,而是將這個結點標記為已廢棄,這樣它的后繼們(如果有)就會注意到這個結點已放棄,轉而在放棄結點的前驅上自旋。為了保證隊列鏈表的連續性,每次申請鎖都會 new 一個 QNode 。

時限隊列鎖結點的域 pred 會特殊一點,它有3種類型的取值:

  • null :初始狀態,未獲得鎖或已釋放鎖;

  • AVAILABLE :一個靜態結點,表示對應結點已釋放鎖,申請鎖成功;

  • QNode : pred 域為 null 的前驅結點,表示對應結點因超時放棄鎖請求,在放棄請求時才會設置這個值。

申請鎖時,創建一個 pred 域為 null 的新結點并加入到鏈表尾部,若原先鏈表為空或前驅結點已釋放鎖,則獲得鎖。否則,在嘗試時間內,找到 pred 域為 null 的前驅結點,等待它釋放鎖。若在等待前驅結點釋放鎖的過程中超時,就嘗試從鏈表中刪除這個結點,要分這個結點是否有后繼兩種情況。

釋放鎖時,檢查該結點是否有后繼,若無后繼可直接把 tail 設置為 null ,否則將該結點的 pred 域指向 AVAILABLE 。

public class TOLock {
    static QNode AVAILABLE = new QNode();
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public TOLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>();
    }

    public boolean tryLock(long time, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long patience = TimeUnit.MILLISECONDS.convert(time, unit);
        QNode qnode = new QNode();
        myNode.set(qnode);
        qnode.pred = null;
        QNode myPred = tail.getAndSet(qnode);
        if (myPred == null || myPred.pred == AVAILABLE) {
            return true;
        }
        while (System.currentTimeMillis() - startTime < patience) {
            QNode predPred = myPred.pred;
            if (predPred == AVAILABLE) {
                return true;
            } else {
                if (predPred != null) {
                    myPred = predPred;
                }
            }
        }
        if (! tail.compareAndSet(qnode, myPred)) {
            qnode.pred = myPred;
        }
        return false;
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (! tail.compareAndSet(qnode, null)) {
            qnode.pred = AVAILABLE;
        }
    }

    static class QNode {
        public QNode pred = null;
    }
}

優點:同CLHLock。缺點:每次申請鎖都要分配一個新結點,在鎖上旋轉的線程可能要回溯一個超時結點鏈。

上面實現的這些鎖算法不支持重入。我們可以使用銀行轉賬的例子來測試一下鎖的效果,任意賬戶間可以隨意轉賬,鎖生效時所有賬戶的總金額是不變的。完整的算法實現和測試代碼在 這里

 本文由用戶 phxv5212 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!