1 Exchanger 介紹
前面分別介紹了CyclicBarrier、CountDownLatch、Semaphore,現在介紹并發工具類中的最后一個Exchange
。
Exchanger
是一個用于線程間協作的工具類,Exchanger
用于進行線程間的數據交換,它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange
方法交換數據,如果第一個線程先執行exchange
方法,它會一直等待第二個線程也執行exchange
方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據。
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有幾個要點:
- 此類提供對外的操作是同步的;
- 用于成對出現的線程之間交換數據;
- 可以視作雙向的同步隊列;
- 可應用于基因算法、流水線設計等場景。
- 接著看api文檔,這個類提供對外的接口非常簡潔,一個無參構造函數,兩個重載的范型exchange方法:
1
2
|
public V exchange(V x) throws InterruptedException public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException |
2 Exchanger 實例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class ExchangerTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executor.execute( new Runnable() { String data = "data1" ; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.execute( new Runnable() { String data = "data2" ; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.shutdown(); } private static void doExchangeWork(String data, Exchanger exchanger) { try { System.out.println(Thread.currentThread().getName() + "正在把數據 " + data + " 交換出去" ); Thread.sleep(( long ) (Math.random() * 1000 )); String exchangeData = (String) exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + "交換得到數據 " + exchangeData); } catch (InterruptedException e) { e.printStackTrace(); } } } |
pool-1-thread-1正在把數據 data1 交換出去
pool-1-thread-2正在把數據 data2 交換出去
pool-1-thread-2交換得到數據 data1
pool-1-thread-1交換得到數據 data2
當線程A調用Exchange
對象的exchange()
方法后,他會陷入阻塞狀態,直到線程B也調用了exchange()
方法,然后以線程安全的方式交換數據,之后線程A和B繼續運行。
exchange等待超時
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public class ExchangerTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executor.execute( new Runnable() { String data = "data1" ; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.execute( new Runnable() { String data = "data2" ; @Override public void run() { try { Thread.sleep(( long ) ( 3000 )); } catch (InterruptedException e) { e.printStackTrace(); } doExchangeWork(data, exchanger); } }); executor.shutdown(); } private static void doExchangeWork(String data, Exchanger exchanger) { try { System.out.println(Thread.currentThread().getName() + "正在把數據 " + data + " 交換出去" ); //遠小于3秒拋出異常 String exchangeData = (String) exchanger.exchange(data, 1 , TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + "交換得到數據 " + exchangeData); } catch ( TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
pool-1-thread-1正在把數據 data1 交換出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$1.run(ExchangerTest.java:12)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2正在把數據 data2 交換出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$2.run(ExchangerTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
實戰場景:
設計一個定時任務,每日凌晨執行。在定時任務中啟動兩個線程,一個線程負責對業務明細表(xxx_info)進行查詢統計,把統計的結果放置在內存緩沖區,另一個線程負責讀取緩沖區中的統計結果并插入到業務統計表(xxx_statistics)中。
親,這樣的場景是不是聽起來很有感覺?沒錯!兩個線程在內存中批量交換數據,這個事情我們可以使用Exchanger去做!
3 實現原理
Exchanger
(交換者)是一個用于線程間協作的工具類。Exchanger
用于進行線程間的數據交換。它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。這兩個線程通過exchange
方法交換數據, 如果第一個線程先執行exchange
方法,它會一直等待第二個線程也執行exchange
,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。因此使用Exchanger
的重點是成對的線程使用exchange()方法,當有一對線程達到了同步點,就會進行交換數據。因此該工具類的線程對象是成對的。
Exchanger類提供了兩個方法,String exchange(V x):用
于交換,啟動交換并等待另一個線程調用exchange
;String
exchange(V x,long timeout,TimeUnit unit):用于交換,啟動交換并等待另一個線程調用exchange
,并且設置最大等待時間,當等待時間超過timeout
便停止等待。
到此這篇關于Java多線程之同步工具類Exchanger的文章就介紹到這了,更多相關Java多線程 Exchanger內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://juejin.cn/post/7021034812295102501