死磕 Java 并發:J.U.C 之阻塞隊列 - PriorityBlockingQueue
隊列是比較常見的數據結構,我們也經常使用到,BlockingQueue常用于生產者消費者場景,在Java的并發包中已經提供了BlockingQueue的實現。
J.U.C之AQS傳送門: 【死磕Java并發】—–J.U.C之AQS(一篇就夠了)
PriorityBlockingQueue介紹
我們知道線程Thread可以調用setPriority(int newPriority)來設置優先級的,線程優先級高的線程先執行,優先級低的后執行。而前面介紹的ArrayBlockingQueue、LinkedBlockingQueue都是采用FIFO原則來確定線程執行的先后順序,那么有沒有一個隊列可以支持優先級呢? PriorityBlockingQueue 。
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采用自然順序升序排序,當然我們也可以通過構造函數來指定Comparator來對元素進行排序。需要注意的是PriorityBlockingQueue不能保證同優先級元素的順序。
二叉堆
由于PriorityBlockingQueue底層采用二叉堆來實現的,所以有必要先介紹下二叉堆。
二叉堆是一種特殊的堆,就結構性而言就是完全二叉樹或者是近似完全二叉樹,滿足樹結構性和堆序性。樹機構特性就是完全二叉樹應該有的結構,堆序性則是:父節點的鍵值總是保持固定的序關系于任何一個子節點的鍵值,且每個節點的左子樹和右子樹都是一個二叉堆。它有兩種表現形式:最大堆、最小堆。
最大堆:父節點的鍵值總是大于或等于任何一個子節點的鍵值(下右圖)
最小堆:父節點的鍵值總是小于或等于任何一個子節點的鍵值(下走圖)
二叉堆一般用數組表示,如果父節點的節點位置在n處,那么其左孩子節點為:2 * n + 1 ,其右孩子節點為2 * (n + 1),其父節點為(n - 1) / 2 處。上左圖的數組表現形式為:
二叉堆的基本結構了解了,下面來看看二叉堆的添加和刪除節點。二叉堆的添加和刪除相對于二叉樹來說會簡單很多。
添加元素
首先將要添加的元素N插添加到堆的末尾位置(在二叉堆中我們稱之為空穴)。如果元素N放入空穴中而不破壞堆的序(其值大于跟父節點值(最大堆是小于父節點)),那么插入完成。否則,我們則將該元素N的節點與其父節點進行交換,然后與其新父節點進行比較直到它的父節點不在比它小(最大堆是大)或者到達根節點。
假如有如下一個二叉堆
這是一個最小堆,其父節點總是小于等于任一一個子節點。現在我們添加一個元素2。
第一步:在末尾添加一個元素2,如下:
第二步:元素2比其父節點6小,進行替換,如下:
第三步:繼續與其父節點5比較,小于,替換:
第四步:繼續比較其跟節點1,發現跟節點比自己小,則完成,到這里元素2插入完畢。所以整個添加元素過程可以概括為:在元素末尾插入元素,然后不斷比較替換直到不能移動為止。
復雜度:Ο(logn)
刪除元素
刪除元素與增加元素一樣,需要維護整個二叉堆的序。刪除位置1的元素(數組下標0),則把最后一個元素空出來移到最前邊,然后和它的兩個子節點比較,如果兩個子節點中較小的節點小于該節點,就將他們交換,知道兩個子節點都比該元素大為止。
就上面二叉堆而言,刪除的元素為元素1。
第一步:刪掉元素1,元素6空出來,如下:
第二步:與其兩個子節點(元素2、元素3)比較,都小,將其中較小的元素(元素2)放入到該空穴中:
第三步:繼續比較兩個子節點(元素5、元素7),還是都小,則將較小的元素(元素5)放入到該空穴中:
第四步:比較其子節點(元素8),比該節點小,則元素6放入該空穴位置不會影響二叉堆的樹結構,放入:
到這里整個刪除操作就已經完成了。
二叉堆的添加、刪除操作還是比較簡單的,很容易就理解了。下面我們就參考該內容來開啟PriorityBlockingQueue的源代碼研究。
PriorityBlockingQueue
PriorityBlockingQueue繼承AbstractQueue,實現BlockingQueue接口。
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
定義了一些屬性
// 默認容量 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 二叉堆數組 private transient Object[] queue; // 隊列元素的個數 private transient int size; // 比較器,如果為空,則為自然順序 private transient Comparator<? super E> comparator; // 內部鎖 private final ReentrantLock lock; private final Condition notEmpty; // private transient volatile int allocationSpinLock; // 優先隊列:主要用于序列化,這是為了兼容之前的版本。只有在序列化和反序列化才非空 private PriorityQueue<E> q;
內部仍然采用可重入鎖ReentrantLock來實現同步機制,但是這里只有一個notEmpty的Condition,了解了ArrayBlockingQueue我們知道它定義了兩個Condition,之類為何只有一個呢?原因就在于PriorityBlockingQueue是一個無界隊列,插入總是會成功,除非消耗盡了資源導致服務器掛。
入列
PriorityBlockingQueue提供put()、add()、offer()方法向隊列中加入元素。我們這里從put()入手:put(E e) :將指定元素插入此優先級隊列。
public void put(E e) { offer(e); // never need to block }
PriorityBlockingQueue是無界的,所以不可能會阻塞。內部調用offer(E e):
public boolean offer(E e) { // 不能為null if (e == null) throw new NullPointerException(); // 獲取鎖 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 根據比較器是否為null,做不同的處理 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 喚醒正在等待的消費者線程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
siftUpComparable
當比較器comparator為null時,采用自然排序,調用siftUpComparable方法:
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; // “上冒”過程 while (k > 0) { // 父級節點 (n - ) / 2 int parent = (k - 1) >>> 1; Object e = array[parent]; // key >= parent 完成(最大堆) if (key.compareTo((T) e) >= 0) break; // key < parant 替換 array[k] = e; k = parent; } array[k] = key; }
這段代碼所表示的意思:將元素X插入到數組中,然后進行調整以保持二叉堆的特性。
siftUpUsingComparator
當比較器不為null時,采用所指定的比較器,調用siftUpUsingComparator方法:
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
擴容:tryGrow
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // 擴容操作使用自旋,不需要鎖主鎖,釋放 Object[] newArray = null; // CAS 占用 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 新容量 最小翻倍 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 超過 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; // 最大容量 } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; // 擴容后allocationSpinLock = 0 代表釋放了自旋鎖 } } // 到這里如果是本線程擴容newArray肯定是不為null,為null就是其他線程在處理擴容,那就讓給別的線程處理 if (newArray == null) Thread.yield(); // 主鎖獲取鎖 lock.lock(); // 數組復制 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
整個添加元素的過程和上面二叉堆一模一樣:先將元素添加到數組末尾,然后采用“上冒”的方式將該元素盡量往上冒。
出列
PriorityBlockingQueue提供poll()、remove()方法來執行出對操作。出對的永遠都是第一個元素:array[0]。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } }
先獲取鎖,然后調用dequeue()方法:
private E dequeue() { // 沒有元素 返回null int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 出對元素 E result = (E) array[0]; // 最后一個元素(也就是插入到空穴中的元素) E x = (E) array[n]; array[n] = null; // 根據比較器釋放為null,來執行不同的處理 Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
siftDownComparable
如果比較器為null,則調用siftDownComparable來進行自然排序處理:
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; // 最后一個葉子節點的父節點位置 int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; // 待調整位置左節點位置 Object c = array[child]; //左節點 int right = child + 1; //右節點 //左右節點比較,取較小的 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //如果待調整key最小,那就退出,直接賦值 if (key.compareTo((T) c) <= 0) break; //如果key不是最小,那就取左右節點小的那個放到調整位置,然后小的那個節點位置開始再繼續調整 array[k] = c; k = child; } array[k] = key; } }
處理思路和二叉堆刪除節點的邏輯一樣:就第一個元素定義為空穴,然后把最后一個元素取出來,嘗試插入到空穴位置,并與兩個子節點值進行比較,如果不符合,則與其中較小的子節點進行替換,然后繼續比較調整。
siftDownUsingComparator
如果指定了比較器,則采用比較器來進行調整:
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
PriorityBlockingQueue采用二叉堆來維護,所以整個處理過程不是很復雜,添加操作則是不斷“上冒”,而刪除操作則是不斷“下掉”。掌握二叉堆就掌握了PriorityBlockingQueue,無論怎么變還是不離其宗。對于PriorityBlockingQueue需要注意的是他是一個無界隊列,所以添加操作是不會失敗的,除非資源耗盡。
來自:http://cmsblogs.com