死磕 Java 并發:J.U.C 之阻塞隊列 - PriorityBlockingQueue

zl080716 6年前發布 | 37K 次閱讀 Java 阻塞隊列 并發 Java開發

隊列是比較常見的數據結構,我們也經常使用到,BlockingQueue常用于生產者消費者場景,在Java的并發包中已經提供了BlockingQueue的實現。

J.U.C之AQS傳送門: 【死磕Java并發】—–J.U.C之AQS(一篇就夠了)

PriorityBlockingQueue介紹

我們知道線程Thread可以調用setPriority(int newPriority)來設置優先級的,線程優先級高的線程先執行,優先級低的后執行。而前面介紹的ArrayBlockingQueue、LinkedBlockingQueue都是采用FIFO原則來確定線程執行的先后順序,那么有沒有一個隊列可以支持優先級呢? PriorityBlockingQueue 。

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采用自然順序升序排序,當然我們也可以通過構造函數來指定Comparator來對元素進行排序。需要注意的是PriorityBlockingQueue不能保證同優先級元素的順序。

二叉堆

由于PriorityBlockingQueue底層采用二叉堆來實現的,所以有必要先介紹下二叉堆。

二叉堆是一種特殊的堆,就結構性而言就是完全二叉樹或者是近似完全二叉樹,滿足樹結構性和堆序性。樹機構特性就是完全二叉樹應該有的結構,堆序性則是:父節點的鍵值總是保持固定的序關系于任何一個子節點的鍵值,且每個節點的左子樹和右子樹都是一個二叉堆。它有兩種表現形式:最大堆、最小堆。

最大堆:父節點的鍵值總是大于或等于任何一個子節點的鍵值(下右圖)

最小堆:父節點的鍵值總是小于或等于任何一個子節點的鍵值(下走圖)

二叉堆一般用數組表示,如果父節點的節點位置在n處,那么其左孩子節點為:2 * n + 1 ,其右孩子節點為2 * (n + 1),其父節點為(n - 1) / 2 處。上左圖的數組表現形式為:

二叉堆的基本結構了解了,下面來看看二叉堆的添加和刪除節點。二叉堆的添加和刪除相對于二叉樹來說會簡單很多。

添加元素

首先將要添加的元素N插添加到堆的末尾位置(在二叉堆中我們稱之為空穴)。如果元素N放入空穴中而不破壞堆的序(其值大于跟父節點值(最大堆是小于父節點)),那么插入完成。否則,我們則將該元素N的節點與其父節點進行交換,然后與其新父節點進行比較直到它的父節點不在比它小(最大堆是大)或者到達根節點。

假如有如下一個二叉堆

這是一個最小堆,其父節點總是小于等于任一一個子節點。現在我們添加一個元素2。

第一步:在末尾添加一個元素2,如下:

第二步:元素2比其父節點6小,進行替換,如下:

第三步:繼續與其父節點5比較,小于,替換:

第四步:繼續比較其跟節點1,發現跟節點比自己小,則完成,到這里元素2插入完畢。所以整個添加元素過程可以概括為:在元素末尾插入元素,然后不斷比較替換直到不能移動為止。

復雜度:Ο(logn)

刪除元素

刪除元素與增加元素一樣,需要維護整個二叉堆的序。刪除位置1的元素(數組下標0),則把最后一個元素空出來移到最前邊,然后和它的兩個子節點比較,如果兩個子節點中較小的節點小于該節點,就將他們交換,知道兩個子節點都比該元素大為止。

就上面二叉堆而言,刪除的元素為元素1。

第一步:刪掉元素1,元素6空出來,如下:

第二步:與其兩個子節點(元素2、元素3)比較,都小,將其中較小的元素(元素2)放入到該空穴中:

第三步:繼續比較兩個子節點(元素5、元素7),還是都小,則將較小的元素(元素5)放入到該空穴中:

第四步:比較其子節點(元素8),比該節點小,則元素6放入該空穴位置不會影響二叉堆的樹結構,放入:

到這里整個刪除操作就已經完成了。

二叉堆的添加、刪除操作還是比較簡單的,很容易就理解了。下面我們就參考該內容來開啟PriorityBlockingQueue的源代碼研究。

PriorityBlockingQueue

PriorityBlockingQueue繼承AbstractQueue,實現BlockingQueue接口。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable 

定義了一些屬性

