線程通信用來保證線程協調運行,一般在做線程同步的時候才需要考慮線程通信的問題。
1、傳統的線程通信
通常利用Objeclt類提供的三個方法:
- wait() 導致當前線程等待,并釋放該同步監視器的鎖定,直到其它線程調用該同步監視器的notify()或者notifyAll()方法喚醒線程。
- notify(),喚醒在此同步監視器上等待的線程,如果有多個會任意選擇一個喚醒
- notifyAll() 喚醒在此同步監視器上等待的所有線程,這些線程通過調度競爭資源后,某個線程獲取此同步監視器的鎖,然后得以運行。
這三個方法必須由同步監視器對象調用,分為兩張情況:
同步方法時,由于同步監視器為this對象,所以可以直接調用這三個方法。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
public class SyncMethodThreadCommunication { static class DataWrap{ int data = 0 ; boolean flag = false ; public synchronized void addThreadA(){ if (flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } data++; System.out.println(Thread.currentThread().getName() + " " + data); flag = true ; notify(); } public synchronized void addThreadB() { if (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } data++; System.out.println(Thread.currentThread().getName() + " " + data); flag = false ; notify(); } } static class ThreadA extends Thread { private DataWrap data; public ThreadA(DataWrap dataWrap) { this .data = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { data.addThreadA(); } } } static class ThreadB extends Thread { private DataWrap data; public ThreadB(DataWrap dataWrap) { this .data = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { data.addThreadB(); } } } public static void main(String[] args) { //實現兩個線程輪流對數據進行加一操作 DataWrap dataWrap = new DataWrap(); new ThreadA(dataWrap).start(); new ThreadB(dataWrap).start(); } } |
同步代碼塊時,需要使用監視器對象調用這三個方法。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public class SyncBlockThreadComminication { static class DataWrap{ boolean flag; int data; } static class ThreadA extends Thread{ DataWrap dataWrap; public ThreadA(DataWrap dataWrap){ this .dataWrap = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { synchronized (dataWrap) { if (dataWrap.flag) { try { dataWrap.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } dataWrap.data++; System.out.println(getName() + " " + dataWrap.data); dataWrap.flag = true ; dataWrap.notify(); } } } } static class ThreadB extends Thread{ DataWrap dataWrap; public ThreadB(DataWrap dataWrap){ this .dataWrap = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { synchronized (dataWrap) { if (!dataWrap.flag) { try { dataWrap.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } dataWrap.data++; System.out.println(getName() + " " + dataWrap.data); dataWrap.flag = false ; dataWrap.notify(); } } } } public static void main(String[] args) { //實現兩個線程輪流對數據進行加一操作 DataWrap dataWrap = new DataWrap(); new ThreadA(dataWrap).start(); new ThreadB(dataWrap).start(); } } |
2、使用Condition控制線程通信
當使用Lock對象保證同步時,則使用Condition對象來保證協調。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.sun.media.sound.RIFFInvalidDataException; import javafx.scene.chart.PieChart.Data; public class SyncLockThreadCommunication { static class DataWrap { int data; boolean flag; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); public void addThreadA() { lock.lock(); try { if (flag) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } data++; System.out.println(Thread.currentThread().getName() + " " + data); flag = true ; condition.signal(); } finally { lock.unlock(); } } public void addThreadB() { lock.lock(); try { if (!flag) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } data++; System.out.println(Thread.currentThread().getName() + " " + data); flag = false ; condition.signal(); } finally { lock.unlock(); } } } static class ThreadA extends Thread{ DataWrap dataWrap; public ThreadA(DataWrap dataWrap) { this .dataWrap = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { dataWrap.addThreadA(); } } } static class ThreadB extends Thread{ DataWrap dataWrap; public ThreadB(DataWrap dataWrap) { this .dataWrap = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { dataWrap.addThreadB(); } } } public static void main(String[] args) { //實現兩個線程輪流對數據進行加一操作 DataWrap dataWrap = new DataWrap(); new ThreadA(dataWrap).start(); new ThreadB(dataWrap).start(); } } |
其中Condition對象的await(), singal(),singalAll()分別對應wait(),notify()和notifyAll()方法。
3、使用阻塞隊列BlockingQueue控制線程通信
BlockingQueue是Queue接口的子接口,主要用來做線程通信使用,它具有一個特征:當生產者線程試圖向BlockingQueue中放入元素時,如果隊列已滿,則該線程被阻塞;當消費者線程試圖從BlockingQueue中取出元素時,如果隊列已空,則該線程被阻塞。這兩個特征分別對應兩個支持阻塞的方法,put(E e)和take()
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueThreadComminication { static class DataWrap{ int data; } static class ThreadA extends Thread{ private BlockingQueue<DataWrap> blockingQueue; public ThreadA(BlockingQueue<DataWrap> blockingQueue, String name) { super (name); this .blockingQueue = blockingQueue; } @Override public void run() { for ( int i = 0 ; i < 100 ; i++) { try { DataWrap dataWrap = blockingQueue.take(); dataWrap.data++; System.out.println(getName() + " " + dataWrap.data); sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class ThreadB extends Thread{ private BlockingQueue<DataWrap> blockingQueue; private DataWrap dataWrap; public ThreadB(BlockingQueue<DataWrap> blockingQueue, DataWrap dataWrap, String name) { super (name); this .blockingQueue = blockingQueue; this .dataWrap = dataWrap; } @Override public void run() { for ( int i = 0 ; i < 100 ; i++) { try { dataWrap.data++; System.out.println(getName() + " " + dataWrap.data); blockingQueue.put(dataWrap); sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { ///實現兩個線程輪流對數據進行加一操作 DataWrap dataWrap = new DataWrap(); BlockingQueue<DataWrap> blockingQueue = new ArrayBlockingQueue<>( 1 ); new ThreadA(blockingQueue, "Consumer" ).start(); new ThreadB(blockingQueue, dataWrap, "Producer" ).start(); } } |
BlockingQueue共有五個實現類:
ArrayBlockingQueue 基于數組實現的BlockingQueue隊列
LinkedBlockingQueue 基于鏈表實現的BlockingQueue隊列
PriorityBlockingQueue 中元素需實現Comparable接口,其中元素的排序是按照Comparator進行的定制排序。
SynchronousQueue 同步隊列,要求對該隊列的存取操作必須是交替進行。
DelayQueue 集合元素必須實現Delay接口,隊列中元素排序按照Delay接口方法getDelay()的返回值進行排序。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。