從一個簡單的Java單例示例談談并發
一個簡單的單例示例
單例模式可能是大家經常接觸和使用的一個設計模式,你可能會這么寫
public class UnsafeLazyInitiallization {
private static UnsafeLazyInitiallization instance;
private UnsafeLazyInitiallization() {
}
public static UnsafeLazyInitiallization getInstance(){
if(instance==null){ //1:A線程執行
instance=new UnsafeLazyInitiallization(); //2:B線程執行
}
return instance;
}
}
</code></pre>
上面代碼大家應該都知道,所謂的線程不安全的懶漢單例寫法。在UnsafeLazyInitiallization類中,假設A線程執行代碼1的同時,B線程執行代碼2,此時,線程A可能看到instance引用的對象還沒有初始化。
你可能會說,線程不安全,我可以對getInstance()方法做同步處理保證安全啊,比如下面這樣的寫法
public class SafeLazyInitiallization {
private static SafeLazyInitiallization instance;
private SafeLazyInitiallization() {
}
public synchronized static SafeLazyInitiallization getInstance(){
if(instance==null){
instance=new SafeLazyInitiallization();
}
return instance;
}
}
</code></pre>
這樣的寫法是保證了線程安全,但是由于getInstance()方法做了同步處理,synchronized將導致性能開銷。如getInstance()方法被多個線程頻繁調用,將會導致程序執行性能的下降。反之,如果getInstance()方法不會被多個線程頻繁的調用,那么這個方案將能夠提供令人滿意的性能。
那么,有沒有更優雅的方案呢?前人的智慧是偉大的,在早期的JVM中,synchronized存在巨大的性能開銷,因此,人們想出了一個“聰明”的技巧——雙重檢查鎖定。人們通過雙重檢查鎖定來降低同步的開銷。下面來讓我們看看
public class DoubleCheckedLocking { //1
private static DoubleCheckedLocking instance; //2
private DoubleCheckedLocking() {
}
public static DoubleCheckedLocking getInstance() { //3
if (instance == null) { //4:第一次檢查
synchronized (DoubleCheckedLocking.class) { //5:加鎖
if (instance == null) //6:第二次檢查
instance = new DoubleCheckedLocking(); //7:問題的根源出在這里
} //8
} //9
return instance; //10
} //11
}
</code></pre>
如上面代碼所示,如果第一次檢查instance不為null,那么就不需要執行下面的加鎖和初始化操作。因此,可以大幅降低synchronized帶來的性能開銷。雙重檢查鎖定看起來似乎很完美,但這是一個錯誤的優化!為什么呢?在線程執行到第4行,代碼讀取到instance不為null時,instance引用的對象有可能還沒有完成初始化。在第7行創建了一個對象,這行代碼可以分解為如下的3行偽代碼
memory=allocate(); //1:分配對象的內存空間
ctorInstance(memory); //2:初始化對象
instance=memory; //3:設置instance指向剛分配的內存地址
上面3行代碼中的2和3之間,可能會被重排序(在一些JIT編譯器上,這種重排序是真實發生的,如果不了解重排序,后文JMM會詳細解釋)。2和3之間重排序之后的執行時序如下
memory=allocate(); //1:分配對象的內存空間
instance=memory; //3:設置instance指向剛分配的內存地址,注意此時對象還沒有被初始化
ctorInstance(memory); //2:初始化對象
回到示例代碼第7行,如果發生重排序,另一個并發執行的線程B就有可能在第4行判斷instance不為null。線程B接下來將訪問instance所引用的對象,但此時這個對象可能還沒有被A線程初始化。在知曉問題發生的根源之后,我們可以想出兩個辦法解決
- 不允許2和3重排序
- 允許2和3重排序,但不允許其他線程“看到”這個重排序
下面就介紹這兩個解決方案的具體實現
基于volatile的解決方案
對于前面的基于雙重檢查鎖定的方案,只需要做一點小的修改,就可以實現線程安全的延遲初始化。請看下面的示例代碼
public class SafeDoubleCheckedLocking {
private volatile static SafeDoubleCheckedLocking instance;
private SafeDoubleCheckedLocking() {
}
public static SafeDoubleCheckedLocking getInstance() {
if (instance == null) {
synchronized (SafeDoubleCheckedLocking.class) {
if (instance == null)
instance = new SafeDoubleCheckedLocking();//instance為volatile,現在沒問題了
}
}
return instance;
}
}
</code></pre>
當聲明對象的引用為volatile后,前面偽代碼談到的2和3之間的重排序,在多線程環境中將會被禁止。
基于類初始化的解決方案
JVM在類的初始化階段(即在Class被加載后,且被線程使用之前),會執行類的初始化。在執行類的初始化期間,JVM會去獲取多個線程對同一個類的初始化。基于這個特性,實現的示例代碼如下
public class InstanceFactory {
private InstanceFactory() {
}
private static class InstanceHolder {
public static InstanceFactory instance = new InstanceFactory();
}
public static InstanceFactory getInstance() {
return InstanceHolder.instance; //這里將導致InstanceHolder類被初始化
}
}
</code></pre>
這個方案的本質是允許前面偽代碼談到的2和3重排序,但不允許其他線程“看到”這個重排序。在InstanceFactory示例代碼中,首次執行getInstance()方法的線程將導致InstanceHolder類被初始化。由于Java語言是多線程的,多個線程可能在同一時間嘗試去初始化同一個類或接口(比如這里多個線程可能會在同一時刻調用getInstance()方法來初始化IInstanceHolder類)。Java語言規定,對于每一個類和接口C,都有一個唯一的初始化鎖LC與之對應。從C到LC的映射,由JVM的具體實現去自由實現。JVM在類初始化期間會獲取這個初始化鎖,并且每個線程至少獲取一次鎖來確保這個類已經被初始化過了。
JMM
也許你還存在疑問,前面談的重排序是什么鬼?為什么volatile在某方面就能禁止重排序?現在引出本文的另一個話題JMM(Java Memory Model——Java內存模型)。什么是JMM呢?JMM是一個抽象概念,它并不存在。Java虛擬機規范中試圖定義一種Java內存模型(JMM)來屏蔽掉各種硬件和操作系統的內存訪問差異,以實現讓Java程序在各種平臺下都能達到一致的內存訪問效果。在此之前,主流程序語言(如C/C++等)直接使用物理硬件和操作系統的內存模型,因此,會由于不同平臺的內存模型的差異,有可能導致程序在一套平臺上并發完全正常,而在另一套平臺上并發訪問卻經常出錯,因此在某些場景就必須針對不同的平臺來編寫程序。
Java線程之間的通信由JMM來控制,JMM決定一個線程共享變量的寫入何時對另一個線程可見。JMM保證如果程序是正確同步的,那么程序的執行將具有順序一致性。從抽象的角度看,JMM定義了線程和主內存之間的抽象關系:線程之間的共享變量(實例域、靜態域和數據元素)存儲在主內存(Main Memory)中,每個線程都有一個私有的本地內存(Local Memory),本地內存中存儲了該線程以讀/寫共享變量的副本(局部變量、方法定義參數和異常處理參數是不會在線程之間共享,它們存儲在線程的本地內存中)。從物理角度上看,主內存僅僅是虛擬機內存的一部分,與物理硬件的主內存名字一樣,兩者可以互相類比;而本地內存,可與處理器高速緩存類比。Java內存模型的抽象示意圖如圖所示