// 默認容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // 最大容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 二叉堆數組
    private transient Object[] queue;
    // 隊列元素的個數
    private transient int size;
    // 比較器,如果為空,則為自然順序
    private transient Comparator<? super E> comparator;
    // 內部鎖
    private final ReentrantLock lock;
    private final Condition notEmpty;
    // 
    private transient volatile int allocationSpinLock;
    // 優先隊列:主要用于序列化,這是為了兼容之前的版本。只有在序列化和反序列化才非空
    private PriorityQueue<E> q;

內部仍然采用可重入鎖ReentrantLock來實現同步機制,但是這里只有一個notEmpty的Condition,了解了ArrayBlockingQueue我們知道它定義了兩個Condition,之類為何只有一個呢?原因就在于PriorityBlockingQueue是一個無界隊列,插入總是會成功,除非消耗盡了資源導致服務器掛。

入列

PriorityBlockingQueue提供put()、add()、offer()方法向隊列中加入元素。我們這里從put()入手:put(E e) :將指定元素插入此優先級隊列。

   public void put(E e) {
        offer(e); // never need to block
    }

PriorityBlockingQueue是無界的,所以不可能會阻塞。內部調用offer(E e):

public boolean offer(E e) {
        // 不能為null
        if (e == null)
            throw new NullPointerException();
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 擴容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 根據比較器是否為null,做不同的處理
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // 喚醒正在等待的消費者線程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

siftUpComparable

當比較器comparator為null時,采用自然排序,調用siftUpComparable方法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        // “上冒”過程
        while (k > 0) {
            // 父級節點 (n - ) / 2
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            // key >= parent 完成(最大堆)
            if (key.compareTo((T) e) >= 0)
                break;
            // key < parant 替換
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

這段代碼所表示的意思:將元素X插入到數組中,然后進行調整以保持二叉堆的特性。

siftUpUsingComparator

當比較器不為null時,采用所指定的比較器,調用siftUpUsingComparator方法:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

擴容:tryGrow

   private void tryGrow(Object[] array, int oldCap) {
        lock.unlock();      // 擴容操作使用自旋,不需要鎖主鎖,釋放
        Object[] newArray = null;
        // CAS 占用
        if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
            try {
                // 新容量  最小翻倍
                int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) :  (oldCap >> 1));
                // 超過
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;        // 最大容量
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;     // 擴容后allocationSpinLock = 0 代表釋放了自旋鎖
            }
        }
        // 到這里如果是本線程擴容newArray肯定是不為null,為null就是其他線程在處理擴容,那就讓給別的線程處理
        if (newArray == null)
            Thread.yield();
        // 主鎖獲取鎖
        lock.lock();
        // 數組復制
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

整個添加元素的過程和上面二叉堆一模一樣:先將元素添加到數組末尾,然后采用“上冒”的方式將該元素盡量往上冒。

出列

PriorityBlockingQueue提供poll()、remove()方法來執行出對操作。出對的永遠都是第一個元素:array[0]。

  public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

先獲取鎖,然后調用dequeue()方法:

  private E dequeue() {
        // 沒有元素 返回null
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 出對元素
            E result = (E) array[0];
            // 最后一個元素(也就是插入到空穴中的元素)
            E x = (E) array[n];
            array[n] = null;
            // 根據比較器釋放為null,來執行不同的處理
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

siftDownComparable

如果比較器為null,則調用siftDownComparable來進行自然排序處理:

 private static <T> void siftDownComparable(int k, T x, Object[] array,
 int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            // 最后一個葉子節點的父節點位置
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;       // 待調整位置左節點位置
                Object c = array[child];        //左節點
                int right = child + 1;          //右節點
                //左右節點比較,取較小的
                if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                //如果待調整key最小,那就退出,直接賦值
                if (key.compareTo((T) c) <= 0)
                    break;
                //如果key不是最小,那就取左右節點小的那個放到調整位置,然后小的那個節點位置開始再繼續調整
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

處理思路和二叉堆刪除節點的邏輯一樣:就第一個元素定義為空穴,然后把最后一個元素取出來,嘗試插入到空穴位置,并與兩個子節點值進行比較,如果不符合,則與其中較小的子節點進行替換,然后繼續比較調整。

siftDownUsingComparator

如果指定了比較器,則采用比較器來進行調整:

   private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 int n,
 Comparator<? super T> cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }

PriorityBlockingQueue采用二叉堆來維護,所以整個處理過程不是很復雜,添加操作則是不斷“上冒”,而刪除操作則是不斷“下掉”。掌握二叉堆就掌握了PriorityBlockingQueue,無論怎么變還是不離其宗。對于PriorityBlockingQueue需要注意的是他是一個無界隊列,所以添加操作是不會失敗的,除非資源耗盡。

 

 

來自:http://cmsblogs.com

 

 本文由用戶 zl080716 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!