Java高級:線程同步lock與unlock使用

mapinfo600 12年前發布 | 11K 次閱讀 線程 Java Java開發

 

一、Lock與Synchronized區別

Java中可以使用Lock和Synchronized的可以實現對某個共享資源的同步,同時也可以實現對某些過程的原子性操作。

Lock可以使用Condition進行線程之間的調度,Synchronized則使用Object對象本身的notify, wait, notityAll調度機制,這兩種調度機制有什么異同呢?

Condition是Java5以后出現的機制,它有更好的靈活性,而且在一個對象里面可以有多個Condition(即對象監視器),則線程可以注冊在不同的Condition,從而可以有選擇性的調度線程,更加靈活。

Synchronized就相當于整個對象只有一個單一的Condition(即該對象本身)所有的線程都注冊在它身上,線程調度的時候之后調度所有得注冊線程,沒有選擇權,會出現相當大的問題 。

所以,Lock 實現提供了比使用 synchronized 方法和語句可獲得的更廣泛的鎖定操作。此實現允許更靈活的結構,可以具有差別很大的屬性,可以支持多個相關的 Condition 對象。

鎖是控制多個線程對共享資源進行訪問的工具。通常,鎖提供了對共享資源的獨占訪問。一次只能有一個線程獲得鎖,對共享資源的所有訪問都需要首先獲得鎖。不過,某些鎖可能允許對共享資源并發訪問,如 ReadWriteLock 的讀取鎖。

synchronized 方法或語句的使用提供了對與每個對象相關的隱式監視器鎖的訪問,但卻強制所有鎖獲取和釋放均要出現在一個塊結構中:當獲取了多個鎖時,它們必須以相反的順序釋放,且必須在與所有鎖被獲取時相同的詞法范圍內釋放所有鎖。

雖然 synchronized 方法和語句的范圍機制使得使用監視器鎖編程方便了很多,而且還幫助避免了很多涉及到鎖的常見編程錯誤,但有時也需要以更為靈活的方式使用鎖。例如,某些遍歷并發訪問的數據結果的算法要求使用 “hand-over-hand” 或 “chain locking”:獲取節點 A 的鎖,然后再獲取節點 B 的鎖,然后釋放 A 并獲取 C,然后釋放 B 并獲取 D,依此類推。Lock 接口的實現允許鎖在不同的作用范圍內獲取和釋放,并允許以任何順序獲取和釋放多個鎖,從而支持使用這種技術。

二、java.util.concurrent.locks類結構

上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現類都持有自己內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現不同的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。

基于AQS構建的Synchronizer包括 ReentrantLock , Semaphore , CountDownLatchReetrantRead WriteLock , FutureTask 等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不一樣而已。

ReentrantLock:需要記錄當前線程獲取原子狀態的次數,如果次數為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據著鎖從而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。以下為ReetranLock的FairSync的tryAcquire實現代碼解析:

Semaphore:則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現并發量的控制,Semaphore一開始設置許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現:

CountDownLatch:閉鎖則要保持其狀態,在這個狀態到達終止態之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDown是sync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠將這個初始值減到0,從而釋放原子狀態,讓等待的所有線程通過。

FutureTask:需要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續還是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是檢查狀態,如果是RUNNING狀態那么讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,并且通過unpark喚醒正在等待的線程,返回結果。

以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪里呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()和setState()兩個方法(protected)。這樣就為上述狀態解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態。

AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現的,比如線程獨占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared為需要共享形式獲取的synchronizer實現。

ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現) Semaphore 內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch相似,因為公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。 CountDownLatch 內部類Sync實現了tryAcquireShared和tryReleasedShared。 FutureTask 內部類Sync也實現了tryAcquireShared和tryReleasedShared。

其實使用過一些JAVA synchronizer的之后,然后結合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實我們自己也可以構造synchronizer。如下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。

三、lock與unlock使用

下面是一個場景,針對這個場景提出兩種解決方案。 一個中轉站,可以接納貨物,然后發出貨物,這是需要建一個倉庫,相當于一個緩沖區,當倉庫滿的時候,不能接貨,倉庫空的時候,不能發貨。

第一種,用一個Condition去解決,有可能會出問題。

package com.zxx;

import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock;

/** 
* 單個Condition去控制一個緩沖區,多線程對緩沖區做讀寫操作,要保證緩沖區滿的時侯不會 
* 被寫,空的時候不會被讀;單個Condition控制會出錯誤: 當緩沖區還有一個位置時,多個寫線程 
* 同時訪問,則只有一個寫線程可以對其進行寫操作,操作完之后,喚醒在這個condition上等待的 
* 其他幾個寫線程,如果判斷用IF語句的話就會出現繼續向緩沖區添加。 
* @author Administrator 
* 
*/ 
public class ConditionError { 
    Lock lock = new ReentrantLock(); 
    Condition condition = lock.newCondition(); 
    String[] container = new String[10]; 
    int index = 0; 
    public static void main(String[] args) { 
        ConditionError conditionError = new ConditionError(); 
        conditionError.test(); 
    } 
    public void test(){ 
        ExecutorService threadPool = Executors.newCachedThreadPool(); 
        for(int i = 0; i < 14; i++){//先用14個線程去寫,則有4個線程會被阻塞 
            threadPool.execute(new Runnable(){

                @Override 
                public void run() { 
                    put(); 
                } 
            }); 
        } 
        Executors.newSingleThreadExecutor().execute(new Runnable(){//用一個線程去取,則會通知4個阻塞的寫線程工作,此時 
                                                                //會有一個線程向緩沖區寫,寫完后去通知在這個condition上等待 
                                                                //的取線程,這是它的本意,但是它喚醒了寫線程,因為只有一個condition 
                                                                //不能有選擇的喚醒寫取線程,此時就需要有多個Condition 
            @Override 
            public void run() { 
                try { 
                    Thread.sleep(10000); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
                get(); 
            } 
        }); 
    } 
    /** 
     * 向緩沖去寫數據 
     */ 
    public void put(){ 
        lock.lock(); 
        try{ 
            System.out.println(Thread.currentThread().getName() + "當前位置:" + index + "-----------------------------"); 
            while(index == 10){ 
                try { 
                    System.out.println(Thread.currentThread().getName() + "處于阻塞狀態!"); 
                    condition.await(); 
//                    index = 0; 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            container[index] = new String(new Random().nextInt() + ""); 
            condition.signalAll(); 
            index ++; 
        } finally { 
            lock.unlock(); 
        } 
    } 
    /** 
     * 從緩沖區拿數據 
     */ 
    public void get(){ 
        lock.lock(); 
        try{ 
            while(index == 0){ 
                try { 
                    System.out.println("get--------" + Thread.currentThread().getName() + "處于阻塞"); 
                    condition.await(); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
            index --; 
            System.out.println("get---------" + Thread.currentThread().getName() + "喚醒阻塞"); 
            condition.signalAll(); 
        } finally { 
            lock.unlock(); 
        } 
    } 
}

第二種解決方案,用java api中的 一個例子 。

來源:xianggao

 本文由用戶 mapinfo600 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!