這里先介紹幾個基礎概念:8種操作指令、內存屏障、順序一致性模型、as-if-serial、happens-before 、數據依賴性、 重排序。
8種操作指令
關于主內存與本地內存之間具體的交互協議,即一個變量如何從主內存拷貝到本地內存、如何從本地內存同步回主內存之類的實現細節,JMM中定義了以下8種操作來完成,虛擬機實現時必須保證下面提及的每種操作都是原子的、不可再分的(對于double和long類型的遍歷來說,load、store、read和write操作在某些平臺上允許有例外):
- lock(鎖定):作用于主內存的變量,它把一個變量標識為一條線程獨立的狀態。
- unlock(解鎖):作用于主內存的變量,它把一個處于鎖定狀態的變量釋放出來,釋放后的變量才可以被其他線程鎖定。
- read(讀取):作用于主內存的變量,它把一個變量的值從主內存傳輸到線程的本地內存中,以便隨后的load動作使用。
- load(載入):作用于本地內存的變量,它把read操作從主內存中得到變量值放入本地內存的變量副本中。
- use(使用):作用于本地內存的變量,它把本地內存中一個變量的值傳遞給執行引擎,每當虛擬機遇到一個需要使用到變量的值的字節碼指令時將會執行這個操作。
- assign(賦值):作用于本地內存的變量,它把一個從執行引擎接收到的值賦給本地內存的變量,每當虛擬機遇到一個給變量賦值的字節碼指令時執行這個操作。
- store(存儲):作用于本地內存的變量,它把本地內存中的一個變量的值傳送到主內存中,以便隨后的write操作使用。
- write(寫入):作用于主內存的變量,它把store操作從本地內存中提到的變量的值放入到主內存的變量中。
如果要把一個變量從主內存模型復制到本地內存,那就要順序的執行read和load操作,如果要把變量從本地內存同步回主內存,就要順序的執行store和write操作。注意,Java內存模型只要求上述兩個操作必須按順序執行,而沒有保證是連續執行。也就是說read與load之間、store與write之間是可插入其他指令的,如對主內存中的變量a、b進行訪問時,一種可能出現的順序是read a read b、load b、load a。
內存屏障
內存屏障是一組處理器指令(前面的8個操作指令),用于實現對內存操作的順序限制。包括LoadLoad, LoadStore, StoreLoad, StoreStore共4種內存屏障。內存屏障存在的意義是什么呢?它是在Java編譯器生成指令序列的適當位置插入內存屏障指令來禁止特定類型的處理器重排序,從而讓程序按我們預想的流程去執行,內存屏障是與相應的內存重排序相對應的。JMM把內存屏障指令分為4類

