今天要講的是arrayblockqueue,arrayblockqueue是juc提供的線程安全的有界的阻塞隊列,一看到array,第一反應:這貨肯定和數組有關,既然是數組,那自然是有界的了,我們先來看看arrayblockqueue的基本使用方法,然后再看看arrayblockqueue的源碼。
arrayblockqueue基本使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public static void main(string[] args) throws interruptedexception { arrayblockingqueue<integer> arrayblockingqueue= new arrayblockingqueue( 5 ); arrayblockingqueue.offer( 10 ); arrayblockingqueue.offer( 50 ); arrayblockingqueue.add( 20 ); arrayblockingqueue.add( 60 ); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.poll()); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.take()); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.peek()); system.out.println(arrayblockingqueue); } |
運行結果:
- 創建了一個長度為5的arrayblockqueue。
- 用offer方法,向arrayblockqueue添加了兩個元素,分別是10,50。
- 用put方法,向arrayblockqueue添加了兩個元素,分別是20,60。
- 打印出arrayblockqueue,結果是10,50,20,60。
- 用poll方法,彈出arrayblockqueue第一個元素,并且打印出來:10。
- 打印出arrayblockqueue,結果是50,20,60。
- 用take方法,彈出arrayblockqueue第一個元素,并且打印出來:50。
- 打印出arrayblockqueue,結果是20,60。
- 用peek方法,彈出arrayblockqueue第一個元素,并且打印出來:20。
- 打印出arrayblockqueue,結果是20,60。
代碼比較簡單,但是你肯定會有疑問
- offer/add(在上面的代碼中沒有演示)/put都是往隊列里面添加元素,區別是什么?
- poll/take/peek都是彈出隊列的元素,區別是什么?
- 底層代碼是如何保證線程安全的?
- 數據保存在哪里?
要解決上面幾個疑問,最好的辦法當然是看下源碼,通過親自閱讀源碼所產生的印象遠遠要比看視頻,看博客,死記硬背最后的結論要深刻的多。就算真的忘記了,只要再看看源碼,瞬間可以回憶起來。
arrayblockqueue源碼解析
構造方法
arrayblockqueue提供了三個構造方法,如下圖所示:
arrayblockingqueue(int capacity)
1
2
3
|
public arrayblockingqueue( int capacity) { this (capacity, false ); } |
這是最常用的構造方法,傳入capacity,capacity是容量的意思,也就是arrayblockingqueue的最大長度,方法內部直接調用了第二個構造方法,傳入的第二個參數為false。
arrayblockingqueue(int capacity, boolean fair)
1
2
3
4
5
6
7
8
|
public arrayblockingqueue( int capacity, boolean fair) { if (capacity <= 0 ) throw new illegalargumentexception(); this .items = new object[capacity]; lock = new reentrantlock(fair); notempty = lock.newcondition(); notfull = lock.newcondition(); } |
這個構造方法接受兩個參數,分別是capacity和fair,fair是boolean類型的,代表是公平鎖,還是非公平鎖,可以看出如果我們用第一個構造方法來創建arrayblockingqueue的話,采用的是非公平鎖,因為公平鎖會損失一定的性能,在沒有充足的理由的情況下,是沒有必要采用公平鎖的。
方法內部做了幾件事情:
- 創建object類型的數組,容量為capacity,并且賦值給當前類對象的items。
- 創建排他鎖。
- 創建條件變量notempty 。
- 創建條件變量notfull。
至于排他鎖和兩個條件變量是做什么用的,看到后面就明白了。
arrayblockingqueue(int capacity, boolean fair,collection<? extends e> c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public arrayblockingqueue( int capacity, boolean fair, collection<? extends e> c) { //調用第二個構造方法,方法內部就是初始化數組,排他鎖,兩個條件變量 this (capacity, fair); final reentrantlock lock = this .lock; lock.lock(); // 開啟排他鎖 try { int i = 0 ; try { // 循環傳入的集合,把集合中的元素賦值給items數組,其中i會自增 for (e e : c) { checknotnull(e); items[i++] = e; } } catch (arrayindexoutofboundsexception ex) { throw new illegalargumentexception(); } count = i; //把i賦值給count //如果i==capacity,也就是到了最大容量,把0賦值給putindex,否則把i賦值給putindex putindex = (i == capacity) ? 0 : i; } finally { lock.unlock(); //釋放排他鎖 } } |
- 調用第二個構造方法,方法內部就是初始化數組items,排他鎖lock,以及兩個條件變量。
- 開啟排他鎖。
- 循環傳入的集合,將集合中的元素賦值給items數組,其中i會自增。
- 把i賦值給count。
- 如果i==capacity,說明到了最大的容量,就把0賦值給putindex,否則把i賦值給putindex。
- 在finally中釋放排他鎖。
看到這里,我們應該明白這個構造方法的作用是什么了,就是把傳入的集合作為arrayblockingqueuede初始化數據,但是我們又會有一個新的疑問:count,putindex 是做什么用的。
offer(e e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public boolean offer(e e) { checknotnull(e); final reentrantlock lock = this .lock; lock.lock(); //開啟排他鎖 try { if (count == items.length) //如果count==items.length,返回false return false ; else { enqueue(e); //入隊 return true ; //返回true } } finally { lock.unlock(); //釋放鎖 } } |
- 開啟排他鎖。
- 如果count==items.length,也就是到了最大的容量,返回false。
- 如果count<items.length,執行入隊方法,并且返回true。
- 釋放排他鎖。
看到這里,我們應該可以明白了,arrayblockqueue是如何保證線程安全的,還是利用了reentrantlock排他鎖,count就是用來保存數組的當前大小的。我們再來看看enqueue方法。
1
2
3
4
5
6
7
8
|
private void enqueue(e x) { final object[] items = this .items; items[putindex] = x; if (++putindex == items.length) putindex = 0 ; count++; notempty.signal(); } |
這方法比較簡單,在代碼里面就不寫注釋了,做了如下的操作:
- 把x賦值給items[putindex] 。
- 將putindex進行自增,如果自增后的值 == items.length,把0賦值給putindex 。
- 執行count++操作。
- 調用條件變量notempty的signal方法,說明在某個地方,必定調用了notempty的await方法,這里就是喚醒因為調用notempty的await方法而被阻塞的線程。
這里就解答了一個疑問:putindex是做什么的,就是入隊元素的下標。
add(e e)
1
2
3
|
public boolean add(e e) { return super .add(e); } |
1
2
3
4
5
6
|
public boolean add(e e) { if (offer(e)) return true ; else throw new illegalstateexception( "queue full" ); } |
這個方法內部最終還是調用的offer方法。
put(e e)
1
2
3
4
5
6
7
8
9
10
11
12
|
public void put(e e) throws interruptedexception { checknotnull(e); final reentrantlock lock = this .lock; lock.lockinterruptibly(); //開啟響應中斷的排他鎖 try { while (count == items.length) //如果隊列滿了,調用notfull的await notfull.await(); enqueue(e); //入隊 } finally { lock.unlock(); //釋放排他鎖 } } |
- 開啟響應中斷的排他鎖,如果在獲取鎖的過程中,當前的線程被中斷,會拋出異常。
- 如果隊列滿了,調用notfull的await方法,說明在某個地方,必定調用了notfull的signal方法來喚醒當前線程,這里用while循環是為了防止虛假喚醒。
- 執行入隊操作。
- 釋放排他鎖。
可以看到put方法和 offer/add方法的區別了:
- offer/add:如果隊列滿了,直接返回false。
- put:如果隊列滿了,當前線程被阻塞,等待喚醒。
poll()
1
2
3
4
5
6
7
8
9
|
public e poll() { final reentrantlock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } |
- 開啟排他鎖。
- 如果count==0,直接返回null,否則執行dequeue出隊操作。
- 釋放排他鎖。
我們來看dequeue方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private e dequeue() { final object[] items = this .items; @suppresswarnings ( "unchecked" ) e x = (e) items[takeindex]; //獲得元素的值 items[takeindex] = null ; //把null賦值給items[takeindex] if (++takeindex == items.length) //如果takeindex自增后的值== items.length,就把0賦值給takeindex takeindex = 0 ; count--; if (itrs != null ) itrs.elementdequeued(); notfull.signal(); //喚醒因為調用notfull的await方法而被阻塞的線程 return x; } |
- 獲取元素的值,takeindex保存的是出隊的下標。
- 把null賦值給items[takeindex],也就是清空被彈出的元素。
- 如果takeindex自增后的值== items.length,就把0賦值給takeindex。
- count--。
- 喚醒因為調用notfull的await方法而被阻塞的線程。
這里調用了notfull的signal方法來喚醒因為調用notfull的await方法而被阻塞的線程,那到底在哪里調用了notfull的await方法呢,還記不記得在put方法中調用了notfull的await方法,我們再看看:
1
2
|
while (count == items.length) notfull.await(); |
當隊列滿了,就調用 notfull.await()來等待,在出隊操作中,又調用了notfull.signal()來喚醒。
take()
1
2
3
4
5
6
7
8
9
10
11
|
public e take() throws interruptedexception { final reentrantlock lock = this .lock; lock.lockinterruptibly(); try { while (count == 0 ) notempty.await(); return dequeue(); } finally { lock.unlock(); } } |
- 開啟排他鎖。
- 如果count==0,代表隊列是空的,則調用notempty的await方法,用while循環是為了防止虛假喚醒。
- 執行出隊操作。
- 釋放排他鎖。
這里調用了notempty的await方法,那么哪里調用了notempty的signal方法呢?在enqueue入隊方法里。
我們可以看到take和poll的區別:
- take:如果隊列為空,會阻塞,直到被喚醒了。
- poll: 如果隊列為空,直接返回null。
peek()
1
2
3
4
5
6
7
8
9
|
public e peek() { final reentrantlock lock = this .lock; lock.lock(); try { return itemat(takeindex); } finally { lock.unlock(); } } |
1
2
3
|
final e itemat( int i) { return (e) items[i]; } |
- 開啟排他鎖。
- 獲得元素。
- 釋放排他鎖。
我們可以看到peek和poll/take的區別:
- peek,只是獲取元素,不會清空元素。
- poll/take,獲取并清空元素。
size()
1
2
3
4
5
6
7
8
9
|
public int size() { final reentrantlock lock = this .lock; lock.lock(); try { return count; } finally { lock.unlock(); } } |
- 開啟排他鎖。
- 返回count。
- 釋放排他鎖。
總結
至此,arrayblockqueue的核心源碼就分析完畢了,我們來做一個總結:
- arrayblockqueue有幾個比較重要的字段,分別是items,保存的是隊列的數據,putindex保存的是入隊的下標,takeindex保存的是出隊的下標,count用來統計隊列元素的個數,lock用來保證線程的安全性,notempty和notfull兩個條件變量實現喚醒和阻塞。
- offer和add是一樣的,其中add方法內部調用的就是offer方法,如果隊列滿了,直接返回false。
- put,如果隊列滿了,會被阻塞。
- peek,只是彈出元素,不會清空元素。
- poll,彈出并清空元素,如果隊列為空,直接返回null。
- take,彈出并清空元素,如果隊列為空,會被阻塞。
以上所述是小編給大家介紹的arrayblockqueue源碼解析詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://www.cnblogs.com/CodeBear/p/10668582.html