聊聊并發(6)ConcurrentLinkedQueue的實現原理分析

trll7485 8年前發布 | 8K 次閱讀 并發 Java開發

來自: http://www.importnew.com/17539.html

本系列:

聊聊并發(1)深入分析Volatile的實現原理

聊聊并發(2)Java SE1.6中的Synchronized

聊聊并發(3)Java線程池的分析和使用

聊聊并發(4)深入分析ConcurrentHashMap

聊聊并發(5):原子操作的實現原理

1.    引言

在并發編程中我們有時候需要使用線程安全的隊列。如果我們要實現一個線程安全的隊列有兩種實現方式一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現,而非阻塞的實現方式則可以使用循環CAS的方式來實現,本文讓我們一起來研究下Doug Lea是如何使用非阻塞的方式來實現線程安全隊列ConcurrentLinkedQueue的,相信從大師身上我們能學到不少并發編程的技巧。

2.    ConcurrentLinkedQueue的介紹

ConcurrentLinkedQueue是一個基于鏈接節點的無界線程安全隊列,它采用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部,當我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法來實現,該算法在Michael & Scott算法上進行了一些修改, Michael & Scott算法的詳細信息可以參見 參考資料一

3.    ConcurrentLinkedQueue的結構

我們通過ConcurrentLinkedQueue的類圖來分析一下它的結構。

(圖1)

ConcurrentLinkedQueue由head節點和tair節點組成,每個節點(Node)由節點元素(item)和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個next關聯起來,從而組成一張鏈表結構的隊列。默認情況下head節點存儲的元素為空,tair節點等于head節點。

1 private   transient   volatile   Node<e> tail = head;

4.    入隊列

入隊列就是將入隊節點添加到隊列的尾部。為了方便理解入隊時隊列的變化,以及head節點和tair節點的變化,每添加一個節點我就做了一個隊列的快照圖。

(圖二)

  • 第一步添加元素1。隊列更新head節點的next節點為元素1節點。又因為tail節點默認情況下等于head節點,所以它們的next節點都指向元素1節點。
  • 第二步添加元素2。隊列首先設置元素1節點的next節點為元素2節點,然后更新tail節點指向元素2節點。
  • 第三步添加元素3,設置tail節點的next節點為元素3節點。
  • 第四步添加元素4,設置元素3的next節點為元素4節點,然后將tail節點指向元素4節點。

通過debug入隊過程并觀察head節點和tail節點的變化,發現入隊主要做兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點。第二是更新tail節點,如果tail節點的next節點不為空,則將入隊節點設置成tail節點,如果tail節點的next節點為空,則將入隊節點設置成tail的next節點,所以tail節點不總是尾節點,理解這一點對于我們研究源碼會非常有幫助。

上面的分析讓我們從單線程入隊的角度來理解入隊過程,但是多個線程同時進行入隊情況就變得更加復雜,因為可能會出現其他線程插隊的情況。如果有一個線程正在入隊,那么它必須先獲取尾節點,然后設置尾節點的下一個節點為入隊節點,但這時可能有另外一個線程插隊了,那么隊列的尾節點就會發生變化,這時當前線程要暫停入隊操作,然后重新獲取尾節點。讓我們再通過源碼來詳細分析下它是如何使用CAS算法來入隊的。

public boolean offer(E e) {

        if (e == null) throw new NullPointerException();

        //入隊前,創建一個入隊節點

        Node</e><e> n = new Node</e><e>(e);

        retry:

        //死循環,入隊不成功反復入隊。

        for (;;) {

            //創建一個指向tail節點的引用

            Node</e><e> t = tail;

            //p用來表示隊列的尾節點,默認情況下等于tail節點。

            Node</e><e> p = t;

            for (int hops = 0; ; hops++) {

            //獲得p節點的下一個節點。

                Node</e><e> next = succ(p);

     //next節點不為空,說明p不是尾節點,需要更新p后在將它指向next節點

                if (next != null) {

                   //循環了兩次及其以上,并且當前節點還是不等于尾節點

                    if (hops > HOPS && t != tail)

                        continue retry;

                    p = next;

                }

                //如果p是尾節點,則設置p節點的next節點為入隊節點。

                else if (p.casNext(null, n)) {

                  //如果tail節點有大于等于1個next節點,則將入隊節點設置成tair節點,更新失敗了也沒關系,因為失敗了表示有其他線程成功更新了tair節點。

if (hops >= HOPS)

                        casTail(t, n); // 更新tail節點,允許失敗

                    return true;

                }

               // p有next節點,表示p的next節點是尾節點,則重新設置p節點

                else {

                    p = succ(p);

                }

            }

        }

    }