StoreLoad Barriers是一個“全能型 ”的屏障,它同時具有其他3個屏障的效果。現在的多數處理器大多支持該屏障(其他類型的屏障不一定被所有處理器支持)。執行該屏障開銷會很昂貴,因為當前處理器通常要把寫緩沖區中的數據全部刷新到內存中。
數據依賴性
如果兩個操作訪問同一個變量,且這兩個操作中有一個為寫操作,此時這兩個操作之間就存在數據依賴性。數據依賴性分3種類型:寫后讀、寫后寫、讀后寫。這3種情況,只要重排序兩個操作的執行順序,程序的執行結果就會被改變。編譯器和處理器可能對操作進行重排序。而它們進行重排序時,會遵守數據依賴性,不會改變數據依賴關系的兩個操作的執行順序。
這里所說的數據依賴性僅針對單個處理器中執行的指令序列和單個線程中執行的操作,不同處理器之間和不同線程之間的數據依賴性不被編譯器和處理器考慮。
順序一致性內存模型
順序一致性內存模型是一個理論參考模型,在設計的時候,處理器的內存模型和編程語言的內存模型都會以順序一致性內存模型作為參照。它有兩個特性:
- 一個線程中的所有操作必須按照程序的順序來執行
- (不管程序是否同步)所有線程都只能看到一個單一的操作執行順序。在順序一致性的內存模型中,每個操作必須原子執行并且立刻對所有線程可見。
從順序一致性模型中,我們可以知道程序所有操作完全按照程序的順序串行執行。而在JMM中,臨界區內的代碼可以重排序(但JMM不允許臨界區內的代碼“逸出”到臨界區外,那樣就破壞監視器的語義)。JMM會在退出臨界區和進入臨界區這兩個關鍵時間點做一些特別處理,使得線程在這兩個時間點具有與順序一致性模型相同的內存視圖。雖然線程A在臨界區內做了重排序,但由于監視器互斥執行的特性,這里的線程B根本無法“觀察”到線程A在臨界區內的重排序。這種重排序既提高了執行效率,又沒有改變程序的執行結果。像前面單例示例的類初始化解決方案就是采用了這個思想。
as-if-serial
as-if-serial的意思是不管怎么重排序,(單線程)程序的執行結果不能改變。編譯器、runtime和處理器都必須遵守as-if-serial語義。為了遵守as-if-serial語義,編譯器和處理器不會對存在數據依賴關系的操作做重排序。
as-if-serial語義把單線程程序保護了起來,遵守as-if-serial語義的編譯器、runtime和處理器共同為編寫單線程程序的程序員創建了一個幻覺:單線程程序是按程序的順序來執行的。as-if-serial語義使單線程程序員無需擔心重排序會干擾他們,也無需擔心內存可見性問題。
happens-before
happens-before是JMM最核心的概念。從JDK5開始,Java使用新的JSR-133內存模型,JSR-133 使用happens-before的概念闡述操作之間的內存可見性,如果一個操作執行的結果需要對另一個操作可見,那么這兩個操作之間必須存在happens-before關系。
happens-before規則如下:
- 程序次序法則:線程中的每個動作 A 都 happens-before 于該線程中的每一個動作 B,其中,在程序中,所有的動作 B 都出現在動作 A 之后。(注:此法則只是要求遵循 as-if-serial語義)
- 監視器鎖法則:對一個監視器鎖的解鎖 happens-before 于每一個后續對同一監視器鎖的加鎖。(顯式鎖的加鎖和解鎖有著與內置鎖,即監視器鎖相同的存儲語意。)
- volatile變量法則:對 volatile 域的寫入操作 happens-before 于每一個后續對同一域的讀操作。(原子變量的讀寫操作有著與 volatile 變量相同的語意。)(volatile變量具有可見性和讀寫原子性。)
- 線程啟動法則:在一個線程里,對 Thread.start 的調用會 happens-before 于每一個啟動線程中的動作。 線程終止法則:線程中的任何動作都 happens-before 于其他線程檢測到這個線程已終結,或者從 Thread.join 方法調用中成功返回,或者 Thread.isAlive 方法返回false。
- 中斷法則法則:一個線程調用另一個線程的 interrupt 方法 happens-before 于被中斷線程發現中斷(通過拋出InterruptedException, 或者調用 isInterrupted 方法和 interrupted 方法)。
- 終結法則:一個對象的構造函數的結束 happens-before 于這個對象 finalizer 開始。
- 傳遞性:如果 A happens-before 于 B,且 B happens-before 于 C,則 A happens-before 于 C。
happens-before與JMM的關系如下圖所示

