簡介:
生產(chǎn)者、消費者模型是多線程編程的常見問題,最簡單的一個生產(chǎn)者、一個消費者線程模型大多數(shù)人都能夠寫出來,但是一旦條件發(fā)生變化,我們就很容易掉進多線程的bug中。這篇文章主要講解了生產(chǎn)者和消費者的數(shù)量,商品緩存位置數(shù)量,商品數(shù)量等多個條件的不同組合下,寫出正確的生產(chǎn)者消費者模型的方法。
歡迎探討,如有錯誤敬請指正
生產(chǎn)消費者模型
生產(chǎn)者消費者模型具體來講,就是在一個系統(tǒng)中,存在生產(chǎn)者和消費者兩種角色,他們通過內(nèi)存緩沖區(qū)進行通信,生產(chǎn)者生產(chǎn)消費者需要的資料,消費者把資料做成產(chǎn)品。生產(chǎn)消費者模式如下圖。
定義商品類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package demo; /*定義商品*/ public class goods { public final string name; public final int price; public final int id; public goods(string name, int price, int id){ this.name = name; /*類型*/ this.price = price; /*價格*/ this.id = id; /*商品序列號*/ } @override public string tostring(){ return "name: " + name + ", price:" + price + ", id: " + id; } } |
基本要求:
1)生產(chǎn)者不能重復生產(chǎn)一個商品,也就是說不能有兩個id相同的商品
2)生產(chǎn)者不能覆蓋一個商品(當前商品還未被消費,就被下一個新商品覆蓋)。也就是說消費商品時,商品的id屬性可以不連續(xù),但不能出現(xiàn)缺號的情況
3)消費者不能重復消費一個商品
1.生產(chǎn)者線程無線生產(chǎn),消費者線程無限消費的模式
1.1使用線程對象,一個生產(chǎn)者線程,一個消費者線程,一個商品存儲位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package demo; import java.util.random; /*使用線程對象,一個緩存位置,一個生產(chǎn)者,一個消費者,無限生產(chǎn)商品消費商品*/ public class productercomsumerdemo1 { /*定義一個商品緩存位置*/ private volatile goods goods; /*定義一個對象作為鎖,不使用goods作為鎖是因為生產(chǎn)者每次會產(chǎn)生一個新的對象*/ private object obj = new object(); /*isfull == true 生產(chǎn)者線程休息,消費者線程消費 *isfull == false 消費者線程休息,生產(chǎn)者線程生產(chǎn)*/ private volatile boolean isfull = false; /*商品的id編號,生產(chǎn)者制造的每個商品的id都不一樣,每生產(chǎn)一個id自增1*/ private int id = 1; /*隨機產(chǎn)生一個sleep時間*/ private random rnd = new random(); /*=================定義消費者線程==================*/ public class comsumethread implements runnable{ @override public void run(){ try{ while(true){ /*獲取obj對象的鎖, id 和 isfull 的操作都在同步代碼塊中*/ synchronized(obj){ if(!isfull){ /*wait方法使當前線程阻塞,并釋放鎖*/ obj.wait(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); /*模擬消費商品*/ system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); isfull = false; /*喚醒阻塞obj上的生產(chǎn)者線程*/ obj.notify(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); } } catch (interruptedexception e){ /*什么都不做*/ } } } /*=================定義生產(chǎn)者線程==================*/ public class productthread implements runnable{ @override public void run(){ try { while(true){ synchronized(obj){ if(isfull){ obj.wait(); } thread.sleep(rnd.nextint(500)); /*如果id為偶數(shù),生產(chǎn)價格為2的產(chǎn)品a *如果id為奇數(shù),生產(chǎn)價格為1的產(chǎn)品b*/ if(id % 2 == 0){ goods = new goods("a", 2, id); } else{ goods = new goods("b", 1, id); } thread.sleep(rnd.nextint(250)); id++; isfull = true; /*喚醒阻塞的消費者線程*/ obj.notify(); } } } catch (interruptedexception e) { /*什么都不做*/ } } } public static void main(string[] args) throws interruptedexception{ productercomsumerdemo1 pcd = new productercomsumerdemo1(); runnable c = pcd. new comsumethread(); runnable p = pcd. new productthread(); new thread(p).start(); new thread(c).start(); } } |
運行結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
name: b, price: 1 , id: 1 name: a, price: 2 , id: 2 name: b, price: 1 , id: 3 name: a, price: 2 , id: 4 name: b, price: 1 , id: 5 name: a, price: 2 , id: 6 name: b, price: 1 , id: 7 name: a, price: 2 , id: 8 name: b, price: 1 , id: 9 name: a, price: 2 , id: 10 name: b, price: 1 , id: 11 name: a, price: 2 , id: 12 name: b, price: 1 , id: 13 …… |
從結果看出,商品類型交替生產(chǎn),每個商品的id都不相同,且不會漏過任何一個id,生產(chǎn)者沒有重復生產(chǎn),消費者沒有重復消費,結果完全正確。
1.2.使用線程對象,多個生產(chǎn)者線程,多個消費者線程,1個緩存位置
1.2.1一個經(jīng)典的bug
對于多生產(chǎn)者,多消費者這個問題,看起來我們似乎不用修改代碼,只需在main方法中多添加幾個線程就好。假設我們需要三個消費者,一個生產(chǎn)者,那么我們只需要在main方法中再添加兩個消費者線程。
1
2
3
4
5
6
7
8
9
|
public static void main(string[] args) throws interruptedexception{ productercomsumerdemo1 pcd = new productercomsumerdemo1(); runnable c = pcd. new comsumethread(); runnable p = pcd. new productthread(); new thread(c).start(); new thread(p).start(); new thread(c).start(); new thread(c).start(); } |
運行結果
1
2
3
4
5
6
7
8
9
10
11
|
name: b, price: 1 , id: 1 name: a, price: 2 , id: 2 name: a, price: 2 , id: 2 name: b, price: 1 , id: 3 name: b, price: 1 , id: 3 name: a, price: 2 , id: 4 name: a, price: 2 , id: 4 name: b, price: 1 , id: 5 name: b, price: 1 , id: 5 name: a, price: 2 , id: 6 …… |
從結果中,我們發(fā)現(xiàn)消費者重復消費了商品,所以這樣做顯然是錯誤的。這里我們定義多個消費者,一個生產(chǎn)者,所以遇到了重復消費的問題,如果定義成一個消費者,多個生產(chǎn)者就會遇到id覆蓋的問題。如果我們定義多個消費者,多個生產(chǎn)者,那么即會遇到重復消費,也會遇到id覆蓋的問題。注意,上面的代碼使用的notifyall喚醒方法,如果使用notify方法喚醒bug仍然可能發(fā)生。
現(xiàn)在我們來分析一下原因。當生產(chǎn)者生產(chǎn)好了商品,會喚醒因沒有商品而阻塞消費者線程,假設喚醒的消費者線程超過兩個,這兩個線程會競爭獲取鎖,獲取到鎖的線程就會從obj.wait()方法中返回,然后消費商品,并把isfull置為false,然后釋放鎖。當被喚醒的另一個線程競爭獲取到鎖了以后也會從obj.wait()方法中返回。會再次消費同一個商品。顯然,每一個被喚醒的線程應該再次檢查isfull這個條件。所以無論是消費者,還是生產(chǎn)者,isfull的判斷必須改成while循環(huán),這樣才能得到正確的結果而不受生產(chǎn)者的線程數(shù)和消費者的線程數(shù)的影響。
而對于只有一個生產(chǎn)者線程,一個消費者線程,用if判斷是沒有問題的,但是仍然強烈建議改成while語句進行判斷。
1.2.2正確的姿勢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
package demo; import java.util.random; /*使用線程對象,一個緩存位置,一個生產(chǎn)者,一個消費者,無限生產(chǎn)商品消費商品*/ public class productercomsumerdemo1 { /*定義一個商品緩存位置*/ private volatile goods goods; /*定義一個對象作為鎖,不使用goods作為鎖是因為生產(chǎn)者每次會產(chǎn)生一個新的對象*/ private object obj = new object(); /*isfull == true 生產(chǎn)者線程休息,消費者線程消費 *isfull == false 消費者線程消費,生產(chǎn)者線程生產(chǎn)*/ private volatile boolean isfull = false; /*商品的id編號,生產(chǎn)者制造的每個商品的id都不一樣,每生產(chǎn)一個id自增1*/ private int id = 1; /*隨機產(chǎn)生一個sleep時間*/ private random rnd = new random(); /*=================定義消費者線程==================*/ public class comsumethread implements runnable{ @override public void run(){ try{ while(true){ /*獲取obj對象的鎖, id 和 isfull 的操作都在同步代碼塊中*/ synchronized(obj){ while(!isfull){ /*wait方法使當前線程阻塞,并釋放鎖*/ obj.wait(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); /*模擬消費商品*/ system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); isfull = false; /*喚醒阻塞obj上的生產(chǎn)者線程*/ obj.notifyall(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); } } catch (interruptedexception e){ /*我就是任性,這里什么都不做*/ } } } /*=================定義生產(chǎn)者線程==================*/ public class productthread implements runnable{ @override public void run(){ try { while(true){ synchronized(obj){ while(isfull){ obj.wait(); } thread.sleep(rnd.nextint(500)); /*如果id為偶數(shù),生產(chǎn)價格為2的產(chǎn)品a 如果id為奇數(shù),生產(chǎn)價格為1的產(chǎn)品b*/ if(id % 2 == 0){ goods = new goods("a", 2, id); } else{ goods = new goods("b", 1, id); } thread.sleep(rnd.nextint(250)); id++; isfull = true; /*喚醒阻塞的消費者線程*/ obj.notifyall(); } } } catch (interruptedexception e) { /*我就是任性,這里什么都不做*/ } } } public static void main(string[] args) throws interruptedexception{ productercomsumerdemo1 pcd = new productercomsumerdemo1(); runnable c = pcd. new comsumethread(); runnable p = pcd. new productthread(); new thread(p).start(); new thread(p).start(); new thread(p).start(); new thread(c).start(); new thread(c).start(); new thread(c).start(); } } |
1.3使用線程對象,多個緩存位置(有界),多生產(chǎn)者,多消費者
1)當緩存位置滿時,我們應該阻塞生產(chǎn)者線程
2)當緩存位置空時,我們應該阻塞消費者線程
下面的代碼我沒有用java對象內(nèi)置的鎖,而是用了reentrantlock對象。是因為普通對象的鎖只有一個阻塞隊列,如果使用notify方式,無法保證喚醒的就是特定類型的線程(消費者線程或生產(chǎn)者線程),而notifyall方法會喚醒所有的線程,當剩余的緩存商品的數(shù)量小于生產(chǎn)者線程數(shù)量或已緩存商品的數(shù)量小于消費者線程時效率就比較低。所以這里我們通過reentrantlock對象構造兩個阻塞隊列提高效率。
1.3.1普通方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package demo; import java.util.linkedlist; import java.util.random; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; /*使用線程對象,多個緩存位置(有界),多生產(chǎn)者,多消費者,無限循環(huán)模式*/ public class productercomsumerdemo2 { /*最大緩存商品數(shù)*/ private final int max_slot = 2; /*定義緩存商品的容器*/ private linkedlist<goods> queue = new linkedlist<goods>(); /*定義線程鎖和鎖對應的阻塞隊列*/ private lock lock = new reentrantlock(); private condition full = lock.newcondition(); private condition empty = lock.newcondition(); /*商品的id編號,生產(chǎn)者制造的每個商品的id都不一樣,每生產(chǎn)一個id自增1*/ private int id = 1; /*隨機產(chǎn)生一個sleep時間*/ private random rnd = new random(); /*=================定義消費者線程==================*/ public class comsumethread implements runnable{ @override public void run(){ while(true){ /*加鎖,queue的出列操作都在同步代碼塊中*/ lock.lock(); try { while(queue.isempty()){ system.out.println("queue is empty"); empty.await(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(200)); /*模擬消費商品*/ goods goods = queue.remove(); system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(200)); /*喚醒阻塞的生產(chǎn)者線程*/ full.signal(); } catch (interruptedexception e) { /*什么都不做*/ } finally{ lock.unlock(); } /*釋放鎖后隨機延時一段時間*/ try { thread.sleep(rnd.nextint(200)); } catch (interruptedexception e) { /*什么都不做*/ } } } } /*=================定義生產(chǎn)者線程==================*/ public class productthread implements runnable{ @override public void run(){ while(true){ /*加鎖,queue的入列操作,id操作都在同步代碼塊中*/ lock.lock(); try{ while(queue.size() == max_slot){ system.out.println("queue is full"); full.await(); } thread.sleep(rnd.nextint(200)); goods goods = null; /*根據(jù)序號產(chǎn)生不同的商品*/ switch(id%3){ case 0 : goods = new goods("a", 1, id); break; case 1 : goods = new goods("b", 2, id); break; case 2 : goods = new goods("c", 3, id); break; } thread.sleep(rnd.nextint(200)); queue.add(goods); id++; /*喚醒阻塞的消費者線程*/ empty.signal(); } catch(interruptedexception e){ /*什么都不做*/ } finally{ lock.unlock(); } /*釋放鎖后隨機延時一段時間*/ try { thread.sleep(rnd.nextint(100)); } catch (interruptedexception e) { /*什么都不做*/ } } } } /*=================main==================*/ public static void main(string[] args) throws interruptedexception{ productercomsumerdemo2 pcd = new productercomsumerdemo2(); runnable c = pcd.new comsumethread(); runnable p = pcd.new productthread(); /*兩個生產(chǎn)者線程,兩個消費者線程*/ new thread(p).start(); new thread(p).start(); new thread(c).start(); new thread(c).start(); } } |
運行結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
queue is empty queue is empty name: b, price: 2 , id: 1 name: c, price: 3 , id: 2 name: a, price: 1 , id: 3 queue is full name: b, price: 2 , id: 4 name: c, price: 3 , id: 5 queue is full name: a, price: 1 , id: 6 name: b, price: 2 , id: 7 name: c, price: 3 , id: 8 name: a, price: 1 , id: 9 name: b, price: 2 , id: 10 name: c, price: 3 , id: 11 name: a, price: 1 , id: 12 name: b, price: 2 , id: 13 name: c, price: 3 , id: 14 …… |
1.3.2 更優(yōu)雅的實現(xiàn)方式
下面使用線程池(threadpool)和阻塞隊列(linkedblockingqueue)原子類(atomicinteger)以更加優(yōu)雅的方式實現(xiàn)上述功能。linkedblockingqueue阻塞隊列僅在take和put方法上鎖,所以id必須定義為原子類。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
package demo; import java.util.random; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.linkedblockingqueue; import java.util.concurrent.atomic.atomicinteger; /*使用線程對象,多個緩存位置(有界),多生產(chǎn)者,多消費者,無限循環(huán)模式*/ public class productercomsumerdemo4 { /*最大緩存商品數(shù)*/ private final int max_slot = 3; /*定義緩存商品的容器*/ private linkedblockingqueue<goods> queue = new linkedblockingqueue<goods>(max_slot); /*商品的id編號,生產(chǎn)者制造的每個商品的id都不一樣,每生產(chǎn)一個id自增1*/ private atomicinteger id = new atomicinteger(1); /*隨機產(chǎn)生一個sleep時間*/ private random rnd = new random(); /*=================定義消費者線程==================*/ public class comsumethread implements runnable{ @override public void run(){ while(true){ try { /*隨機延時一段時間*/ thread.sleep(rnd.nextint(200)); /*模擬消費商品*/ goods goods = queue.take(); system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(200)); } catch (interruptedexception e) { /*什么都不做*/ } } } } /*=================定義生產(chǎn)者線程==================*/ public class productthread implements runnable{ @override public void run(){ while(true){ try{ int x = id.getandincrement(); goods goods = null; thread.sleep(rnd.nextint(200)); /*根據(jù)序號產(chǎn)生不同的商品*/ switch(x%3){ case 0 : goods = new goods("a", 1, x); break; case 1 : goods = new goods("b", 2, x); break; case 2 : goods = new goods("c", 3, x); break; } thread.sleep(rnd.nextint(200)); queue.put(goods); thread.sleep(rnd.nextint(100)); } catch(interruptedexception e){ /*什么都不做*/ } } } } /*=================main==================*/ public static void main(string[] args) throws interruptedexception{ productercomsumerdemo4 pcd = new productercomsumerdemo4(); runnable c = pcd.new comsumethread(); runnable p = pcd.new productthread(); /*定義線程池*/ executorservice es = executors.newcachedthreadpool(); /*三個生產(chǎn)者線程,兩個消費者線程*/ es.execute(p); es.execute(p); es.execute(p); es.execute(c); es.execute(c); es.shutdown(); } } |
2.有限商品個數(shù)
這個問題顯然比上面的問題要復雜不少,原因在于要保證緩存區(qū)的商品要全部消費掉,沒有重復消費商品,沒有覆蓋商品,同時還要保證所有線程能夠正常結束,防止存在一直阻塞的線程。
2.1使用線程對象,多個緩存位置(有界),多生產(chǎn)者,多消費者
思路定義一下三個變量
1
2
3
4
5
6
7
8
|
/*需要生產(chǎn)的總商品數(shù)*/ private final int total_num = 30; /*已產(chǎn)生的數(shù)量*/ private volatile int productnum = 0; /*已消耗的商品數(shù)*/ private volatile int comsumednum = 0 ; |
每生產(chǎn)一個商品 productnum 自增1,直到total_num為止,如果不滿足條件 productnum < total_num 則結束進程,自增操作必須在full.await()方法調用之前,防止生產(chǎn)者線程無法喚醒。
同理,每消費一個商品 comsumednum 自增1,直到total_num為止,如果不滿足條件 comsumednum < total_num 則結束進程,自增操作必須在empty.await()方法調用之前,防止消費者線程無法喚醒。
comsumednum和productnum相當于計劃經(jīng)濟時代的糧票一樣,有了它能夠保證生產(chǎn)者線程在喚醒后一定需要生產(chǎn)一個商品,消費者線程在喚醒以后一定能夠消費一個商品
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
package demo; import java.util.linkedlist; import java.util.random; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; /*使用線程對象,多個緩存位置(有界),多生產(chǎn)者,多消費者, 有限商品個數(shù)*/ public class productercomsumerdemo3 { /*需要生產(chǎn)的總商品數(shù)*/ private final int total_num = 30; /*已產(chǎn)生的數(shù)量*/ private volatile int productnum = 0; /*已消耗的商品數(shù)*/ private volatile int comsumednum = 0; /*最大緩存商品數(shù)*/ private final int max_slot = 2; /*定義線程公用的鎖和條件*/ private lock lock = new reentrantlock(); private condition full = lock.newcondition(); private condition empty = lock.newcondition(); /*定義緩存商品的容器*/ private linkedlist<goods> queue = new linkedlist<goods>(); /*商品的id編號,生產(chǎn)者制造的每個商品的id都不一樣,每生產(chǎn)一個id自增1*/ private int id = 1; /*隨機產(chǎn)生一個sleep時間*/ private random rnd = new random(); /*=================定義消費者線程==================*/ public class comsumethread implements runnable{ @override public void run(){ while(true){ /*加鎖, id、comsumednum 操作都在同步代碼塊中*/ lock.lock(); try { /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); if(comsumednum < total_num){ comsumednum++; } else{ /*這里會自動執(zhí)行finally的語句,釋放鎖*/ break; } while(queue.isempty()){ system.out.println("queue is empty"); empty.await(); } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); /*模擬消費商品*/ goods goods = queue.remove(); system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); /*喚醒阻塞的生產(chǎn)者線程*/ full.signal(); } catch (interruptedexception e) { } finally{ lock.unlock(); } /*釋放鎖后,隨機延時一段時間*/ try { thread.sleep(rnd.nextint(250)); } catch (interruptedexception e) { } } system.out.println( "customer " + thread.currentthread().getname() + " is over"); } } /*=================定義生產(chǎn)者線程==================*/ public class productthread implements runnable{ @override public void run(){ while(true){ lock.lock(); try{ /*隨機延時一段時間*/ thread.sleep(rnd.nextint(250)); if(productnum < total_num){ productnum++; } else{ /*這里會自動執(zhí)行finally的語句,釋放鎖*/ break; } thread.sleep(rnd.nextint(250)); while(queue.size() == max_slot){ system.out.println("queue is full"); full.await(); } thread.sleep(rnd.nextint(250)); goods goods = null; /*根據(jù)序號產(chǎn)生不同的商品*/ switch(id%3){ case 0 : goods = new goods("a", 1, id); break; case 1 : goods = new goods("b", 2, id); break; case 2 : goods = new goods("c", 3, id); break; } queue.add(goods); id++; /*喚醒阻塞的消費者線程*/ empty.signal(); } catch(interruptedexception e){ } finally{ lock.unlock(); } /*釋放鎖后,隨機延時一段時間*/ try { thread.sleep(rnd.nextint(250)); } catch (interruptedexception e) { /*什么都不做*/ } } system.out.println( "producter " + thread.currentthread().getname() + " is over"); } } /*=================main==================*/ public static void main(string[] args) throws interruptedexception{ productercomsumerdemo3 pcd = new productercomsumerdemo3(); comsumethread c = pcd. new comsumethread(); productthread p = pcd. new productthread(); new thread(p).start(); new thread(p).start(); new thread(p).start(); new thread(c).start(); new thread(c).start(); new thread(c).start(); system.out.println( "main thread is over" ); } } |
2.2利用線程池,原子類,阻塞隊列,以更優(yōu)雅的方式實現(xiàn)
linkedblockingqueue阻塞隊列僅在take和put方法上鎖,所以productnum和comsumednum必須定義為原子類。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
package demo; import java.util.random; import java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.linkedblockingqueue; import java.util.concurrent.atomic.atomicinteger; /*使用線程池,多個緩存位置(有界),多生產(chǎn)者,多消費者, 有限商品個數(shù)*/ public class linkedblockingqueuedemo { /*需要生產(chǎn)的總商品數(shù)*/ private final int total_num = 20; /*已產(chǎn)生商品的數(shù)量*/ volatile atomicinteger productnum = new atomicinteger(0); /*已消耗的商品數(shù)*/ volatile atomicinteger comsumednum = new atomicinteger(0); /*最大緩存商品數(shù)*/ private final int max_slot = 5; /*同步阻塞隊列,隊列容量為max_slot*/ private linkedblockingqueue<goods> lbq = new linkedblockingqueue<goods>(max_slot); /*隨機數(shù)*/ private random rnd = new random(); /*pn表示產(chǎn)品的編號,產(chǎn)品編號從1開始*/ private volatile atomicinteger pn = new atomicinteger(1); /*=================定義消費者線程==================*/ public class customerthread implements runnable{ @override public void run(){ while(comsumednum.getandincrement() < total_num){ try{ /*隨機延時一段時間*/ thread.sleep(rnd.nextint(500)); /*從隊列中取出商品,隊列空時發(fā)生阻塞*/ goods goods = lbq.take(); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(500)); /*模擬消耗商品*/ system.out.println(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(500)); } catch(interruptedexception e){ } } system.out.println( "customer " + thread.currentthread().getname() + " is over"); } } /*=================定義生產(chǎn)者線程==================*/ public class producerthread implements runnable{ @override public void run(){ while(productnum.getandincrement() < total_num){ try { int x = pn.getandincrement(); goods goods = null; /*根據(jù)序號產(chǎn)生不同的商品*/ switch(x%3){ case 0 : goods = new goods("a", 1, x); break; case 1 : goods = new goods("b", 2, x); break; case 2 : goods = new goods("c", 3, x); break; } /*隨機延時一段時間*/ thread.sleep(rnd.nextint(500)); /*產(chǎn)生的新產(chǎn)品入列,隊列滿時發(fā)生阻塞*/ lbq.put(goods); /*隨機延時一段時間*/ thread.sleep(rnd.nextint(500)); } catch (interruptedexception e1) { /*什么都不做*/ } } system.out.println( "producter " + thread.currentthread().getname() + " is over "); } } /*=================main==================*/ public static void main(string[] args){ linkedblockingqueuedemo lbqd = new linkedblockingqueuedemo(); runnable c = lbqd. new customerthread(); runnable p = lbqd. new producerthread(); executorservice es = executors.newcachedthreadpool(); es.execute(c); es.execute(c); es.execute(c); es.execute(p); es.execute(p); es.execute(p); es.shutdown(); system.out.println( "main thread is over" ); } } |
總結
以上就是本文關于java多線程中不同條件下編寫生產(chǎn)消費者模型方法介紹的全部內(nèi)容,希望對大家有所幫助。如有不足之處,歡迎留言指出。
原文鏈接:http://www.cnblogs.com/nullzx/p/7798504.html