在上一章中,我們介紹了阻塞隊列blockingqueue,下面我們介紹它的常用實現類arrayblockingqueue。
一. 用數組來實現隊列
因為隊列這種數據結構的特殊要求,所以它天然適合用鏈表的方式來實現,用兩個變量分別記錄鏈表頭和鏈表尾,當刪除或插入隊列時,只要改變鏈表頭或鏈表尾就可以了,而且鏈表使用引用的方式鏈接的,所以它的容量幾乎是無限的。
那么怎么使用數組來實現隊列,我們需要四個變量:object[] array來存儲隊列中元素,headindex和tailindex分別記錄隊列頭和隊列尾,count記錄隊列的個數。
- 因為數組的長度是固定,所以當count==array.length時,表示隊列已經滿了,當count==0的時候,表示隊列是空的。
- 當添加元素的時候,將array[tailindex] = e將tailindex位置設置成新元素,之后將tailindex++自增,然后將count++自增。但是有兩點需要注意,在添加之前必須先判斷隊列是否已滿,不然會出現覆蓋已有元素。當tailindex的值等于數組最后一個位置的時候,需要將tailindex=0,循環利用數組
- 當刪除元素的時候,將先記錄下array[headindex] 元素,之后將headindex++自增,然后將count--自減。但是有兩點需要注意要注意,在刪除之前,必須先判斷隊列是否為空,不然可能會刪除已刪除的元素。
這里用了一個很巧妙的方式,我們知道當向隊列中插入一個元素,那么就占用了數組的一個位置,當刪除一個元素的時候,那么其實數組的這個位置就空閑出來了,表示這個位置又可以插入新元素了。
所以我們插入新元素前,必須檢查隊列是否已滿,刪除元素之前,必須檢查隊列是否為空。
二. arrayblockingqueue中重要成員變量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/** 儲存隊列的中元素 */ final object[] items; /** 隊列頭的位置 */ int takeindex; /** 隊列尾的位置 */ int putindex; /** 當前隊列擁有的元素個數 */ int count; /** 用來保證多線程操作共享變量的安全問題 */ final reentrantlock lock; /** 當隊列為空時,就會調用notempty的wait方法,讓當前線程等待 */ private final condition notempty; /** 當隊列為滿時,就會調用notfull的wait方法,讓當前線程等待 */ private final condition notfull; |
就是多了lock、notempty、notfull變量來實現多線程安全和線程等待條件的,它們三個是怎么操作的呢?
2.1 lock的作用
因為arrayblockingqueue是在多線程下操作的,所以修改items、takeindex、putindex和count這些成員變量時,必須要考慮多線程安全問題,所以這里使用lock獨占鎖,來保證并發操作的安全。
2.2 notempty與notfull的作用
因為阻塞隊列必須實現,當隊列為空或隊列已滿的時候,隊列的讀取或插入操作要等待。所以我們想到了并發框架下的condition對象,使用它來控制。
在aqs中,我們介紹了這個類的作用:
- await系列方法,會釋放當前鎖,并讓當前線程等待。
- signal與signalall方法,會喚醒當前線程。其實它并不是喚醒當前線程,而是將在這個condition條件上等待的線程,添加到lock鎖上的等待線程池中,所以當鎖被釋放時,會喚醒lock鎖上的等待線程池中一個線程。具體在aqs中有源碼分析。
三. 添加元素方法
3.1 add(e e)與offer(e e)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// 調用abstractqueue父類中的方法。 public boolean add(e e) { // 通過調用offer來時實現 if (offer(e)) return true ; else throw new illegalstateexception( "queue full" ); } //向隊列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會拋出異常 public boolean offer(e e) { checknotnull(e); final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lock(); try { // 隊列已滿,添加元素失敗,返回false。 if (count == items.length) return false ; else { // 調用enqueue方法將元素插入隊列中 enqueue(e); return true ; } } finally { lock.unlock(); } } |
add方法是調用offer方法實現的。在offer方法中,必須先判斷隊列是否已滿,如果已滿就直接返回false,而不會阻塞當前線程。如果不滿就調用enqueue方法將元素插入隊列中。
注意:這里使用lock.lock()是保證同一時間只有一個線程修改成員變量,防止出現并發操作問題。雖然它也會阻塞當前線程,但是它并不是條件等待,只是因為鎖被其他線程持有,而arrayblockingqueue中方法操作時間都不長,這里相當于不阻塞線程。
3.2 put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// 向隊列末尾新添加元素,如果隊列已滿,當前線程就等待。響應中斷異常 public void put(e e) throws interruptedexception { checknotnull(e); final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lockinterruptibly(); try { // 隊列已滿,則調用notfull.await()方法,讓當前線程等待,直到隊列不是滿的 while (count == items.length) notfull.await(); // 調用enqueue方法將元素插入隊列中 enqueue(e); } finally { lock.unlock(); } } |
與offer方法大體流程一樣,只是當隊列已滿的時候,會調用notfull.await()方法,讓當前線程阻塞等待,直到隊列被別的線程移除了元素,隊列不滿的時候,會喚醒這個等待線程。
3.3 offer(e e, long timeout, timeunit unit)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/** * 向隊列末尾新添加元素,如果隊列中沒有可用空間,當前線程就等待, * 如果等待時間超過timeout了,那么返回false,表示添加失敗 */ public boolean offer(e e, long timeout, timeunit unit) throws interruptedexception { checknotnull(e); // 計算一共最多等待的時間值nanos long nanos = unit.tonanos(timeout); final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lockinterruptibly(); try { // 隊列已滿 while (count == items.length) { // nanos <= 0表示最大等待時間已到,那么不用再等待了,返回false,表示添加失敗。 if (nanos <= 0 ) return false ; // 調用notfull.awaitnanos(nanos)方法,超時nanos時間會被自動喚醒, // 如果被提前喚醒,那么返回剩余的時間 nanos = notfull.awaitnanos(nanos); } // 調用enqueue方法將元素插入隊列中 enqueue(e); return true ; } finally { lock.unlock(); } } |
與put的方法大體流程一樣,只不過是調用notfull.awaitnanos(nanos)方法,讓當前線程等待,并設置等待時間。
四. 刪除元素方法
4.1 remove()和poll()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 調用abstractqueue父類中的方法。 public e remove() { // 通過調用poll來時實現 e x = poll(); if (x != null ) return x; else throw new nosuchelementexception(); } // 刪除隊列第一個元素(即隊列頭),并返回它。如果隊列是空的,它不會拋出異常,而是會返回null。 public e poll() { final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lock(); try { // 如果count == 0,列表為空,就返回null,否則調用dequeue方法,返回列表頭元素 return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } |
remove方法是調用poll()方法實現的。在 poll()方法中,如果列表為空,就返回null,否則調用dequeue方法,返回列表頭元素。
4.2 take()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/** * 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。響應中斷異常 */ public e take() throws interruptedexception { final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lockinterruptibly(); try { // 如果隊列為空,就調用notempty.await()方法,讓當前線程等待。 // 直到有別的線程向隊列中插入元素,那么這個線程會被喚醒。 while (count == 0 ) notempty.await(); // 調用dequeue方法,返回列表頭元素 return dequeue(); } finally { lock.unlock(); } } |
take()方法當隊列為空的時候,當前線程必須等待,直到有別的線程向隊列中插入新元素,那么這個線程會被喚醒。
4.3 poll(long timeout, timeunit unit)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** * 返回并移除隊列第一個元素,如果隊列是空的,就前線程就等待。 * 如果等待時間超過timeout了,那么返回false,表示獲取元素失敗 */ public e poll( long timeout, timeunit unit) throws interruptedexception { // 計算一共最多等待的時間值nanos long nanos = unit.tonanos(timeout); final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lockinterruptibly(); try { // 隊列為空 while (count == 0 ) { // nanos <= 0表示最大等待時間已到,那么不用再等待了,返回null。 if (nanos <= 0 ) return null ; // 調用notempty.awaitnanos(nanos)方法讓檔期線程等待,并設置超時時間。 nanos = notempty.awaitnanos(nanos); } // 調用dequeue方法,返回列表頭元素 return dequeue(); } finally { lock.unlock(); } } |
與take()方法流程一樣,只不過調用awaitnanos(nanos)方法,讓當前線程等待,并設置等待時間。
五. 查看元素的方法
5.1 element()與peek() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 調用abstractqueue父類中的方法。 public e element() { e x = peek(); if (x != null ) return x; else throw new nosuchelementexception(); } // 查看隊列頭元素 public e peek() { final reentrantlock lock = this .lock; // 使用lock來保證,多線程修改成員屬性的安全 lock.lock(); try { // 返回當前隊列頭的元素 return itemat(takeindex); // null when queue is empty } finally { lock.unlock(); } } |
六. 其他重要方法
6.1 enqueue和dequeue方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 將元素x插入隊列尾 private void enqueue(e x) { // assert lock.getholdcount() == 1; // assert items[putindex] == null; //當前putindex位置元素一定是null final object[] items = this .items; items[putindex] = x; // 如果putindex等于items.length,那么將putindex重新設置為0 if (++putindex == items.length) putindex = 0 ; // 隊列數量加一 count++; // 因為插入一個元素,那么當前隊列肯定不為空,那么喚醒在notempty條件下等待的一個線程 notempty.signal(); } // 刪除隊列頭的元素,返回它 private e dequeue() { // assert lock.getholdcount() == 1; // assert items[takeindex] != null; final object[] items = this .items; // 得到當前隊列頭的元素 @suppresswarnings ( "unchecked" ) e x = (e) items[takeindex]; // 將當前隊列頭位置設置為null items[takeindex] = null ; if (++takeindex == items.length) takeindex = 0 ; // 隊列數量減一 count--; if (itrs != null ) itrs.elementdequeued(); // 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notfull條件下等待的一個線程 notfull.signal(); return x; } |
這兩個方法分別代表,向隊列中插入元素和從隊列中刪除元素。而且它們要喚醒等待的線程。當插入元素后,那么隊列一定不為空,那么就要喚醒在notempty條件下等待的線程。當刪除元素后,那么隊列一定不滿,那么就要喚醒在notfull條件下等待的線程。
6.2 remove(object o)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// 刪除隊列中對象o元素,最多刪除一條 public boolean remove(object o) { if (o == null ) return false ; final object[] items = this .items; // 使用lock來保證,多線程修改成員屬性的安全 final reentrantlock lock = this .lock; lock.lock(); try { // 當隊列中有值的時候,才進行刪除。 if (count > 0 ) { // 隊列尾下一個位置 final int putindex = this .putindex; // 隊列頭的位置 int i = takeindex; do { // 當前位置元素與被刪除元素相同 if (o.equals(items[i])) { // 刪除i位置元素 removeat(i); // 返回true return true ; } if (++i == items.length) i = 0 ; // 當i==putindex表示遍歷完所有元素 } while (i != putindex); } return false ; } finally { lock.unlock(); } } |
從隊列中刪除指定對象o,那么就要遍歷隊列,刪除第一個與對象o相同的元素,如果隊列中沒有對象o元素,那么返回false刪除失敗。
這里有兩點需要注意:
如何遍歷隊列,就是從隊列頭遍歷到隊列尾。就要靠takeindex和putindex兩個變量了。
為什么object[] items = this.items;這句代碼沒有放到同步鎖lock代碼塊內。items是成員變量,那么多線程操作的時候,不會有并發問題么?
這個是因為items是個引用變量,不是基本數據類型,而且我們對隊列的插入和刪除操作,都是針對這一個items數組,沒有改變數組的引用,所以在lock代碼中,items會得到其他線程對它最新的修改。但是如果這里將int putindex = this.putindex;方法lock代碼塊外面,就會產生問題。
removeat(final int removeindex)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// 刪除隊列removeindex位置的元素 void removeat( final int removeindex) { // assert lock.getholdcount() == 1; // assert items[removeindex] != null; // assert removeindex >= 0 && removeindex < items.length; final object[] items = this .items; // 表示刪除元素是列表頭,就容易多了,與dequeue方法流程差不多 if (removeindex == takeindex) { // 移除removeindex位置元素 items[takeindex] = null ; // 到了數組末尾,就要轉到數組頭位置 if (++takeindex == items.length) takeindex = 0 ; // 隊列數量減一 count--; if (itrs != null ) itrs.elementdequeued(); } else { // an "interior" remove final int putindex = this .putindex; for ( int i = removeindex;;) { int next = i + 1 ; if (next == items.length) next = 0 ; // 還沒有到隊列尾,那么就將后一個位置元素覆蓋前一個位置的元素 if (next != putindex) { items[i] = items[next]; i = next; } else { // 將隊列尾元素置位null items[i] = null ; // 重新設置putindex的值 this .putindex = i; break ; } } // 隊列數量減一 count--; if (itrs != null ) itrs.removedat(removeindex); } // 因為刪除了一個元素,那么隊列肯定不滿了,那么喚醒在notfull條件下等待的一個線程 notfull.signal(); } |
在隊列中刪除指定位置的元素。需要注意的是刪除之后的數組還能保持隊列形式,分為兩種情況:
- 如果刪除位置是隊列頭,那么簡單,只需要將隊列頭的位置元素設置為null,將將隊列頭位置加一。
- 如果刪除位置不是隊列頭,那么麻煩了,這個時候,我們就要將從removeindex位置后的元素全部左移一位,覆蓋前一個元素。最后將原來隊列尾的元素置位null。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://www.jianshu.com/p/766b97ba9d01