as-if-serial語義和happens-before本質上一樣,參考順序一致性內存模型的理論,在不改變程序執行結果的前提下,給編譯器和處理器以最大的自由度,提高并行度。
重排序
終于談到我們反復提及的重排序了,重排序是指編譯器和處理器為了優化程序性能而對指令序列進行重新排序的一種手段。重排序分3種類型。
- 編譯器優化的重排序。編譯器在不改變單線程程序語義(as-if-serial )的前提下,可以重新安排語句的執行順序。
- 指令級并行的重排序。現代處理器采用了指令級并行技術(Instruction Level Parallelism,ILP)來將多條指令重疊執行。如果不存在數據依賴性,處理器可以改變語句對機器指令的執行順序。
- 內存系統的重排序。由于處理器使用緩存和讀/寫緩沖區,這使得加載和存儲操作看上去可能是在亂序執行。
從Java源代碼到最終實際執行的指令序列,會分別經歷下面3種重排序

上述的1屬于編譯器重排序,2和3屬于處理器重排序。這些重排序可能會導致多線程程序出現內存可見性問題。對于編譯器,JMM的編譯器重排序規則會禁止特定類型的編譯器重排序(不是所有的編譯器重排序都要禁止)。對于處理器重排序,JMM的處理器重排序規則會要求Java編譯器在生成指令序列時,插入特定類型的內存屏障指令,通過內存屏障指令來禁止特定類型的處理器重排序。
JMM屬于語言級的內存模型,它確保在不同的編譯器和不同的處理器平臺之上,通過禁止特定類型的編譯器重排序和處理器重排序,為程序員提供一致的內存可見性保證。
從JMM設計者的角度來說,在設計JMM時,需要考慮兩個關鍵因素:
- 程序員對內存模型的使用。程序員希望內存模型易于理解,易于編程。程序員希望基于一個強內存模型(程序盡可能的順序執行)來編寫代碼。
- 編譯器和處理器對內存模型的實現。編譯器和處理器希望內存模型對它們的束縛越少越好,這樣它們就可以做盡可能多的優化(對程序重排序,做盡可能多的并發)來提高性能。編譯器和處理器希望實現一個弱內存模型。
JMM設計就需要在這兩者之間作出協調。JMM對程序采取了不同的策略:
- 對于會改變程序執行結果的重排序,JMM要求編譯器和處理器必須禁止這種重排序。
- 對于不會改變程序執行結果的重排序,JMM對編譯器和處理器不作要求(JMM允許這種重排序)。
介紹完了這幾個基本概念,我們不難推斷出JMM是圍繞著在并發過程中如何處理原子性、可見性和有序性這三個特征來建立的:
- 原子性:由Java內存模型來直接保證的原子性操作就是我們前面介紹的8個原子操作指令,其中lock(lock指令實際在處理器上原子操作體現對總線加鎖或對緩存加鎖)和unlock指令操作JVM并未直接開放給用戶使用,但是卻提供了更高層次的字節碼指令monitorenter和monitorexit來隱式使用這兩個操作,這兩個字節碼指令反映到Java代碼中就是同步塊——synchronize關鍵字,因此在synchronized塊之間的操作也具備原子性。除了synchronize,在Java中另一個實現原子操作的重要方式是自旋CAS,它是利用處理器提供的cmpxchg指令實現的。至于自旋CAS后面J.U.C中會詳細介紹,它和volatile是整個J.U.C底層實現的核心。
- 可見性:可見性是指一個線程修改了共享變量的值,其他線程能夠立即得知這個修改。而我們上文談的happens-before原則禁止某些處理器和編譯器的重排序,來保證了JMM的可見性。而體現在程序上,實現可見性的關鍵字包含了volatile、synchronize和final。
- 有序性:談到有序性就涉及到前面說的重排序和順序一致性內存模型。我們也都知道了as-if-serial是針對單線程程序有序的,即使存在重排序,但是最終程序結果還是不變的,而多線程程序的有序性則體現在JMM通過插入內存屏障指令,禁止了特定類型處理器的重排序。通過前面8個操作指令和happens-before原則介紹,也不難推斷出,volatile和synchronized兩個關鍵字來保證線程之間的有序性,volatile本身就包含了禁止指令重排序的語義,而synchronized則是由監視器法則獲得。
J.U.C
談完了JMM,那么Java相關類庫是如何實現的呢?這里就談談J.U.C( java.util.concurrent),先來張J.U.C的思維導圖

