Java高級:線程同步lock與unlock使用
一、Lock與Synchronized區別
Java中可以使用Lock和Synchronized的可以實現對某個共享資源的同步,同時也可以實現對某些過程的原子性操作。
Lock可以使用Condition進行線程之間的調度,Synchronized則使用Object對象本身的notify, wait, notityAll調度機制,這兩種調度機制有什么異同呢?
Condition是Java5以后出現的機制,它有更好的靈活性,而且在一個對象里面可以有多個Condition(即對象監視器),則線程可以注冊在不同的Condition,從而可以有選擇性的調度線程,更加靈活。
Synchronized就相當于整個對象只有一個單一的Condition(即該對象本身)所有的線程都注冊在它身上,線程調度的時候之后調度所有得注冊線程,沒有選擇權,會出現相當大的問題 。
所以,Lock 實現提供了比使用 synchronized 方法和語句可獲得的更廣泛的鎖定操作。此實現允許更靈活的結構,可以具有差別很大的屬性,可以支持多個相關的 Condition 對象。
鎖是控制多個線程對共享資源進行訪問的工具。通常,鎖提供了對共享資源的獨占訪問。一次只能有一個線程獲得鎖,對共享資源的所有訪問都需要首先獲得鎖。不過,某些鎖可能允許對共享資源并發訪問,如 ReadWriteLock 的讀取鎖。
synchronized 方法或語句的使用提供了對與每個對象相關的隱式監視器鎖的訪問,但卻強制所有鎖獲取和釋放均要出現在一個塊結構中:當獲取了多個鎖時,它們必須以相反的順序釋放,且必須在與所有鎖被獲取時相同的詞法范圍內釋放所有鎖。
雖然 synchronized 方法和語句的范圍機制使得使用監視器鎖編程方便了很多,而且還幫助避免了很多涉及到鎖的常見編程錯誤,但有時也需要以更為靈活的方式使用鎖。例如,某些遍歷并發訪問的數據結果的算法要求使用 “hand-over-hand” 或 “chain locking”:獲取節點 A 的鎖,然后再獲取節點 B 的鎖,然后釋放 A 并獲取 C,然后釋放 B 并獲取 D,依此類推。Lock 接口的實現允許鎖在不同的作用范圍內獲取和釋放,并允許以任何順序獲取和釋放多個鎖,從而支持使用這種技術。
二、java.util.concurrent.locks類結構
上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現類都持有自己內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現不同的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。
基于AQS構建的Synchronizer包括 ReentrantLock , Semaphore , CountDownLatch , ReetrantRead WriteLock , FutureTask 等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不一樣而已。
ReentrantLock:需要記錄當前線程獲取原子狀態的次數,如果次數為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據著鎖從而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。以下為ReetranLock的FairSync的tryAcquire實現代碼解析:
Semaphore:則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現并發量的控制,Semaphore一開始設置許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現:
CountDownLatch:閉鎖則要保持其狀態,在這個狀態到達終止態之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDown是sync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠將這個初始值減到0,從而釋放原子狀態,讓等待的所有線程通過。
FutureTask:需要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續還是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是檢查狀態,如果是RUNNING狀態那么讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,并且通過unpark喚醒正在等待的線程,返回結果。
以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪里呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()和setState()兩個方法(protected)。這樣就為上述狀態解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態。
AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現的,比如線程獨占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared為需要共享形式獲取的synchronizer實現。
ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現) Semaphore 內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch相似,因為公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。 CountDownLatch 內部類Sync實現了tryAcquireShared和tryReleasedShared。 FutureTask 內部類Sync也實現了tryAcquireShared和tryReleasedShared。
其實使用過一些JAVA synchronizer的之后,然后結合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實我們自己也可以構造synchronizer。如下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。
三、lock與unlock使用
下面是一個場景,針對這個場景提出兩種解決方案。 一個中轉站,可以接納貨物,然后發出貨物,這是需要建一個倉庫,相當于一個緩沖區,當倉庫滿的時候,不能接貨,倉庫空的時候,不能發貨。
第一種,用一個Condition去解決,有可能會出問題。
package com.zxx;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 單個Condition去控制一個緩沖區,多線程對緩沖區做讀寫操作,要保證緩沖區滿的時侯不會
* 被寫,空的時候不會被讀;單個Condition控制會出錯誤: 當緩沖區還有一個位置時,多個寫線程
* 同時訪問,則只有一個寫線程可以對其進行寫操作,操作完之后,喚醒在這個condition上等待的
* 其他幾個寫線程,如果判斷用IF語句的話就會出現繼續向緩沖區添加。
* @author Administrator
*
*/
public class ConditionError {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
String[] container = new String[10];
int index = 0;
public static void main(String[] args) {
ConditionError conditionError = new ConditionError();
conditionError.test();
}
public void test(){
ExecutorService threadPool = Executors.newCachedThreadPool();
for(int i = 0; i < 14; i++){//先用14個線程去寫,則有4個線程會被阻塞
threadPool.execute(new Runnable(){
@Override
public void run() {
put();
}
});
}
Executors.newSingleThreadExecutor().execute(new Runnable(){//用一個線程去取,則會通知4個阻塞的寫線程工作,此時
//會有一個線程向緩沖區寫,寫完后去通知在這個condition上等待
//的取線程,這是它的本意,但是它喚醒了寫線程,因為只有一個condition
//不能有選擇的喚醒寫取線程,此時就需要有多個Condition
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
get();
}
});
}
/**
* 向緩沖去寫數據
*/
public void put(){
lock.lock();
try{
System.out.println(Thread.currentThread().getName() + "當前位置:" + index + "-----------------------------");
while(index == 10){
try {
System.out.println(Thread.currentThread().getName() + "處于阻塞狀態!");
condition.await();
// index = 0;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
container[index] = new String(new Random().nextInt() + "");
condition.signalAll();
index ++;
} finally {
lock.unlock();
}
}
/**
* 從緩沖區拿數據
*/
public void get(){
lock.lock();
try{
while(index == 0){
try {
System.out.println("get--------" + Thread.currentThread().getName() + "處于阻塞");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
index --;
System.out.println("get---------" + Thread.currentThread().getName() + "喚醒阻塞");
condition.signalAll();
} finally {
lock.unlock();
}
}
}
第二種解決方案,用java api中的 一個例子 。
來源:xianggao