從源代碼角度來看整個入隊過程主要做二件事情。第一是定位出尾節點,第二是使用CAS算法能將入隊節點設置成尾節點的next節點,如不成功則重試。

第一步定位尾節點。tail節點并不總是尾節點,所以每次入隊都必須先通過tail節點來找到尾節點,尾節點可能就是tail節點,也可能是tail節點的next節點。代碼中循環體中的第一個if就是判斷tail是否有next節點,有則表示next節點可能是尾節點。獲取tail節點的next節點需要注意的是p節點等于p的next節點的情況,只有一種可能就是p節點和p的next節點都等于空,表示這個隊列剛初始化,正準備添加第一次節點,所以需要返回head節點。獲取p節點的next節點代碼如下

final Node</e><e> succ(Node</e><e> p) {

         Node</e><e> next = p.getNext();

         return (p == next) ? head : next;

     }

第二步設置入隊節點為尾節點。p.casNext( null , n)方法用于將入隊節點設置為當前隊列尾節點的next節點,p如果是null表示p是當前隊列的尾節點,如果不為null表示有其他線程更新了尾節點,則需要重新獲取當前隊列的尾節點。

hops 的設計意圖 。上面分析過對于先進先出的隊列入隊所要做的事情就是將入隊節點設置成尾節點,doug lea寫的代碼和邏輯還是稍微有點復雜。那么我用以下方式來實現行不行?

public boolean offer(E e) {

       if (e == null)

         throw new NullPointerException();

      Node</e><e> n = new Node</e><e>(e);

      for (;;) {

         Node</e><e> t = tail;

         if (t.casNext(null, n) && casTail(t, n)) {

            return true;

         }

      }

    }

讓tail節點永遠作為隊列的尾節點,這樣實現代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點就是每次都需要使用循環CAS更新tail節點。如果能減少CAS更新tail節點的次數,就能提高入隊的效率,所以doug lea使用hops變量來控制并減少tail節點的更新頻率,并不是每次節點入隊后都將 tail節點更新成尾節點,而是當 tail節點和尾節點的距離大于等于常量HOPS的值(默認等于1)時才更新tail節點,tail和尾節點的距離越長使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為循環體需要多循環一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠遠大于讀操作,所以入隊效率會有所提升。

private static final int HOPS = 1;

還有一點需要注意的是入隊方法永遠返回true,所以不要通過返回值判斷入隊是否成功。

5.    出隊列

    出隊列的就是 從隊列里返回一個節點元素 ,并清空該節點對元素的引用。讓我們通過每個節點出隊的快照來觀察下head節點的變化。

從上圖可知,并不是每次出隊時都更新head節點,當head節點里有元素時,直接彈出head節點里的元素,而不會更新head節點。只有當head節點里沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變量來減少使用CAS更新head節點的消耗,從而提高出隊效率。讓我們再通過源碼來深入分析下出隊過程。

public E poll() {

           Node</e><e> h = head;

       // p表示頭節點,需要出隊的節點

           Node</e><e> p = h;

           for (int hops = 0;; hops++) {

                // 獲取p節點的元素

                E item = p.getItem();

                // 如果p節點的元素不為空,使用CAS設置p節點引用的元素為null,如果成功則返回p節點的元素。

                if (item != null && p.casItem(item, null)) {

                     if (hops >= HOPS) {

                          //將p節點下一個節點設置成head節點

                          Node</e><e> q = p.getNext();

                          updateHead(h, (q != null) ? q : p);

                     }

                     return item;

                }

                // 如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外一個線程修改了。那么獲取p節點的下一個節點

                Node</e><e> next = succ(p);

                // 如果p的下一個節點也為空,說明這個隊列已經空了

                if (next == null) {

              // 更新頭節點。

                     updateHead(h, p);

                     break;

                }

                // 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點

                p = next;

           }

           return null;

     }

