Executor框架的兩級(jí)調(diào)度模型
在HotSpot VM的模型中,Java線程被一對(duì)一映射為本地操作系統(tǒng)線程。JAVA線程啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)本地操作系統(tǒng)線程,當(dāng)JAVA線程終止時(shí),對(duì)應(yīng)的操作系統(tǒng)線程也被銷(xiāo)毀回收,而操作系統(tǒng)會(huì)調(diào)度所有線程并將它們分配給可用的CPU。
在上層,JAVA程序會(huì)將應(yīng)用分解為多個(gè)任務(wù),然后使用應(yīng)用級(jí)的調(diào)度器(Executor)將這些任務(wù)映射成固定數(shù)量的線程;在底層,操作系統(tǒng)內(nèi)核將這些線程映射到硬件處理器上。
Executor框架類(lèi)圖
在前面介紹的JAVA線程既是工作單元,也是執(zhí)行機(jī)制。而在Executor框架中,我們將工作單元與執(zhí)行機(jī)制分離開(kāi)來(lái)。Runnable和Callable是工作單元(也就是俗稱(chēng)的任務(wù)),而執(zhí)行機(jī)制由Executor來(lái)提供。這樣一來(lái)Executor是基于生產(chǎn)者消費(fèi)者模式的,提交任務(wù)的操作相當(dāng)于生成者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者。
1、從類(lèi)圖上看,Executor接口是異步任務(wù)執(zhí)行框架的基礎(chǔ),該框架能夠支持多種不同類(lèi)型的任務(wù)執(zhí)行策略。
1
2
3
4
|
public interface Executor { void execute(Runnable command); } |
Executor接口就提供了一個(gè)執(zhí)行方法,任務(wù)是Runnbale類(lèi)型,不支持Callable類(lèi)型。
2、ExecutorService接口實(shí)現(xiàn)了Executor接口,主要提供了關(guān)閉線程池和submit方法:
1
2
3
4
5
6
7
8
9
10
11
|
public interface ExecutorService extends Executor { List<Runnable> shutdownNow(); boolean isTerminated(); <T> Future<T> submit(Callable<T> task); } |
另外該接口有兩個(gè)重要的實(shí)現(xiàn)類(lèi):ThreadPoolExecutor與ScheduledThreadPoolExecutor。
其中ThreadPoolExecutor是線程池的核心實(shí)現(xiàn)類(lèi),用來(lái)執(zhí)行被提交的任務(wù);而ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類(lèi),可以在給定的延遲后運(yùn)行任務(wù),或者定期執(zhí)行命令。
在上一篇文章中,我是使用ThreadPoolExecutor來(lái)通過(guò)給定不同的參數(shù)從而創(chuàng)建自己所需的線程池,但是在后面的工作中不建議這種方式,推薦使用Exectuors工廠方法來(lái)創(chuàng)建線程池
這里先來(lái)區(qū)別線程池和線程組(ThreadGroup與ThreadPoolExecutor)這兩個(gè)概念:
a、線程組就表示一個(gè)線程的集合。
b、線程池是為線程的生命周期開(kāi)銷(xiāo)問(wèn)題和資源不足問(wèn)題提供解決方案,主要是用來(lái)管理線程。
Executors可以創(chuàng)建3種類(lèi)型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadExecutor和CachedThreadPool
a、SingleThreadExecutor:?jiǎn)尉€程線程池
1
|
ExecutorService threadPool = Executors.newSingleThreadExecutor(); |
1
2
3
4
5
6
|
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } |
我們從源碼來(lái)看可以知道,單線程線程池的創(chuàng)建也是通過(guò)ThreadPoolExecutor,里面的核心線程數(shù)和線程數(shù)都是1,并且工作隊(duì)列使用的是無(wú)界隊(duì)列。由于是單線程工作,每次只能處理一個(gè)任務(wù),所以后面所有的任務(wù)都被阻塞在工作隊(duì)列中,只能一個(gè)個(gè)任務(wù)執(zhí)行。
b、FixedThreadExecutor:固定大小線程池
1
|
ExecutorService threadPool = Executors.newFixedThreadPool( 5 ); |
1
2
3
4
5
|
public static ExecutorService newFixedThreadPool( int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } |
這個(gè)與單線程類(lèi)似,只是創(chuàng)建了固定大小的線程數(shù)量。
c、CachedThreadPool:無(wú)界線程池
1
|
ExecutorService threadPool = Executors.newCachedThreadPool(); |
1
2
3
4
5
|
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0 , Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } |
無(wú)界線程池意味著沒(méi)有工作隊(duì)列,任務(wù)進(jìn)來(lái)就執(zhí)行,線程數(shù)量不夠就創(chuàng)建,與前面兩個(gè)的區(qū)別是:空閑的線程會(huì)被回收掉,空閑的時(shí)間是60s。這個(gè)適用于執(zhí)行很多短期異步的小程序或者負(fù)載較輕的服務(wù)器。
Callable、Future、FutureTash詳解
Callable與Future是在JAVA的后續(xù)版本中引入進(jìn)來(lái)的,Callable類(lèi)似于Runnable接口,實(shí)現(xiàn)Callable接口的類(lèi)與實(shí)現(xiàn)Runnable的類(lèi)都是可以被線程執(zhí)行的任務(wù)。
三者之間的關(guān)系:
Callable是Runnable封裝的異步運(yùn)算任務(wù)。
Future用來(lái)保存Callable異步運(yùn)算的結(jié)果
FutureTask封裝Future的實(shí)體類(lèi)
1、Callable與Runnbale的區(qū)別
a、Callable定義的方法是call,而Runnable定義的方法是run。
b、call方法有返回值,而run方法是沒(méi)有返回值的。
c、call方法可以拋出異常,而run方法不能拋出異常。
2、Future
Future表示異步計(jì)算的結(jié)果,提供了以下方法,主要是判斷任務(wù)是否完成、中斷任務(wù)、獲取任務(wù)執(zhí)行結(jié)果
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public interface Future< V > { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } |
3、FutureTask
可取消的異步計(jì)算,此類(lèi)提供了對(duì)Future的基本實(shí)現(xiàn),僅在計(jì)算完成時(shí)才能獲取結(jié)果,如果計(jì)算尚未完成,則阻塞get方法。
1
|
public class FutureTask<V> implements RunnableFuture<V> |
1
|
public interface RunnableFuture<V> extends Runnable, Future<V> |
FutureTask不僅實(shí)現(xiàn)了Future接口,還實(shí)現(xiàn)了Runnable接口,所以不僅可以將FutureTask當(dāng)成一個(gè)任務(wù)交給Executor來(lái)執(zhí)行,還可以通過(guò)Thread來(lái)創(chuàng)建一個(gè)線程。
Callable與FutureTask
定義一個(gè)callable的任務(wù):
1
2
3
4
5
6
7
8
9
10
11
|
public class MyCallableTask implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println( "callable do somothing" ); Thread.sleep( 5000 ); return new Random().nextInt( 100 ); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class CallableTest { public static void main(String[] args) throws Exception { Callable<Integer> callable = new MyCallableTask(); FutureTask<Integer> future = new FutureTask<Integer>(callable); Thread thread = new Thread(future); thread.start(); Thread.sleep( 100 ); //嘗試取消對(duì)此任務(wù)的執(zhí)行 future.cancel( true ); //判斷是否在任務(wù)正常完成前取消 System.out.println( "future is cancel:" + future.isCancelled()); if (!future.isCancelled()) { System.out.println( "future is cancelled" ); } //判斷任務(wù)是否已完成 System.out.println( "future is done:" + future.isDone()); if (!future.isDone()) { System.out.println( "future get=" + future.get()); } else { //任務(wù)已完成 System.out.println( "task is done" ); } } } |
執(zhí)行結(jié)果:
1
2
3
4
|
callable do somothing future is cancel: true future is done: true task is done |
這個(gè)DEMO主要是通過(guò)調(diào)用FutureTask的狀態(tài)設(shè)置的方法,演示了狀態(tài)的變遷。
a、第11行,嘗試取消對(duì)任務(wù)的執(zhí)行,該方法如果由于任務(wù)已完成、已取消則返回false,如果能夠取消還未完成的任務(wù),則返回true,該DEMO中由于任務(wù)還在休眠狀態(tài),所以可以取消成功。
1
|
future.cancel( true ); |
b、第13行,判斷任務(wù)取消是否成功:如果在任務(wù)正常完成前將其取消,則返回true
1
|
System.out.println( "future is cancel:" + future.isCancelled()); |
c、第19行,判斷任務(wù)是否完成:如果任務(wù)完成,則返回true,以下幾種情況都屬于任務(wù)完成:正常終止、異常或者取消而完成。
我們的DEMO中,任務(wù)是由于取消而導(dǎo)致完成。
1
|
System.out.println( "future is done:" + future.isDone()); |
d、在第22行,獲取異步線程執(zhí)行的結(jié)果,我這個(gè)DEMO中沒(méi)有執(zhí)行到這里,需要注意的是,future.get方法會(huì)阻塞當(dāng)前線程, 直到任務(wù)執(zhí)行完成返回結(jié)果為止。
1
|
System.out.println( "future get=" + future.get()); |
Callable與Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class CallableThread implements Callable<String> { @Override public String call() throws Exception { System.out.println( "進(jìn)入Call方法,開(kāi)始休眠,休眠時(shí)間為:" + System.currentTimeMillis()); Thread.sleep( 10000 ); return "今天停電" ; } public static void main(String[] args) throws Exception { ExecutorService es = Executors.newSingleThreadExecutor(); Callable<String> call = new CallableThread(); Future<String> fu = es.submit(call); es.shutdown(); Thread.sleep( 5000 ); System.out.println( "主線程休眠5秒,當(dāng)前時(shí)間" + System.currentTimeMillis()); String str = fu.get(); System.out.println( "Future已拿到數(shù)據(jù),str=" + str + ";當(dāng)前時(shí)間為:" + System.currentTimeMillis()); } } |
執(zhí)行結(jié)果:
1
2
3
|
進(jìn)入Call方法,開(kāi)始休眠,休眠時(shí)間為: 1478606602676 主線程休眠 5 秒,當(dāng)前時(shí)間 1478606608676 Future已拿到數(shù)據(jù),str=今天停電;當(dāng)前時(shí)間為: 1478606612677 |
這里的future是直接扔到線程池里面去執(zhí)行的。由于要打印任務(wù)的執(zhí)行結(jié)果,所以從執(zhí)行結(jié)果來(lái)看,主線程雖然休眠了5s,但是從Call方法執(zhí)行到拿到任務(wù)的結(jié)果,這中間的時(shí)間差正好是10s,說(shuō)明get方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成。
通過(guò)FutureTask也可以達(dá)到同樣的效果:
1
2
3
4
5
6
7
8
9
10
11
12
|
public static void main(String[] args) throws Exception { ExecutorService es = Executors.newSingleThreadExecutor(); Callable<String> call = new CallableThread(); FutureTask<String> task = new FutureTask<String>(call); es.submit(task); es.shutdown(); Thread.sleep( 5000 ); System.out.println( "主線程等待5秒,當(dāng)前時(shí)間為:" + System.currentTimeMillis()); String str = task.get(); System.out.println( "Future已拿到數(shù)據(jù),str=" + str + ";當(dāng)前時(shí)間為:" + System.currentTimeMillis()); } |
以上的組合可以給我們帶來(lái)這樣的一些變化:
如有一種場(chǎng)景中,方法A返回一個(gè)數(shù)據(jù)需要10s,A方法后面的代碼運(yùn)行需要20s,但是這20s的執(zhí)行過(guò)程中,只有后面10s依賴(lài)于方法A執(zhí)行的結(jié)果。如果與以往一樣采用同步的方式,勢(shì)必會(huì)有10s的時(shí)間被浪費(fèi),如果采用前面兩種組合,則效率會(huì)提高:
1、先把A方法的內(nèi)容放到Callable實(shí)現(xiàn)類(lèi)的call()方法中
2、在主線程中通過(guò)線程池執(zhí)行A任務(wù)
3、執(zhí)行后面方法中10秒不依賴(lài)方法A運(yùn)行結(jié)果的代碼
4、獲取方法A的運(yùn)行結(jié)果,執(zhí)行后面方法中10秒依賴(lài)方法A運(yùn)行結(jié)果的代碼
這樣代碼執(zhí)行效率一下子就提高了,程序不必卡在A方法處。
感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!