java多線程-BlockingQueue
來自: http://www.cnblogs.com/lcngu/p/5224476.html
- BlockingQueue簡介
ArrayBlockingQueue:基于數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。并且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基于鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE,每次插入后都將動態地創建鏈接節點。
PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素,依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。
- BlockingQueue內容
BlockingQueue主要方法:
| 拋出異常 | 特殊值 | 阻塞 | 超時 | |
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
對于非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。
- BlockingQueue實現原理
以ArrayBlockingQueue為例,查看其源代碼,其中主要包含以下對象:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** 數組對象,用于放置對象 */
final Object[] items;
/** put, offer, or add方法放入數組的索引 */
int putIndex;
/** take, poll, peek or remove方法取出數據的數組索引 */
int takeIndex;
/** queue隊列的總數 */
int count;
/**可重入鎖,控制并發*/
final ReentrantLock lock;
/** 非空信號量,可以取數*/
private final Condition notEmpty;
/** 非滿信號量,可以放數 */
private final Condition notFull;
}</pre>
下面主要介紹下put()和take()方法,來觀察其同步的實現:
1 public void put(E e) throws InterruptedException {
2 checkNotNull(e);
3 final ReentrantLock lock = this.lock;
4 lock.lockInterruptibly();
5 try {
6 while (count == items.length)
7 notFull.await();
8 insert(e);
9 } finally {
10 lock.unlock();
11 }
12 }
1 public E take() throws InterruptedException {
2 final ReentrantLock lock = this.lock;
3 lock.lockInterruptibly();
4 try {
5 while (count == 0)
6 notEmpty.await();
7 return extract();
8 } finally {
9 lock.unlock();
10 }
11 }
大家應該明白了阻塞隊列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者的思路類似,只不過它把這些工作一起集成到了阻塞隊列中實現。并且在前面Condition中我們也模擬實現了一個阻塞隊列,實現與其大同小異。
- BlockingQueue應用
1:啟動兩個線程實現互斥等待:
1 public class BlockingQueueTest {
2 public static void main(String[] args) {
3 final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
4 for (int i = 0; i < 2; i++) {
5 new Thread(new Runnable() {
6 @Override
7 public void run() {
8 while (true) {
9 System.out.println("Thread "+Thread.currentThread().getName()+"正在準備放入數據");
10 try {
11 //模擬線程的放數速度
12 Thread.sleep(new Random().nextInt(1000));
13 } catch (InterruptedException e) {
14 // TODO Auto-generated catch block
15 e.printStackTrace();
16 }
17 try {
18 queue.put(1);
19 } catch (InterruptedException e) {
20 // TODO Auto-generated catch block
21 e.printStackTrace();
22 }
23 System.out.println("Thread "+Thread.currentThread().getName()+"放入數據,此時隊列中的數據為:"+queue.size());
24 }
25 }
26 }).start();
27 new Thread(new Runnable() {
28 @Override
29 public void run() {
30 while (true) {
31 System.out.println("Thread "+Thread.currentThread().getName()+"正在取得數據");
32 try {
33 //模擬線程的去數速度
34 Thread.sleep(100);
35 } catch (InterruptedException e) {
36 // TODO Auto-generated catch block
37 e.printStackTrace();
38 }
39 try {
40 queue.take();
41 } catch (InterruptedException e) {
42 // TODO Auto-generated catch block
43 e.printStackTrace();
44 }
45 System.out.println("Thread "+Thread.currentThread().getName()+"取得數據,此時隊列中的數據為:"+queue.size());
46 }
47 }
48 }).start();
49 }
50
51 }
52 }
2:前面介紹傳統線程通信中,主線程和子線程交替運行,現在以阻塞隊列來實現。
1 public class BlockingQueueCommunication {
2 public static void main(String[] args) {
3 final Business business = new Business();
4 new Thread(new Runnable() {
5
6 @Override
7 public void run() {
8 // TODO Auto-generated method stub
9 for (int i = 0; i < 50; i++) {
10 try {
11 business.sub(i);
12 } catch (InterruptedException e) {
13 // TODO Auto-generated catch block
14 e.printStackTrace();
15 }
16 }
17 }
18 }).start();
19 for (int i = 0; i < 50; i++) {
20 try {
21 business.main(i);
22 } catch (InterruptedException e) {
23 // TODO Auto-generated catch block
24 e.printStackTrace();
25 }
26 }
27 }
28 static class Business{
29 BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
30 BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
31 {
32 try {
33 queue2.put(1);//保證queue2阻塞
34 } catch (InterruptedException e) {
35 // TODO Auto-generated catch block
36 e.printStackTrace();
37 }
38 }
39
40 public void main(int i) throws InterruptedException{
41 queue1.put(1);//阻塞queue1
42 for (int j = 0; j < 100; j++) {
43 System.out.println("main thread is looping of "+j +" in " + i);
44 }
45 queue2.take();//喚醒queue2
46 }
47 public void sub(int i) throws InterruptedException{
48 queue2.put(1);//阻塞queue2
49 for (int j = 0; j < 10; j++) {
50 System.out.println("sub thread is looping of "+j +" in " + i);
51 }
52 queue1.take();//喚醒queue1
53 }
54 }
55 }
BlockingQueue實現了線程同步,不可在方法中再次加入同步限制,否則會出現死鎖。
3:在API中有一個阻塞對象實現生產者和消費者的例子
1 class Producer implements Runnable {
2 private final BlockingQueue queue;
3 Producer(BlockingQueue q) { queue = q; }
4 public void run() {
5 try {
6 while(true) { queue.put(produce()); }
7 } catch (InterruptedException ex) { ... handle ...}
8 }
9 Object produce() { ... }
10 }
11
12 class Consumer implements Runnable {
13 private final BlockingQueue queue;
14 Consumer(BlockingQueue q) { queue = q; }
15 public void run() {
16 try {
17 while(true) { consume(queue.take()); }
18 } catch (InterruptedException ex) { ... handle ...}
19 }
20 void consume(Object x) { ... }
21 }
22
23 class Setup {
24 void main() {
25 BlockingQueue q = new SomeQueueImplementation();
26 Producer p = new Producer(q);
27 Consumer c1 = new Consumer(q);
28 Consumer c2 = new Consumer(q);
29 new Thread(p).start();
30 new Thread(c1).start();
31 new Thread(c2).start();
32 }
33 }
使用阻塞隊列代碼要簡單得多,不需要再單獨考慮同步和線程間通信的問題。
在并發編程中,一般推薦使用阻塞隊列,這樣實現可以盡量地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然后解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。
參考資料:http://www.cnblogs.com/dolphin0520/p/3932906.html
javaAPI
</div>