首先獲取頭節點的元素,然后判斷頭節點元素是否為空,如果為空,表示另外一個線程已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設置成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個線程已經進行了一次出隊操作更新了head節點,導致元素發生了變化,需要重新獲取頭節點。

6.    參考資料

1.    引言

在并發編程中我們有時候需要使用線程安全的隊列。如果我們要實現一個線程安全的隊列有兩種實現方式一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現,而非阻塞的實現方式則可以使用循環CAS的方式來實現,本文讓我們一起來研究下Doug Lea是如何使用非阻塞的方式來實現線程安全隊列ConcurrentLinkedQueue的,相信從大師身上我們能學到不少并發編程的技巧。

2.    ConcurrentLinkedQueue的介紹

ConcurrentLinkedQueue是一個基于鏈接節點的無界線程安全隊列,它采用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部,當我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法來實現,該算法在Michael & Scott算法上進行了一些修改, Michael & Scott算法的詳細信息可以參見 參考資料一

3.    ConcurrentLinkedQueue的結構

我們通過ConcurrentLinkedQueue的類圖來分析一下它的結構。

(圖1)

ConcurrentLinkedQueue由head節點和tair節點組成,每個節點(Node)由節點元素(item)和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個next關聯起來,從而組成一張鏈表結構的隊列。默認情況下head節點存儲的元素為空,tair節點等于head節點。

1 private   transient   volatile   Node<e> tail = head;

4.    入隊列

入隊列就是將入隊節點添加到隊列的尾部。為了方便理解入隊時隊列的變化,以及head節點和tair節點的變化,每添加一個節點我就做了一個隊列的快照圖。

(圖二)

  • 第一步添加元素1。隊列更新head節點的next節點為元素1節點。又因為tail節點默認情況下等于head節點,所以它們的next節點都指向元素1節點。
  • 第二步添加元素2。隊列首先設置元素1節點的next節點為元素2節點,然后更新tail節點指向元素2節點。
  • 第三步添加元素3,設置tail節點的next節點為元素3節點。
  • 第四步添加元素4,設置元素3的next節點為元素4節點,然后將tail節點指向元素4節點。

通過debug入隊過程并觀察head節點和tail節點的變化,發現入隊主要做兩件事情,第一是將入隊節點設置成當前隊列尾節點的下一個節點。第二是更新tail節點,如果tail節點的next節點不為空,則將入隊節點設置成tail節點,如果tail節點的next節點為空,則將入隊節點設置成tail的next節點,所以tail節點不總是尾節點,理解這一點對于我們研究源碼會非常有幫助。

上面的分析讓我們從單線程入隊的角度來理解入隊過程,但是多個線程同時進行入隊情況就變得更加復雜,因為可能會出現其他線程插隊的情況。如果有一個線程正在入隊,那么它必須先獲取尾節點,然后設置尾節點的下一個節點為入隊節點,但這時可能有另外一個線程插隊了,那么隊列的尾節點就會發生變化,這時當前線程要暫停入隊操作,然后重新獲取尾節點。讓我們再通過源碼來詳細分析下它是如何使用CAS算法來入隊的。

public boolean offer(E e) {

        if (e == null) throw new NullPointerException();

        //入隊前,創建一個入隊節點

        Node</e><e> n = new Node</e><e>(e);

        retry:

        //死循環,入隊不成功反復入隊。

        for (;;) {

            //創建一個指向tail節點的引用

            Node</e><e> t = tail;

            //p用來表示隊列的尾節點,默認情況下等于tail節點。

            Node</e><e> p = t;

            for (int hops = 0; ; hops++) {

            //獲得p節點的下一個節點。

                Node</e><e> next = succ(p);

     //next節點不為空,說明p不是尾節點,需要更新p后在將它指向next節點

                if (next != null) {

                   //循環了兩次及其以上,并且當前節點還是不等于尾節點

                    if (hops > HOPS && t != tail)

                        continue retry;

                    p = next;

                }

                //如果p是尾節點,則設置p節點的next節點為入隊節點。

                else if (p.casNext(null, n)) {

                  //如果tail節點有大于等于1個next節點,則將入隊節點設置成tair節點,更新失敗了也沒關系,因為失敗了表示有其他線程成功更新了tair節點。

if (hops >= HOPS)

                        casTail(t, n); // 更新tail節點,允許失敗

                    return true;

                }

               // p有next節點,表示p的next節點是尾節點,則重新設置p節點

                else {

                    p = succ(p);

                }

            }

        }

    }

