queue是什么
隊(duì)列,是一種數(shù)據(jù)結(jié)構(gòu)。除了優(yōu)先級隊(duì)列和lifo隊(duì)列外,隊(duì)列都是以fifo(先進(jìn)先出)的方式對各個元素進(jìn)行排序的。無論使用哪種排序方式,隊(duì)列的頭都是調(diào)用remove()或poll()移除元素的。在fifo隊(duì)列中,所有新元素都插入隊(duì)列的末尾。
queue中的方法
queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下jdkapi就知道了:
注意一點(diǎn)就好,queue通常不允許插入null,盡管某些實(shí)現(xiàn)(比如linkedlist)是允許的,但是也不建議。
blockingqueue
1、blockingqueue概述
blockingqueue也是java.util.concurrent下的主要用來控制線程同步的工具。
blockingqueue有四個具體的實(shí)現(xiàn)類,根據(jù)不同需求,選擇不同的實(shí)現(xiàn)類
1、arrayblockingqueue:一個由數(shù)組支持的有界阻塞隊(duì)列,規(guī)定大小的blockingqueue,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小.其所含的對象是以fifo(先入先出)順序排序的。
2、linkedblockingqueue:大小不定的blockingqueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù),生成的blockingqueue有大小限制,若不帶大小參數(shù),所生成的blockingqueue的大小由integer.max_value來決定.其所含的對象是以fifo(先入先出)順序排序的。
3、priorityblockingqueue:類似于linkedblockqueue,但其所含對象的排序不是fifo,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)的comparator決定的順序。
4、synchronousqueue:特殊的blockingqueue,對其的操作必須是放和取交替完成的。
linkedblockingqueue可以指定容量,也可以不指定,不指定的話,默認(rèn)最大是integer.max_value,其中主要用到put和take方法,put方法在隊(duì)列滿的時候會阻塞直到有隊(duì)列成員被消費(fèi),take方法在隊(duì)列空的時候會阻塞,直到有隊(duì)列成員被放進(jìn)來。
講blockingqueue,因?yàn)閎lockingqueue是queue中的一個重點(diǎn),并且通過blockingqueue我們再次加深對于生產(chǎn)者/消費(fèi)者模型的理解。其他的queue都不難,通過查看jdkapi和簡單閱讀源碼完全可以理解他們的作用。
blockingqueue,顧名思義,阻塞隊(duì)列。blockingqueue是在java.util.concurrent下的,因此不難理解,blockingqueue是為了解決多線程中數(shù)據(jù)高效安全傳輸而提出的。
多線程中,很多場景都可以使用隊(duì)列實(shí)現(xiàn),比如經(jīng)典的生產(chǎn)者/消費(fèi)者模型,通過隊(duì)列可以便利地實(shí)現(xiàn)兩者之間數(shù)據(jù)的共享,定義一個生產(chǎn)者線程,定義一個消費(fèi)者線程,通過隊(duì)列共享數(shù)據(jù)就可以了。
當(dāng)然現(xiàn)實(shí)不可能都是理想的,比如消費(fèi)者消費(fèi)速度比生產(chǎn)者生產(chǎn)的速度要快,那么消費(fèi)者消費(fèi)到一定程度上的時候,必須要暫停等待一下了(使消費(fèi)者線程處于waiting狀態(tài))。blockingqueue的提出,就是為了解決這個問題的,他不用程序員去控制這些細(xì)節(jié),同時還要兼顧效率和線程安全。
阻塞隊(duì)列所謂的"阻塞",指的是某些情況下線程會掛起(即阻塞),一旦條件滿足,被掛起的線程又會自動喚醒。使用blockingqueue,不需要關(guān)心什么時候需要阻塞線程,什么時候需要喚醒線程,這些內(nèi)容blockingqueue都已經(jīng)做好了
2、blockingqueue中的方法
blockingqueue既然是queue的子接口,必然有queue中的方法,上面已經(jīng)列了。看一下blockingqueue中特有的方法:
(1)voidput(ee)throwsinterruptedexception
把e添加進(jìn)blockingqueue中,如果blockingqueue中沒有空間,則調(diào)用線程被阻塞,進(jìn)入等待狀態(tài),直到blockingqueue中有空間再繼續(xù)
(2)voidtake()throwsinterruptedexception
取走blockingqueue里面排在首位的對象,如果blockingqueue為空,則調(diào)用線程被阻塞,進(jìn)入等待狀態(tài),直到blockingqueue有新的數(shù)據(jù)被加入
(3)intdrainto(collection<?supere>c,intmaxelements)
一次性取走blockingqueue中的數(shù)據(jù)到c中,可以指定取的個數(shù)。通過該方法可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖
3、arrayblockingqueue
基于數(shù)組的阻塞隊(duì)列,必須指定隊(duì)列大小。比較簡單。arrayblockingqueue中只有一個reentrantlock對象,這意味著生產(chǎn)者和消費(fèi)者無法并行運(yùn)行(見下面的代碼)。另外,創(chuàng)建arrayblockingqueue時,可以指定reentrantlock是否為公平鎖,默認(rèn)采用非公平鎖。
1
2
3
4
5
6
|
/** main lock guarding all access */ private final reentrantlock lock; /** condition for waiting takes */ private final condition notempty; /** condition for waiting puts */ private final condition notfull; |
4、linkedblockingqueue
基于鏈表的阻塞隊(duì)列,和arrayblockingqueue差不多。不過linkedblockingqueue如果不指定隊(duì)列容量大小,會默認(rèn)一個類似無限大小的容量,之所以說是類似是因?yàn)檫@個無限大小是integer.max_value,這么說就好理解arrayblockingqueue為什么必須要制定大小了,如果arrayblockingqueue不指定大小的話就用integer.max_value,那將造成大量的空間浪費(fèi),但是基于鏈表實(shí)現(xiàn)就不一樣的,一個一個節(jié)點(diǎn)連起來而已。另外,linkedblockingqueue生產(chǎn)者和消費(fèi)者都有自己的鎖(見下面的代碼),這意味著生產(chǎn)者和消費(fèi)者可以"同時"運(yùn)行。
1
2
3
4
5
6
7
8
|
/** lock held by take, poll, etc */ private final reentrantlock takelock = new reentrantlock(); /** wait queue for waiting takes */ private final condition notempty = takelock.newcondition(); /** lock held by put, offer, etc */ private final reentrantlock putlock = new reentrantlock(); /** wait queue for waiting puts */ private final condition notfull = putlock.newcondition(); |
5、synchronousqueue
比較特殊,一種沒有緩沖的等待隊(duì)列。什么叫做沒有緩沖區(qū),arrayblocking中有:
1
2
|
/** the queued items */ private final e[] items; |
數(shù)組用以存儲隊(duì)列。linkedblockingqueue中有:
1
2
3
4
5
6
7
8
9
|
/** * linked list node class */ static class node<e> { /** the item, volatile to ensure barrier separating write and read */ volatile e item; node<e> next; node(e x) { item = x; } } |
將隊(duì)列以鏈表形式連接。
生產(chǎn)者/消費(fèi)者操作數(shù)據(jù)實(shí)際上都是通過這兩個"中介"來操作數(shù)據(jù)的,但是synchronousqueue則是生產(chǎn)者直接把數(shù)據(jù)給消費(fèi)者(消費(fèi)者直接從生產(chǎn)者這里拿數(shù)據(jù)),好像又回到了沒有生產(chǎn)者/消費(fèi)者模型的老辦法了。換句話說,每一個插入操作必須等待一個線程對應(yīng)的移除操作。synchronousqueue又有兩種模式:
1、公平模式
采用公平鎖,并配合一個fifo隊(duì)列(queue)來管理多余的生產(chǎn)者和消費(fèi)者
2、非公平模式
采用非公平鎖,并配合一個lifo棧(stack)來管理多余的生產(chǎn)者和消費(fèi)者,這也是synchronousqueue默認(rèn)的模式
利用blockingqueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
上一篇我們寫的生產(chǎn)者消費(fèi)者模型有局限,局限體現(xiàn)在:
緩沖區(qū)內(nèi)只能存放一個數(shù)據(jù),實(shí)際生產(chǎn)者/消費(fèi)者模型中的緩沖區(qū)內(nèi)可以存放大量生產(chǎn)者生產(chǎn)出來的數(shù)據(jù)
生產(chǎn)者和消費(fèi)者處理數(shù)據(jù)的速度幾乎一樣
ok,我們就用blockingqueue來簡單寫一個例子,并且讓生產(chǎn)者、消費(fèi)者處理數(shù)據(jù)速度不同。子類選擇的是arrayblockingqueue,大小定為10:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public static void main(string[] args) { final blockingqueue<string> bq = new arrayblockingqueue<string>( 10 ); runnable producerrunnable = new runnable() { int i = 0 ; public void run() { while ( true ) { try { system.out.println( "我生產(chǎn)了一個" + i++); bq.put(i + "" ); thread.sleep( 1000 ); } catch (interruptedexception e) { e.printstacktrace(); } } } }; runnable customerrunnable = new runnable() { public void run() { while ( true ) { try { system.out.println( "我消費(fèi)了一個" + bq.take()); thread.sleep( 3000 ); } catch (interruptedexception e) { e.printstacktrace(); } } } }; thread producerthread = new thread(producerrunnable); thread customerthread = new thread(customerrunnable); producerthread.start(); customerthread.start(); } |
代碼的做法是讓生產(chǎn)者生產(chǎn)速度快于消費(fèi)者消費(fèi)速度的,看一下運(yùn)行結(jié)果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
我生產(chǎn)了一個 0 我消費(fèi)了一個 1 我生產(chǎn)了一個 1 我生產(chǎn)了一個 2 我消費(fèi)了一個 2 我生產(chǎn)了一個 3 我生產(chǎn)了一個 4 我生產(chǎn)了一個 5 我消費(fèi)了一個 3 我生產(chǎn)了一個 6 我生產(chǎn)了一個 7 我生產(chǎn)了一個 8 我消費(fèi)了一個 4 我生產(chǎn)了一個 9 我生產(chǎn)了一個 10 我生產(chǎn)了一個 11 我消費(fèi)了一個 5 我生產(chǎn)了一個 12 我生產(chǎn)了一個 13 我生產(chǎn)了一個 14 我消費(fèi)了一個 6 我生產(chǎn)了一個 15 我生產(chǎn)了一個 16 我消費(fèi)了一個 7 我生產(chǎn)了一個 17 我消費(fèi)了一個 8 我生產(chǎn)了一個 18 |
分兩部分來看輸出結(jié)果:
1、第1行~第23行。這塊blockingqueue未滿,所以生產(chǎn)者隨便生產(chǎn),消費(fèi)者隨便消費(fèi),基本上都是生產(chǎn)3個消費(fèi)1個,消費(fèi)者消費(fèi)速度慢
2、第24行~第27行,從前面我們可以看出,生產(chǎn)到16,消費(fèi)到6,說明到了arrayblockingqueue的極限10了,這時候沒辦法,生產(chǎn)者生產(chǎn)一個arrayblockingqueue就滿了,所以不能繼續(xù)生產(chǎn)了,只有等到消費(fèi)者消費(fèi)完才可以繼續(xù)生產(chǎn)。所以之后的打印內(nèi)容一定是一個生產(chǎn)者、一個消費(fèi)者
這就是前面一章開頭說的"通過平衡生產(chǎn)者和消費(fèi)者的處理能力來提高整體處理數(shù)據(jù)的速度",這給例子應(yīng)該體現(xiàn)得很明顯。另外,也不要擔(dān)心非單一生產(chǎn)者/消費(fèi)者場景下的系統(tǒng)假死問題,緩沖區(qū)空、緩沖區(qū)滿的場景blockingqueue都是定義了不同的condition,所以不會喚醒自己的同類。
總結(jié)
以上就是本文關(guān)于java多線程queue、blockingqueue和使用blockingqueue實(shí)現(xiàn)生產(chǎn)消費(fèi)者模型方法解析的全部內(nèi)容,希望對大家有所幫助。如有不足之處,歡迎留言指出。
原文鏈接:https://www.cnblogs.com/xrq730/p/4855857.html