不難看出,J.U.C由atomic、locks、tools、collections、executor這五部分組成。它們的實現基于volatile的讀寫和CAS所具有的volatile讀和寫。AQS(AbstractQueuedSynchronizer,隊列同步器)、非阻塞數據結構和原子變量類,這些J.U.C中的基礎類都是使用了這種模式實現的,而J.U.C中的高層類又依賴于這些基礎類來實現的。從整體上看,J.U.C的實現示意圖如下

也許你對volatile和CAS的底層實現原理不是很了解,這里先這里先簡單介紹下它們的底層實現
volatile
Java語言規范第三版對volatile的定義為:Java編程語言允許線程訪問共享變量,為了確保共享變量能被準確和一致性的更新,線程應該確保通過排他鎖單獨獲得這個變量。如果一個字段被聲明為volatile,Java內存模型確保這個所有線程看到這個值的變量是一致的。而volatile是如何來保證可見性的呢?如果對聲明了volatile的變量進行寫操作,JVM就會向處理器發送一條Lock前綴的指令,將這個變量所在緩存行的數據寫回到系統內存(Lock指令會在聲言該信號期間鎖總線/緩存,這樣就獨占了系統內存)。但是,就算是寫回到內存,如果其他處理器緩存的值還是舊的,再執行計算操作就會有問題。所以,在多處理器下,為了保證各個處理器的緩存是一致的,就會實現緩存一致性協議,每個處理器通過嗅探在總線(注意處理器不直接跟系統內存交互,而是通過總線)上傳播的數據來檢查自己緩存的值是不是過期了,當處理器發現直接緩存行對應的內存地址被修改,就會將當前處理器的緩存行設置成無效狀態,當處理器對這個數據進行修改操作的時候,會重新從系統內存中把數據讀到處理器緩存里。
CAS
CAS其實應用挺廣泛的,我們常常聽到的悲觀鎖樂觀鎖的概念,樂觀鎖(無鎖)指的就是CAS。這里只是簡單說下在并發的應用,所謂的樂觀并發策略,通俗的說,就是先進性操作,如果沒有其他線程爭用共享數據,那操作就成功了,如果共享數據有爭用,產生了沖突,那就采取其他的補償措施(最常見的補償措施就是不斷重試,治到成功為止,這里其實也就是自旋CAS的概念),這種樂觀的并發策略的許多實現都不需要把線程掛起,因此這種操作也被稱為非阻塞同步。而CAS這種樂觀并發策略操作和沖突檢測這兩個步驟具備的原子性,是靠什么保證的呢?硬件,硬件保證了一個從語義上看起來需要多次操作的行為只通過一條處理器指令就能完成。
也許你會存在疑問,為什么這種無鎖的方案一般會比直接加鎖效率更高呢?這里其實涉及到線程的實現和線程的狀態轉換。實現線程主要有三種方式:使用內核線程實現、使用用戶線程實現和使用用戶線程加輕量級進程混合實現。而Java的線程實現則依賴于平臺使用的線程模型。至于狀態轉換,Java定義了6種線程狀態,在任意一個時間點,一個線程只能有且只有其中的一種狀態,這6種狀態分別是:新建、運行、無限期等待、限期等待、阻塞、結束。 Java的線程是映射到操作系統的原生線程之上的,如果要阻塞或喚醒一個線程,都需要操作系統來幫忙完成,這就需要從用戶態轉換到核心態中,因此狀態轉換需要耗費很多的處理器時間。對于簡單的同步塊(被synchronized修飾的方法),狀態轉換消耗的時間可能比用戶代碼執行的時間還要長。所以出現了這種優化方案,在操作系統阻塞線程之間引入一段自旋過程或一直自旋直到成功為止。避免頻繁的切入到核心態之中。
但是這種方案其實也并不完美,在這里就說下CAS實現原子操作的三大問題
- ABA問題。因為CAS需要在操作值的時候,檢查值有沒有變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發現它的值沒有變化,但是實際上發生變化了。ABA解決的思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加1。JDK的Atomic包里提供了一個類AtomicStampedReference來解決ABA問題。不過目前來說這個類比較“雞肋”,大部分情況下ABA問題不會影響程序并發的正確性,如果需要解決ABA問題,改用原來的互斥同步可能會比原子類更高效。
- 循環時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。所以說如果是長時間占用鎖執行的程序,這種方案并不適用于此。
- 只能保證一個共享變量的原子操作。當對一個共享變量執行操作時,我們可以使用自旋CAS來保證原子性,但是對多個共享變量的操作時,自旋CAS就無法保證操作的原子性,這個時候可以用鎖。
談完了這兩個概念,下面我們就來逐個分析這五部分的具體源碼實現
atomic
atomic包的原子操作類提供了一種簡單、性能高效、線程安全操作一個變量的方式。atomic包里一共13個類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用、原子更新屬性。atomic包里的類基本使用Unsafe實現的包裝類。
下面通過一個簡單的CAS方式實現計數器(一個線程安全的計數器方法safeCount和一個非線程安全的計數器方法count)的示例來說下
public class CASTest {
public static void main(String[] args){
final Counter cas=new Counter();
List ts=new ArrayList(600);
long start=System.currentTimeMillis();
for(int j=0;j<100;j++){
Thread t=new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10000;i++){
cas.count();
cas.safeCount();
}
}
});
ts.add(t);
}
for(Thread t:ts){
t.start();
}
for(Thread t:ts){
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(cas.i);
System.out.println(cas.atomicI.get());
System.out.println(System.currentTimeMillis()-start);
}
}
public class Counter {
public AtomicInteger atomicI=new AtomicInteger(0);
public int i=0;
/**
* 使用CAS實現線程安全計數器
*/
public void safeCount(){
for(;;){
int i=atomicI.get();
boolean suc=atomicI.compareAndSet(i,++i);
if(suc){
break;
}
}
}
/**
* 非線程安全計數器
*/
public void count(){
i++;
}
}
</code></pre>
safeCount()方法的代碼塊其實是getandIncrement()方法的實現,源碼for循環體第一步優先取得atomicI里存儲的數值,第二步對atomicI的當前數值進行加1操作,關鍵的第三步調用compareAndSet()方法來進行原子更新操作,該方法先檢查當前數值是否等于current,等于意味著atomicI的值沒有被其他線程修改過,則將atomicI的當前數值更新成next的值,如果不等compareAndSet()方法會返回false,程序則進入for循環重新進行compareAndSet()方法操作進行不斷嘗試直到成功為止。在這里我們跟蹤下compareAndSet()方法如下
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
從上面源碼我們發現是使用Unsafe實現的,其實atomic里的類基本都是使用Unsafe實現的。我們再回到這個本地方法調用,這個本地方法在openjdk中依次調用c++代碼為unsafe.cpp、atomic.app和atomic_windows_x86.inline.hpp。關于本地方法實現的源代碼這里就不貼出來了,其實大體上是程序會根據當前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(Lock Cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身就會維護單處理器內的順序一致性,不需要lock前綴提供的內存屏障效果)。
locks
鎖是用來控制多個線程訪問共享資源的形式,Java SE 5之后,J.U.C中新增了locks來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能。只是在使用時需要顯示的獲取和釋放鎖。雖然它缺少了隱式獲取和釋放鎖的便捷性,但是卻擁有了鎖獲取和釋放的可操作性、可中斷的獲取鎖及超時獲取鎖等多種synchronized關鍵字不具備的同步特性。
locks在這我們只介紹下核心的AQS(AbstractQueuedSynchronizer,隊列同步器),AQS是用來構建鎖或者其他同步組件的基礎框架,它使用一個用volatile修飾的int成員變量表示同步狀態。通過內置的FIFO隊列來完成資源獲取線程的排隊工作。同步器的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態,在抽象方法的實現過程免不了要對同步狀態進行更改,這時候就會使用到AQS提供的3個方法:getState()、setState()和compareAndSetState()來進行操作,這是因為它們能夠保證狀態的改變是原子性的。為什么這么設計呢?因為鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現細節,而AQS面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和AQS很好的隔離了使用者和實現者鎖關注的領域。
現在我們就自定義一個獨占鎖來詳細解釋下AQS的實現機制
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -4387327721959839431L;
protected boolean isHeldExclusively() {
return getState() == 1;
}
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
</code></pre>
實現自定義組件的時候,我們可以看到,AQS可重寫的方法是tryAcquire()——獨占式獲取同步狀態、tryRelease()——獨占式釋放同步狀態、tryAcquireShared()——共享式獲取同步狀態、tryReleaseShared ()——共享式釋放同步狀態、isHeldExclusively()——是否被當前線程所獨占。這個示例中,獨占鎖Mutex是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器并實現了獨占式獲取和釋放同步狀態。在tryAcquire()中,如果經過CAS設置成功(同步狀態設置為1),則表示獲取了同步狀態,而在tryRelease()中,只是將同步狀態重置為0。接著我們對比一下重入鎖(ReentrantLock)的源碼實現
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/* Synchronizer providing all implementation mechanics /
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
//todo sth...
}
//todo sth...
}
</code></pre>
重入鎖分公平鎖和不公平鎖,默認使用的是不公平鎖,在這我們看到實現重入鎖大體上跟我們剛才自定義的獨占鎖差不多,但是有什么區別呢?我們看看重入鎖nonfairTryAcquire()方法實現:首先獲取同步狀態(默認是0),如果是0的話,CAS設置同步狀態,非0的話則判斷當前線程是否已占有鎖,如果是的話,則偏向更新同步狀態。從這里我們不難推斷出重入鎖的概念,同一個線程可以多次獲得同一把鎖,在釋放的時候也必須釋放相同次數的鎖。通過對比相信大家對自定義一個鎖有了一個初步的概念,也許你存在疑問我們重寫的這幾個方法在AQS哪地方用呢?現在我們來繼續往下跟蹤,我們深入跟蹤下剛才自定義獨占鎖lock()方法里面acquire()的實現
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個方法在AQS類里面,看到里面的tryAcquire(arg)大家也就明白了,tryAcquire(arg)方法獲取同步狀態,后面acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法就是說的節點構造、加入同步隊列及在同步隊列中自旋等待的AQS沒暴露給我們的相關操作。大體的流程就是首先調用自定義同步器實現的tryAcquire()方法,該方法保證線程安全的獲取同步狀態,如果獲取同步狀態失敗,則構造同步節點(獨占式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)并通過addWaiter()方法將該節點加入到同步隊列的尾部,最后調用acquireQueued()方法,使得該節點以“死循環”的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要靠前驅節點的出隊或阻塞線程被中斷來實現。也許你還是不明白剛才所說的,那么我們繼續跟蹤下addWaiter()方法的實現
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
</code></pre>
上面的代碼通過使用compareAndSetTail()方法來確保節點能夠被線程安全添加。在enq()方法中,同步器通過“死循環”來確保節點的正確添加,在”死循環“中只有通過CAS將節點設置成為尾節點之后,當前線程才能夠從該方法返回,否則,當前線程不斷地嘗試重試設置。
在節點進入同步隊列之后,發生了什么呢?現在我們繼續跟蹤下acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
從上面的代碼我們不難看出,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說每個線程)都在自省的觀察,當條件滿足時(自己的前驅節點是頭節點就進行CAS設置同步狀態)就獲得同步狀態,然后就可以從自旋的過程中退出,否則依舊在這個自旋的過程中。
collections
從前面的思維導圖我們可以看到并發容器包括鏈表、隊列、HashMap等.它們都是線程安全的。
- ConcurrentHashMap : 一個高效的線程安全的HashMap。
- CopyOnWriteArrayList : 在讀多寫少的場景中,性能非常好,遠遠高于vector。
- ConcurrentLinkedQueue : 高效并發隊列,使用鏈表實現,可以看成線程安全的LinkedList。
- BlockingQueue : 一個接口,JDK內部通過鏈表,數組等方式實現了這個接口,表示阻塞隊列,非常適合用作數據共享 。
- ConcurrentSkipListMap : 跳表的實現,這是一個Map,使用跳表數據結構進行快速查找 。
另外Collections工具類可以幫助我們將任意集合包裝成線程安全的集合。在這里重點說下ConcurrentHashMap和BlockingQueue這兩個并發容器。
我們都知道HashMap線程不安全的,而我們可以通過Collections.synchronizedMap(new HashMap<>())來包裝一個線程安全的HashMap或者使用線程安全的HashTable,但是它們的效率都不是很好,這時候我們就有了ConcurrentHashMap。為什么ConcurrentHashMap高效且線程安全呢?其實它使用了鎖分段技術來提高了并發的訪問率。假如容器里有多把鎖,每一把鎖用于鎖容器的一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效地提高并發訪問效率,這就是鎖分段技術。首先將數據分成一段段的存儲,然后給每段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。而既然數據被分成了多個段,線程如何定位要訪問的段的數據呢?這里其實是通過散列算法來定位的。
現在來談談阻塞隊列,阻塞隊列其實跟后面要談的線程池息息相關的,JDK7提供了7個阻塞隊列,分別是
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產者和消費者能夠高效率的進行通訊呢?讓我們先來看看JDK是如何實現的。
使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼如下:
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
</code></pre>
當我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產者主要通過LockSupport.park(this)來實現
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,然后調用unsafe.park阻塞當前線程。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
unsafe.park是個native方法,代碼如下:
public native void park(boolean isAbsolute, long time);
park這個方法會阻塞當前線程,只有以下四種情況中的一種發生時,該方法才會返回。
- 與park對應的unpark執行或已經執行時。注意:已經執行是指unpark先執行,然后再執行的park。
- 線程被中斷時。
- 如果參數中的time不是零,等待了指定的毫秒數時。
- 發生異常現象時。這些異常事先無法確定。
我們繼續看一下JVM是如何實現park方法的,park在不同的操作系統使用不同的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代碼如下:
void os::PlatformEvent::park() {
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
// In theory we could move the ST of 0 into _Event past the unlock(),
// but then we'd need a MEMBAR after the ST.
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
}
guarantee (_Event >= 0, "invariant") ;
}
}
pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以理解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。
當隊列滿時,生產者往阻塞隊列里插入一個元素,生產者線程會進入WAITING (parking)狀態。
executor
Executor框架提供了各種類型的線程池,不同的線程池應用了前面介紹的不同的堵塞隊列

Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實現類。 對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool()方法,雖然看起來創建的線程具有完全不同的功能特點,但其內部均使用了ThreadPoolExecutor實現
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
- newFixedThreadPool()方法的實現,它返回一個corePoolSize和maximumPoolSize一樣的,并使用了LinkedBlockingQueue任務隊列(無界隊列)的線程池。當任務提交非常頻繁時,該隊列可能迅速膨脹,從而系統資源耗盡。
- newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()方法的退化,只是簡單的將線程池數量設置為1。
- newCachedThreadPool()方法返回corePoolSize為0而maximumPoolSize無窮大的線程池,這意味著沒有任務的時候線程池內沒有現場,而當任務提交時,該線程池使用空閑線程執行任務,若無空閑則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列是直接提交隊列,它總是破事線程池增加新的線程來執行任務。當任務執行完后由于corePoolSize為0,因此空閑線程在指定時間內(60s)被回收。對于newCachedThreadPool(),如果有大量任務提交,而任務又不那么快執行時,那么系統變回開啟等量的線程處理,這樣做法可能會很快耗盡系統的資源,因為它會增加無窮大數量的線程。
由以上線程池的實現可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構造函數:
public ThreadPoolExecutor(
//核心線程池,指定了線程池中的線程數量
int corePoolSize,
//基本線程池,指定了線程池中的最大線程數量
int maximumPoolSize,
//當前線程池數量超過corePoolSize時,多余的空閑線程的存活時間,即多次時間內會被銷毀。
long keepAliveTime,
//keepAliveTime的單位
TimeUnit unit,
//任務隊列,被提交但尚未被執行的任務。
BlockingQueue workQueue,
//線程工廠,用于創建線程,一般用默認的即可
ThreadFactory threadFactory,
//拒絕策略,當任務太多來不及處理,如何拒絕任務。
RejectedExecutionHandler handler)
ThreadPoolExecutor的任務調度邏輯如下

從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:
- 首先線程池判斷基本線程池是否已滿,如果沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
- 其次線程池判斷工作隊列是否已滿,如果沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
- 最后線程池判斷整個線程池是否已滿,如果沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。
下面我們來看看ThreadPoolExecutor核心調度代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* workerCountOf(c)獲取當前線程池線程總數
* 當前線程數小于corePoolSize核心線程數時,會將任務通過addWorker(command, true)方法直接調度執行。
* 否則進入下個if,將任務加入等待隊列
**/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* workQueue.offer(command) 將任務加入等待隊列。
* 如果加入失敗(比如有界隊列達到上限或者使用了synchronousQueue)則會執行else。
*
**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* addWorker(command, false)直接交給線程池,
* 如果當前線程已達到maximumPoolSize,則提交失敗執行reject()拒絕策略。
**/
else if (!addWorker(command, false))
reject(command);
}
</code></pre>
從上面的源碼我們可以知道execute的執行步驟:
- 如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。
- 如果運行的線程等于或多于corePoolSize,則將任務加入到BlockingQueue。
- 如果無法將任務假如BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。
- 如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能的避免獲取全局鎖(那將會是一個嚴重的 可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后(當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。
參考閱讀
本文部分內容參考自《Java并發編程的藝術》、《深入理解Java虛擬機(第2版)》、《實戰Java高并發程序設計》、《深入Java內存模型》、《Java并發編程實踐》,感興趣的可自行查閱。
來源: HugNew