從源代碼角度來看整個入隊過程主要做二件事情。第一是定位出尾節點,第二是使用CAS算法能將入隊節點設置成尾節點的next節點,如不成功則重試。

第一步定位尾節點。tail節點并不總是尾節點,所以每次入隊都必須先通過tail節點來找到尾節點,尾節點可能就是tail節點,也可能是tail節點的next節點。代碼中循環體中的第一個if就是判斷tail是否有next節點,有則表示next節點可能是尾節點。獲取tail節點的next節點需要注意的是p節點等于p的next節點的情況,只有一種可能就是p節點和p的next節點都等于空,表示這個隊列剛初始化,正準備添加第一次節點,所以需要返回head節點。獲取p節點的next節點代碼如下

final Node</e><e> succ(Node</e><e> p) {

         Node</e><e> next = p.getNext();

         return (p == next) ? head : next;

     }

第二步設置入隊節點為尾節點。p.casNext( null , n)方法用于將入隊節點設置為當前隊列尾節點的next節點,p如果是null表示p是當前隊列的尾節點,如果不為null表示有其他線程更新了尾節點,則需要重新獲取當前隊列的尾節點。

hops 的設計意圖 。上面分析過對于先進先出的隊列入隊所要做的事情就是將入隊節點設置成尾節點,doug lea寫的代碼和邏輯還是稍微有點復雜。那么我用以下方式來實現行不行?

public boolean offer(E e) {

       if (e == null)

         throw new NullPointerException();

      Node</e><e> n = new Node</e><e>(e);

      for (;;) {

         Node</e><e> t = tail;

         if (t.casNext(null, n) && casTail(t, n)) {

            return true;

         }

      }

    }

讓tail節點永遠作為隊列的尾節點,這樣實現代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點就是每次都需要使用循環CAS更新tail節點。如果能減少CAS更新tail節點的次數,就能提高入隊的效率,所以doug lea使用hops變量來控制并減少tail節點的更新頻率,并不是每次節點入隊后都將 tail節點更新成尾節點,而是當 tail節點和尾節點的距離大于等于常量HOPS的值(默認等于1)時才更新tail節點,tail和尾節點的距離越長使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為循環體需要多循環一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠遠大于讀操作,所以入隊效率會有所提升。

private static final int HOPS = 1;

還有一點需要注意的是入隊方法永遠返回true,所以不要通過返回值判斷入隊是否成功。

5.    出隊列

    出隊列的就是 從隊列里返回一個節點元素 ,并清空該節點對元素的引用。讓我們通過每個節點出隊的快照來觀察下head節點的變化。

從上圖可知,并不是每次出隊時都更新head節點,當head節點里有元素時,直接彈出head節點里的元素,而不會更新head節點。只有當head節點里沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變量來減少使用CAS更新head節點的消耗,從而提高出隊效率。讓我們再通過源碼來深入分析下出隊過程。

public E poll() {

           Node</e><e> h = head;

       // p表示頭節點,需要出隊的節點

           Node</e><e> p = h;

           for (int hops = 0;; hops++) {

                // 獲取p節點的元素

                E item = p.getItem();

                // 如果p節點的元素不為空,使用CAS設置p節點引用的元素為null,如果成功則返回p節點的元素。

                if (item != null && p.casItem(item, null)) {

                     if (hops >= HOPS) {

                          //將p節點下一個節點設置成head節點

                          Node</e><e> q = p.getNext();

                          updateHead(h, (q != null) ? q : p);

                     }

                     return item;

                }

                // 如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外一個線程修改了。那么獲取p節點的下一個節點

                Node</e><e> next = succ(p);

                // 如果p的下一個節點也為空,說明這個隊列已經空了

                if (next == null) {

              // 更新頭節點。

                     updateHead(h, p);

                     break;

                }

                // 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點

                p = next;

           }

           return null;

     }

首先獲取頭節點的元素,然后判斷頭節點元素是否為空,如果為空,表示另外一個線程已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設置成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個線程已經進行了一次出隊操作更新了head節點,導致元素發生了變化,需要重新獲取頭節點。

6.    參考資料

 本文由用戶 trll7485 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!