Java并發編程:阻塞隊列BlockingQueue
阻塞隊列BlockingQueue簡介
阻塞隊列BlockingQueue是JDK1.5并發新特性中的內容,阻塞隊列首先是一個隊列,同樣實現了Collection接口。阻塞隊列提供了可阻塞的put和take方法,以及支持定時的poll和offer方法。
阻塞隊列跟普通隊列相比,首頁它是線程安全的,另外還提供了兩個附加操作:當隊列為空時,從隊列中獲取元素的操作將被阻塞;當隊列填滿是,向隊列添加元素將被阻塞。這兩個附加操作分別由BlockingQueue提供的兩個take和put方法支持。如果隊列已經滿了,那么put方法將被阻塞直到有空間可用;如果隊列為空,那么take方法將被阻塞直到有元素可用。隊列可以是有界的也可以是無界的,無界隊列永遠不會充滿,因此在無界隊列上面put方法也永遠不會被阻塞。
BlockingQueue提供了4中類型的處理方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 拋出異常: 當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
- 返回特殊值: 當隊列滿時,向隊列中添加元素,則返回false,否則返回true。當隊列為空時,向隊列中獲取元素,則返回null,否則返回元素。
- 一直阻塞: 當阻塞隊列滿時,如果生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列為空時,如果消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閑或響應中斷退出。
- 超時退出: 當隊列滿時,如果生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列為空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,如果超過指定時間退出返回null。
Java提供的7個阻塞隊列
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 加鎖 | arraylist |
LinkedBlockingQueue | 可選有界 | 加鎖 | 單向linkedlist |
PriorityBlockingQueue | 無界 | 加鎖 | Heap |
DelayQueue | 無界 | 加鎖 | Heap |
SynchronousQueue | 有界 | 無鎖(JDK1.6) | ~ |
LinkedTransferQueue | 無界 | 無鎖 | 單向linkedlist |
LinkedBlockingDeque | 可選有界 | 加鎖 | 雙向linkedlist |
在多線程環境中,通過隊列可以很容易的實現數據共享。在基于隊列的生產者-消費者模型中,數據生產時,生產者就把數據放入隊列,當消費者準備使用數據時就從隊列中取出數據。生產者不需要知道消費者的標識或者數量,或者他們是唯一的生產者。同樣,消費者也不需要知道生產者來自何處。BlockingQqueue簡化了生產者-消費者的過程,它支持任意數量的生產者-消費者。一種最常見的生產者-消費者模式就是線程池與工作隊列的組合,在Executor執行框架中就體現了這種模式。
阻塞隊列BlockingQueue的成員介紹
ArrayBlockingQueue
ArrayBlockingQueue是一個基于數組的阻塞隊列實現,內部維護了一個定長數組,以便緩存數據。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致放入操作受阻塞;試圖從空隊列中檢索元素將導致類似阻塞。ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
- ArrayBlockingQueue(int capacity) 創建一個帶有給定的(固定)容量和默認訪問策略(非公平鎖)的 ArrayBlockingQueue。capacity是隊列容量。
- ArrayBlockingQueue(int capacity, boolean fair) 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。fair訪問策略如果為 true,則按照 FIFO 順序訪問插入或移除時受阻塞線程的隊列,如果為 false,則訪問順序是不確定的。fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖,fair為false,表示是非公平鎖。
- ArrayBlockingQueue(int capacity, boolean fair, Collectionc) 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,并以 collection 迭代器的遍歷順序添加元素。
由于ArrayBlockingQueue內部只維護一個ReentrantLock類型的lock鎖對象,所以在生成者-消費者模型中,并不能真正的實現并行,這一點不同于LinkedBlockingQueue,LinkedBlockingQueue內部維護了兩個鎖。事實上ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
LinkedBlockingQueue
LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列,支持真正的并行操作,因為內部使用ReentrantLock實現插入鎖(putLock)和取出鎖(takeLock),維護了兩個所對象。其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
在開發中新建LinkedBlockingQueue實例的時候,一般要指定其大小,如果沒有指定大小,大小默認是Integer.MAX_VALUE,這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。在線程池框架Executors中newSingleThreadExecutor和newFixedThreadPool方法內部維護的都是LinkedBlockingQueue。
PriorityBlockingQueue
PriorityBlockingQueue是一個按照優先級排序的隊列,如果想要某個隊列不是按照FIFO的順序來處理元素,該隊列非常有用,內部維護一個堆的數據結構。PriorityBlockingQueue既可以根據元素的自然順序進行排序,如果元素實現了Comparable接口,也可以根據Comparator進行比較。該隊列看似有界隊列,實際上它會自動擴容,因此是無界隊列,因此在生產者-消費者模型中,生產者并不會真正的阻塞,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是非公平鎖。
DelayQueue
DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿后保存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且 poll 將返回null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小于等于 0 的值時,將發生到期。即使無法使用 take 或 poll移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不允許使用 null 元素。
DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。
SynchronousQueue
SynchronousQueue是這樣一種阻塞隊列,其中每個 put 必須等待一個take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有,它不會為隊列中的元素維護存儲空間。與其它隊列不同的是,它維護一組線程,這些線程在等待著元素加入或者移除隊列。不能在同步隊列上進行peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;也不能迭代隊列,因為其中沒有元素可用于迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 如果沒有已排隊線程,則不添加元素并且頭為null。SynchronousQueue類似于無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。
SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
創建SynchronousQueue有兩種構造方法,一種時SynchronousQueue(),默認采用非公平的形式,從JDK1.6開始SynchronousQueue的實現采用了一種性能更好的無鎖算法。競爭機制支持公平和非公平兩種:非公平競爭模式使用的數據結構是后進先出棧(LIFO Stack);公平競爭模式則使用先進先出隊列(FIFO),性能上兩者是相當的,一般情況下,FIFO通常可以支持更大的吞吐量,但LIFO可以更大程度的保持線程的本地化。另外一種SynchronousQueue(boolean fair),可以自己指定訪問方式是否采用公平方式。
LinkedTransferQueue
LinkedTransferQueue是JDK1.7中新引入的隊列,該隊列的實現基于CAS無鎖機制,它也是一個基于鏈表實現的無界隊列。相比前面隊列它多transfer和tryTransfer方法。
LinkedBlockingDeque
LinkedBlockingDeque一個基于已鏈接節點的、任選范圍的阻塞雙端隊列。可選的容量范圍構造方法參數是一種防止過度膨脹的方式。如果未指定容量,那么容量將等于 Integer.MAX_VALUE。只要插入元素不會使雙端隊列超出容量,每次插入后都將動態地創建鏈接節點。要想支持阻塞功能,隊列的容量一定是固定的,否則無法在入隊的時候掛起線程。也就是capacity是final類型的。
阻塞隊列示例
這是一個使用LinkedBlockedQueue設計實現的簡單的生產者-消費者模式。
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static final int DEFAULT_SLEEP = 1000;
private static AtomicInteger count = new AtomicInteger();
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
String data = null;
Random r = new Random();
System.out.println("啟動生產者線程!");
try {
while (isRunning) {
Thread.sleep(r.nextInt(DEFAULT_SLEEP));
data = "data:" + count.incrementAndGet();
queue.put(data);
System.out.println("將數據:" + data + "放入隊列...");
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生產者線程!");
}
}
public void stop() {
isRunning = false;
}
}
public class Consumer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static final int DEFAULT_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("啟動消費者線程!");
Random r = new Random();
try {
while (isRunning) {
String data = queue.take();
if (null != data) {
System.out.println("正在消費:" + data);
Thread.sleep(r.nextInt(DEFAULT_SLEEP));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費者線程!");
}
}
public void stop() {
isRunning = false;
}
}
public class MainTest {
public static void main(String[] args) throws InterruptedException {
// 聲明一個容量為10的緩存隊列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動線程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執行20s
Thread.sleep(20 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2 * 1000);
consumer.stop();
// 退出Executor
service.shutdown();
}
}
如果不使用阻塞隊列,使用Object.wait()和Object.notify()非阻塞隊列實現生產者-消費者模式,生產者線程在緩沖區為滿的時候,消費者在緩沖區為空的時候,都應該暫停運行。然后用notify 和notifyAll通知等待中的線程重新開始執行。
參考資料
Java并發編程實踐
來自:http://www.sunnyang.com/733.html