国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 并發(fā)編程之ThreadPoolExecutor線程池原理解析

并發(fā)編程之ThreadPoolExecutor線程池原理解析

2020-12-08 23:51今日頭條一角錢技術(shù) Java教程

在介紹線程池之前,我們先回顧下線程的基本知識。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時線程池 ,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。

 前言

在介紹線程池之前,我們先回顧下線程的基本知識。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時線程池 ,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。

線程

線程是調(diào)度CPU資源的最小單位,線程模型分為KLT模型與ULT模型,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關(guān)系,也就是說有一個Java線程也會在操作系統(tǒng)里有一個對應(yīng)的線程。

內(nèi)核線程模型

并發(fā)編程之ThreadPoolExecutor線程池原理解析

內(nèi)核線程(KLT):系統(tǒng)內(nèi)核管理線程(KLT),內(nèi)核保存線程的狀態(tài)和上下文信息,線程阻塞不會引起進(jìn)程阻塞。在多處理器系統(tǒng)上,多線程在多處理器上并行運(yùn)行。線程的創(chuàng)建、調(diào)度和管理由內(nèi)核完成,效率比ULT要慢,比進(jìn)程操作快。

用戶線程模型

并發(fā)編程之ThreadPoolExecutor線程池原理解析

用戶線程(ULT):用戶程序?qū)崿F(xiàn),不依賴操作系統(tǒng)核心,應(yīng)用提供創(chuàng)建、同步、調(diào)度和管理線程的函數(shù)來控制用戶線程。不需要用戶態(tài)/內(nèi)核態(tài)切換,速度快。內(nèi)核對ULT無感知,線程阻塞則進(jìn)程(包括它的所有線程)阻塞。

Java線程生命狀態(tài)

Java線程有多種生命狀態(tài):

  • NEW ,新建
  • RUNNABLE ,運(yùn)行
  • BLOCKED ,阻塞
  • WAITING ,等待
  • TIMED_WAITING ,超時等待
  • TERMINATED,終結(jié)

狀態(tài)切換如下圖所示:

并發(fā)編程之ThreadPoolExecutor線程池原理解析

Java線程實(shí)現(xiàn)方式

Java線程實(shí)現(xiàn)方式主要有四種:

  • 繼承Thread類
  • 實(shí)現(xiàn)Runnable接口、
  • 實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程、
  • 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程。

其中前兩種方式線程執(zhí)行完后都沒有返回值,后兩種是帶返回值的。

繼承Thread類創(chuàng)建線程

Thread類本質(zhì)上是實(shí)現(xiàn)了Runnable接口的一個實(shí)例,代表一個線程的實(shí)例。啟動線程的唯一方法就是通過Thread類的start()實(shí)例方法。start()方法是一個native方法,它將啟動一個新線程,并執(zhí)行run()方法。這種方式實(shí)現(xiàn)多線程很簡單,通過自己的類直接extend Thread,并復(fù)寫run()方法,就可以啟動新線程并執(zhí)行自己定義的run()方法。例如:

  1. public class MyThread extends Thread {   
  2.   public void run() {   
  3.    System.out.println("關(guān)注一角錢技術(shù),獲取Java架構(gòu)資料");   
  4.   }   
  5. }   
  6.   
  7. MyThread myThread1 = new MyThread();   
  8. MyThread myThread2 = new MyThread();   
  9. myThread1.start();   
  10. myThread2.start(); 

實(shí)現(xiàn)Runnable接口創(chuàng)建線程

如果自己的類已經(jīng)extends另一個類,就無法直接extends Thread,此時,可以實(shí)現(xiàn)一個Runnable接口,如下:

  1. // 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行,表示一個基本的任務(wù) 
  2. public interface Runnable { 
  3.     // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù) 
  4.     public abstract void run(); 

  1. public class MyThread implements Runnable {   
  2.   public void run() {   
  3.    System.out.println("關(guān)注一角錢技術(shù),獲取Java架構(gòu)資料");   
  4.   }   

為了啟動MyThread,需要首先實(shí)例化一個Thread,并傳入自己的MyThread實(shí)例:

  1. MyThread myThread = new MyThread();   
  2. Thread thread = new Thread(myThread);   
  3. thread.start();   

事實(shí)上,當(dāng)傳入一個Runnable target參數(shù)給Thread后,Thread的run()方法就會調(diào)用target.run(),參考JDK源代碼:

  1. public void run() {   
  2.   if (target != null) {   
  3.    target.run();   
  4.   }   

實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程

Callable接口(也只有一個方法)定義如下:

  1. public interface Callable<V> {  
  2.  V call() throws Exception;    
  3. }  

  1. //Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型,同時它執(zhí)行任務(wù)后帶有返回內(nèi)容 
  2. public class SomeCallable<V> implements Callable<V> { 
  3.  // 相對于run方法的帶有返回值的call方法 
  4.     @Override 
  5.     public V call() throws Exception { 
  6.         // TODO Auto-generated method stub 
  7.         return null
  8.     } 
  9.  

  1. Callable<V> oneCallable = new SomeCallable<V>();    
  2. //由Callable<Integer>創(chuàng)建一個FutureTask<Integer>對象:    
  3. FutureTask<V> oneTask = new FutureTask<V>(oneCallable); 
  4. //注釋:FutureTask<Integer>是一個包裝器,它通過接受Callable<Integer>來創(chuàng)建,它同時實(shí)現(xiàn)了Future和Runnable接口。 
  5. //由FutureTask<Integer>創(chuàng)建一個Thread對象:    
  6. Thread oneThread = new Thread(oneTask);    
  7. oneThread.start();    
  8. //至此,一個線程就創(chuàng)建完成了。 

使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程

ExecutorService、Callable、Future三個接口實(shí)際上都是屬于Executor框架。返回結(jié)果的線程是在JDK1.5中引入的新特征,有了這種特征就不需要再為了得到返回值而大費(fèi)周折了。而且自己實(shí)現(xiàn)了也可能漏洞百出。(下部分來講線程池了)

  • 可返回值的任務(wù)必須實(shí)現(xiàn)Callable接口。
  • 類似的,無返回值的任務(wù)必須實(shí)現(xiàn)Runnable接口。

執(zhí)行Callable任務(wù)后,可以獲取一個Future的對象,在該對象上調(diào)用get就可以獲取到Callable任務(wù)返回的Object了。

  • 注意:get方法是阻塞的,即:線程無返回結(jié)果,get方法會一直等待。

再結(jié)合線程池接口ExecutorService就可以實(shí)現(xiàn)傳說中有返回結(jié)果的多線程了。

下面提供了一個完整的有返回結(jié)果的多線程測試?yán)印4a如下:

  1. package com.niuh.thread.v4; 
  2.  
  3. import java.util.ArrayList; 
  4. import java.util.Date
  5. import java.util.List; 
  6. import java.util.concurrent.Callable; 
  7. import java.util.concurrent.ExecutionException; 
  8. import java.util.concurrent.ExecutorService; 
  9. import java.util.concurrent.Executors; 
  10. import java.util.concurrent.Future; 
  11.  
  12. /** 
  13.  * <p> 
  14.  * 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程 
  15.  * </p> 
  16.  */ 
  17. public class MyThread { 
  18.      
  19.     public static void main(String[] args) throws ExecutionException, 
  20.             InterruptedException { 
  21.  
  22.         System.out.println(("----程序開始運(yùn)行----")); 
  23.         Date date1 = new Date(); 
  24.  
  25.         int taskSize = 5; 
  26.         // 創(chuàng)建一個線程池 
  27.         ExecutorService pool = Executors.newFixedThreadPool(taskSize); 
  28.         // 創(chuàng)建多個有返回值的任務(wù) 
  29.         List<Future> list = new ArrayList<Future>(); 
  30.         for (int i = 0; i < taskSize; i++) { 
  31.             Callable c = new MyCallable(i + " "); 
  32.             // 執(zhí)行任務(wù)并獲取Future對象 
  33.             Future f = pool.submit(c); 
  34.             // System.out.println(">>>" + f.get().toString()); 
  35.             list.add(f); 
  36.         } 
  37.         // 關(guān)閉線程池 
  38.         pool.shutdown(); 
  39.  
  40.         // 獲取所有并發(fā)任務(wù)的運(yùn)行結(jié)果 
  41.         for (Future f : list) { 
  42.             // 從Future對象上獲取任務(wù)的返回值,并輸出到控制臺 
  43.             System.out.println(">>>" + f.get().toString()); 
  44.         } 
  45.  
  46.         Date date2 = new Date(); 
  47.         System.out.println("----程序結(jié)束運(yùn)行----,程序運(yùn)行時間【" 
  48.                 + (date2.getTime() - date1.getTime()) + "毫秒】"); 
  49.     } 
  50.  
  51. class MyCallable implements Callable<Object> { 
  52.     private String taskNum; 
  53.  
  54.     MyCallable(String taskNum) { 
  55.         this.taskNum = taskNum; 
  56.     } 
  57.  
  58.     public Object call() throws Exception { 
  59.         System.out.println(">>>" + taskNum + "任務(wù)啟動"); 
  60.         Date dateTmp1 = new Date(); 
  61.         Thread.sleep(1000); 
  62.         Date dateTmp2 = new Date(); 
  63.         long time = dateTmp2.getTime() - dateTmp1.getTime(); 
  64.         System.out.println(">>>" + taskNum + "任務(wù)終止"); 
  65.         return taskNum + "任務(wù)返回運(yùn)行結(jié)果,當(dāng)前任務(wù)時間【" + time + "毫秒】"
  66.     } 

協(xié)程

協(xié)程(纖程,用戶級線程),目的是為了追求最大力度的發(fā)揮硬件性能和提升軟件的速度,協(xié)程基本原理是:在某個點(diǎn)掛起當(dāng)前的任務(wù),并且保存棧信息,去執(zhí)行另一個任務(wù);等完成或達(dá)到某個條件時,再還原原來的棧信息并繼續(xù)執(zhí)行(整個過程不需要上下文切換)。

協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用。

協(xié)程的目的:當(dāng)我們在使用多線程的時候,如果存在長時間的I/O操作。這個時候線程一直處于阻塞狀態(tài),如果線程很多的時候,會存在很多線程處于空閑狀態(tài),造成了資源應(yīng)用不徹底。相對的協(xié)程不一樣了,在單線程中多個任務(wù)來回執(zhí)行如果出現(xiàn)長時間的I/O操作,讓其讓出目前的協(xié)程調(diào)度,執(zhí)行下一個任務(wù)。當(dāng)然可能所有任務(wù),全部卡在同一個點(diǎn)上,但是這只是針對于單線程而言,當(dāng)所有數(shù)據(jù)正常返回時,會同時處理當(dāng)前的I/O操作。

Java原生不支持協(xié)程,在純java代碼里需要使用協(xié)程的話需要引入第三方包,如:quasar

  1. <dependency> 
  2.  <groupId>co.paralleluniverse</groupId> 
  3.  <artifactId>quasar-core</artifactId> 
  4.  <version>0.8.0</version> 
  5.  <classifier>jdk8</classifier> 
  6. </dependency> 

線程池

“線程池”,顧名思義就是一個線程緩存,線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,因此 Java 中提供線程池對線程進(jìn)行同一分配、調(diào)優(yōu)和監(jiān)控。

線程池介紹

在web開發(fā)中,服務(wù)器需要接受并處理請求,所以會為一個請求分配一個線程來進(jìn)行處理。如果每次請求都創(chuàng)建一個線程的話實(shí)現(xiàn)起來非常簡單,但是存在一個問題:如果并發(fā)的請求數(shù)量非常多,但每個線程執(zhí)行的時間很短,這樣就會頻繁的創(chuàng)建和銷毀線程,如此一來會大大降低系統(tǒng)的效率。可能出現(xiàn)服務(wù)器在為每個請求創(chuàng)建新線程和銷毀線程上花費(fèi)的時間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請求的時間和資源更多。

那么有沒有一種辦法使執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的目的。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€任務(wù)上。

什么時候使用線程池?

  • 單個任務(wù)處理時間比較短;
  • 需要處理的任務(wù)數(shù)量很大。

線程池優(yōu)勢

  • 重用存在的線程。減少線程黃金、消亡的開銷,提高性能;
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行;
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行同一的分配、調(diào)優(yōu)和監(jiān)控。

Executor框架

Executor接口是線程池框架中最基礎(chǔ)的部分,定義來一個用于執(zhí)行 Runnable 的 execute 方法。下面為它的繼承與實(shí)現(xiàn)

并發(fā)編程之ThreadPoolExecutor線程池原理解析

ExecutorService接口

從圖中可以看出 Executor 下有一個重要的子接口 ExecutorService ,其中定義來線程池的具體行為

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  • execute(Runnable command):履行Ruannable類型的任務(wù);
  • submit(task):可用來提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future對象;
  • shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù);
  • shutdownNow():停止所有正在履行的任務(wù)并封閉辦事;
  • isTerminated():測試是否所有任務(wù)都履行完畢了;
  • isShutdown():測試是否該ExecutorService已被關(guān)閉;
  • awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個參數(shù),用于設(shè)定超時時間及單位。當(dāng)?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用;
  • invokeAll :作用是等待所有的任務(wù)執(zhí)行完成后統(tǒng)一返回;
  • invokeAny :將第一個得到的結(jié)果作為返回值,然后立刻終止所有的線程。如果設(shè)置了超時時間,未超時完成則正常返回結(jié)果,如果超時未完成則報超時異常。

AbstractExcutorService抽象類

此類的定義并沒有特殊的意義僅僅是實(shí)現(xiàn)了ExecutorService接口

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  1. public abstract class AbstractExecutorService implements ExecutorService { 
  2.     //此方法很簡單就是對runnable保證,將其包裝為一個FutureTask 
  3.     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
  4.         return new FutureTask<T>(runnable, value); 
  5.     } 
  6.     //包裝callable為FutureTask 
  7.     //FutureTask其實(shí)就是對Callable的一個封裝 
  8.     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
  9.         return new FutureTask<T>(callable); 
  10.     } 
  11.     //提交一個Runnable類型的任務(wù) 
  12.     public Future<?> submit(Runnable task) { 
  13.         //如果為null則拋出NPE 
  14.         if (task == null) throw new NullPointerException(); 
  15.         //包裝任務(wù)為一個Future 
  16.         RunnableFuture<Void> ftask = newTaskFor(task, null); 
  17.         //將任務(wù)丟給執(zhí)行器,而此處會拋出拒絕異常,在講述ThreadPoolExecutor的時候有講述,不記得的讀者可以去再看看 
  18.         execute(ftask); 
  19.         return ftask; 
  20.     } 
  21.  
  22.     //與上方方法相同只不過指定了返回結(jié)果 
  23.     public <T> Future<T> submit(Runnable task, T result) { 
  24.         if (task == null) throw new NullPointerException(); 
  25.         RunnableFuture<T> ftask = newTaskFor(task, result); 
  26.         execute(ftask); 
  27.         return ftask; 
  28.     } 
  29.     //與上方方法相同只是換成了callable 
  30.     public <T> Future<T> submit(Callable<T> task) { 
  31.         if (task == null) throw new NullPointerException(); 
  32.         RunnableFuture<T> ftask = newTaskFor(task); 
  33.         execute(ftask); 
  34.         return ftask; 
  35.     } 
  36.  
  37.     //執(zhí)行集合tasks結(jié)果是最后一個執(zhí)行結(jié)束的任務(wù)結(jié)果 
  38.     //可以設(shè)置超時 timed為true并且nanos是未來的一個時間 
  39.     //任何一個任務(wù)完成都將會返回結(jié)果 
  40.     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 
  41.                               boolean timed, long nanos) 
  42.         throws InterruptedException, ExecutionException, TimeoutException { 
  43.         //傳入的任務(wù)集合不能為null 
  44.         if (tasks == null
  45.             throw new NullPointerException(); 
  46.         //傳入的任務(wù)數(shù)不能是0 
  47.         int ntasks = tasks.size(); 
  48.         if (ntasks == 0) 
  49.             throw new IllegalArgumentException(); 
  50.         //滿足上面的校驗后將任務(wù)分裝到一個ArrayList中 
  51.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 
  52.         //并且創(chuàng)建一個執(zhí)行器傳入this 
  53.         //這里簡單講述他的執(zhí)行原理,傳入this會使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù),當(dāng)submit提交任務(wù)的時候回將任務(wù) 
  54.         //封裝為一個內(nèi)部的Future并且重寫他的done而此方法就是在future完成的時候調(diào)用的,而他的寫法則是將當(dāng)前完成的future添加到esc 
  55.         //維護(hù)的結(jié)果隊列中 
  56.         ExecutorCompletionService<T> ecs = 
  57.             new ExecutorCompletionService<T>(this); 
  58.  
  59.         try { 
  60.             //創(chuàng)建一個執(zhí)行異常,以便后面拋出 
  61.             ExecutionException ee = null
  62.             //如果開啟了超時則計算死線時間如果時間是0則代表沒有開啟執(zhí)行超時 
  63.             final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  64.             //獲取任務(wù)的迭代器 
  65.             Iterator<? extends Callable<T>> it = tasks.iterator(); 
  66.             //先獲取迭代器中的第一個任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器 
  67.             futures.add(ecs.submit(it.next())); 
  68.             //前面記錄的任務(wù)數(shù)減一 
  69.             --ntasks; 
  70.             //當(dāng)前激活數(shù)為1 
  71.             int active = 1; 
  72.             //進(jìn)入死循環(huán) 
  73.             for (;;) { 
  74.                 //獲取剛才提價的任務(wù)是否完成如果完成則f不是null否則為null 
  75.                 Future<T> f = ecs.poll(); 
  76.                 //如果為null則代表任務(wù)還在繼續(xù) 
  77.                 if (f == null) { 
  78.                     //如果當(dāng)前任務(wù)大于0 說明除了剛才的任務(wù)還有別的任務(wù)存在 
  79.                     if (ntasks > 0) { 
  80.                         //則任務(wù)數(shù)減一 
  81.                         --ntasks; 
  82.                         //并且再次提交新的任務(wù) 
  83.                         futures.add(ecs.submit(it.next())); 
  84.                         //當(dāng)前的存活的執(zhí)行任務(wù)加一 
  85.                         ++active; 
  86.                     } 
  87.                     //如果當(dāng)前存活任務(wù)數(shù)是0則代表沒有任務(wù)在執(zhí)行了從而跳出循環(huán) 
  88.                     else if (active == 0) 
  89.                         break; 
  90.                     //如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時時間 
  91.                     else if (timed) { 
  92.                         //則設(shè)置指定的超時時間獲取 
  93.                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 
  94.                         //等待執(zhí)行超時還沒有獲取到則拋出超時異常 
  95.                         if (f == null
  96.                             throw new TimeoutException(); 
  97.                         //否則使用當(dāng)前時間計算剩下的超時時間用于下一個循環(huán)使用 
  98.                         nanos = deadline - System.nanoTime(); 
  99.                     } 
  100.                     //如果沒有設(shè)置超時則直接獲取任務(wù) 
  101.                     else 
  102.                         f = ecs.take(); 
  103.                 } 
  104.                 //如果獲取到了任務(wù)結(jié)果f!=null 
  105.                 if (f != null) { 
  106.                     //激活數(shù)減一 
  107.                     --active; 
  108.                     try { 
  109.                         //返回獲取到的結(jié)果 
  110.                         return f.get(); 
  111.                         //如果獲取結(jié)果出錯則包裝異常 
  112.                     } catch (ExecutionException eex) { 
  113.                         ee = eex; 
  114.                     } catch (RuntimeException rex) { 
  115.                         ee = new ExecutionException(rex); 
  116.                     } 
  117.                 } 
  118.             } 
  119.             //如果異常不是null則拋出如果是則創(chuàng)建一個 
  120.             if (ee == null
  121.                 ee = new ExecutionException(); 
  122.             throw ee; 
  123.  
  124.         } finally { 
  125.             //其他任務(wù)則設(shè)置取消 
  126.             for (int i = 0, size = futures.size(); i < size; i++) 
  127.                 futures.get(i).cancel(true); 
  128.         } 
  129.     } 
  130.     //對上方方法的封裝 
  131.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
  132.         throws InterruptedException, ExecutionException { 
  133.         try { 
  134.             return doInvokeAny(tasks, false, 0); 
  135.         } catch (TimeoutException cannotHappen) { 
  136.             assert false
  137.             return null
  138.         } 
  139.     } 
  140.     //對上方法的封裝 
  141.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 
  142.                            long timeout, TimeUnit unit) 
  143.         throws InterruptedException, ExecutionException, TimeoutException { 
  144.         return doInvokeAny(tasks, true, unit.toNanos(timeout)); 
  145.     } 
  146.     //相對于上一個方法執(zhí)行成功任何一個則返回結(jié)果而此方法是全部執(zhí)行完然后統(tǒng)一返回結(jié)果 
  147.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
  148.         throws InterruptedException { 
  149.         //傳入的任務(wù)集合不能是null 
  150.         if (tasks == null
  151.             throw new NullPointerException(); 
  152.         //創(chuàng)建一個集合用來保存獲取到的執(zhí)行future 
  153.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  154.         //任務(wù)是否執(zhí)行完成 
  155.         boolean done = false
  156.         try { 
  157.             //遍歷傳入的任務(wù)并且調(diào)用執(zhí)行方法將創(chuàng)建的future添加到集合中 
  158.             for (Callable<T> t : tasks) { 
  159.                 RunnableFuture<T> f = newTaskFor(t); 
  160.                 futures.add(f); 
  161.                 execute(f); 
  162.             } 
  163.             //遍歷獲取到的future 
  164.             for (int i = 0, size = futures.size(); i < size; i++) { 
  165.                 Future<T> f = futures.get(i); 
  166.                 //如果當(dāng)前任務(wù)沒有成功則進(jìn)行f.get方法等待此方法執(zhí)行成功,如果方法執(zhí)行異常或者被取消將忽略異常 
  167.                 if (!f.isDone()) { 
  168.                     try { 
  169.                         f.get(); 
  170.                     } catch (CancellationException ignore) { 
  171.                     } catch (ExecutionException ignore) { 
  172.                     } 
  173.                 } 
  174.             } 
  175.             //到這一步則代表所有的任務(wù)都已經(jīng)有了確切的結(jié)果 
  176.             done = true
  177.             //返回任務(wù)結(jié)果集合 
  178.             return futures; 
  179.         } finally { 
  180.             //如果不是truefalse 則代表執(zhí)行過程中被中斷了則需要對任務(wù)進(jìn)行取消操作,如果正常完成則不會被取消 
  181.             if (!done) 
  182.                 for (int i = 0, size = futures.size(); i < size; i++) 
  183.                     futures.get(i).cancel(true); 
  184.         } 
  185.     } 
  186.     //與上方方法的區(qū)別在于對于任務(wù)集合可以設(shè)置超時時間 
  187.     //這里會針對差異進(jìn)行講解 
  188.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 
  189.                                          long timeout, TimeUnit unit) 
  190.         throws InterruptedException { 
  191.         if (tasks == null
  192.             throw new NullPointerException(); 
  193.         //計算設(shè)置時長的納秒時間 
  194.         long nanos = unit.toNanos(timeout); 
  195.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  196.         boolean done = false
  197.         try { 
  198.             for (Callable<T> t : tasks) 
  199.                 futures.add(newTaskFor(t)); 
  200.             //計算最終計算的確切時間點(diǎn),運(yùn)行時長不能超過此時間也就是時間死線 
  201.             //這里是個細(xì)節(jié)future創(chuàng)建的時間并沒有算作執(zhí)行時間 
  202.             final long deadline = System.nanoTime() + nanos; 
  203.             //獲取當(dāng)前結(jié)果數(shù) 
  204.             final int size = futures.size(); 
  205.             //遍歷將任務(wù)進(jìn)行執(zhí)行 
  206.             for (int i = 0; i < size; i++) { 
  207.                 execute((Runnable)futures.get(i)); 
  208.                 //并且每次都計算死線 
  209.                 nanos = deadline - System.nanoTime(); 
  210.                 //如果時間已經(jīng)超過則返回結(jié)果 
  211.                 if (nanos <= 0L) 
  212.                     return futures; 
  213.             } 
  214.             //否則遍歷future確定每次執(zhí)行都獲取到了結(jié)果 
  215.             for (int i = 0; i < size; i++) { 
  216.                 Future<T> f = futures.get(i); 
  217.                 if (!f.isDone()) { 
  218.                     //如果在等待過程中已經(jīng)超時則返回當(dāng)前等待結(jié)合 
  219.                     if (nanos <= 0L) 
  220.                         return futures; 
  221.                     try { 
  222.                         //如果沒有超過死線則設(shè)置從future中獲取結(jié)果的時間如果超過則會派出timeout 
  223.                         f.get(nanos, TimeUnit.NANOSECONDS); 
  224.                     } catch (CancellationException ignore) { 
  225.                     } catch (ExecutionException ignore) { 
  226.                     } catch (TimeoutException toe) { 
  227.                         //拋出了異常則會返回當(dāng)前的列表 
  228.                         return futures; 
  229.                     } 
  230.                     //計算最新的超時時間 
  231.                     nanos = deadline - System.nanoTime(); 
  232.                 } 
  233.             } 
  234.             //之前的返回都沒有設(shè)置為true所以在finally中都會設(shè)置為取消唯獨(dú)正常執(zhí)行完成到此處返回的結(jié)果才是最終的結(jié)果 
  235.             done = true
  236.             return futures; 
  237.         } finally { 
  238.             if (!done) 
  239.                 for (int i = 0, size = futures.size(); i < size; i++) 
  240.                     futures.get(i).cancel(true); 
  241.         } 
  242.     } 
  243.  

線程池的具體實(shí)現(xiàn)

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  • ThreadPoolExecutor 默認(rèn)線程池
  • ScheduledThreadPoolExecutor 定時線程池 (下篇再做介紹)

ThreadPoolExecutor

線程池重點(diǎn)屬性

  1. //用來標(biāo)記線程池狀態(tài)(高3位),線程個數(shù)(低29位) 
  2. //默認(rèn)是RUNNING狀態(tài),線程個數(shù)為0 
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  4.  
  5. //線程個數(shù)掩碼位數(shù),并不是所有平臺int類型是32位,所以準(zhǔn)確說是具體平臺下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個數(shù), 
  6. private static final int COUNT_BITS = Integer.SIZE - 3; 
  7.  
  8. //線程最大個數(shù)(低29位)000 11111111111111111111111111111 
  9. private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 

ctl 是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

ctl相關(guān)方法

  • runStateOf:獲取運(yùn)行狀態(tài);
  • workerCountOf:獲取活動線程數(shù);
  • ctlOf:獲取運(yùn)行狀態(tài)和活動線程數(shù)的值。
  1. / 獲取高三位 運(yùn)行狀態(tài) 
  2. private static int runStateOf(int c)     { return c & ~CAPACITY; } 
  3.  
  4. //獲取低29位 線程個數(shù) 
  5. private static int workerCountOf(int c)  { return c & CAPACITY; } 
  6.  
  7. //計算ctl新值,線程狀態(tài) 與 線程個數(shù) 
  8. private static int ctlOf(int rs, int wc) { return rs | wc; } 

線程池存在5種狀態(tài)

  1. //運(yùn)行中 111 00000000000000000000000000000 
  2. private static final int RUNNING    = -1 << COUNT_BITS; 
  3. //關(guān)閉 000 00000000000000000000000000000 
  4. private static final int SHUTDOWN   =  0 << COUNT_BITS; 
  5. //停止 001 00000000000000000000000000000 
  6. private static final int STOP       =  1 << COUNT_BITS; 
  7. //整理 010 00000000000000000000000000000 
  8. private static final int TIDYING    =  2 << COUNT_BITS; 
  9. //終止 011 00000000000000000000000000000 
  10. private static final int TERMINATED =  3 << COUNT_BITS; 

使用一個整形,前3位表示狀態(tài),后29位表示線程容量,也就是說線程最多有 230−1 個

并發(fā)編程之ThreadPoolExecutor線程池原理解析

也可以看出當(dāng)ctl小于零表示線程池仍在運(yùn)行

RUNNING

  • 狀態(tài)說明:線程池處在RUNNING狀態(tài)時,能夠接收新任務(wù),以及對已添加的任務(wù)進(jìn)行處理。
  • 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!

SHUTDOWN

  • 狀態(tài)說明:線程池處在SHUTDOWN狀態(tài)時,不接收新任務(wù),但能處理已添加的任務(wù)。
  • 狀態(tài)切換:調(diào)用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。

STOP

  • 狀態(tài)說明:線程池處在STOP狀態(tài)時,不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。
  • 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 狀態(tài)說明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進(jìn)行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實(shí)現(xiàn)。
  • 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊列為空并且線程池中執(zhí)行的任務(wù)也為空時,就會由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時,就會由STOP -> TIDYING。

TERMINATED

  • 狀態(tài)說明:線程池徹底終止,就變成TERMINATED狀態(tài)。
  • 狀態(tài)切換:線程池處在TIDYING狀態(tài)時,執(zhí)行完terminated()之后,就會由 TIDYING -> TERMINATED。

進(jìn)入TERMINATED的條件如下:

  • 線程池不是RUNNING狀態(tài);
  • 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
  • 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
  • workerCount為0;
  • 設(shè)置TIDYING狀態(tài)成功。
并發(fā)編程之ThreadPoolExecutor線程池原理解析

線程池參數(shù)

corePoolSize

線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。

maximumPoolSize

線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

keepAliveTim

線程池維護(hù)線程所允許的空閑時間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;

unit

keepAliveTime的單位;

workQueue

用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊列:

1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);

2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列;

threadFactory

它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護(hù)線程,同時也設(shè)置了線程的名稱。

handler

線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常,默認(rèn)策略;
  2. CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
  4. DiscardPolicy:直接丟棄任務(wù);

上面的4種策略都是ThreadPoolExecutor的內(nèi)部類。

并發(fā)編程之ThreadPoolExecutor線程池原理解析

當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。

線程池的創(chuàng)建

有四個構(gòu)造函數(shù),其他三個都是調(diào)用下面代碼中的這個構(gòu)造函數(shù)

  1. public ThreadPoolExecutor(int corePoolSize, 
  2.                           int maximumPoolSize, 
  3.                           long keepAliveTime, 
  4.                           TimeUnit unit, 
  5.                           BlockingQueue<Runnable> workQueue, 
  6.                           ThreadFactory threadFactory, 
  7.                           RejectedExecutionHandler handler)  

線程池監(jiān)控

  1. public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù) 
  2. public long getCompletedTaskCount() //已完成的任務(wù)數(shù) 
  3. public int getPoolSize() //線程池當(dāng)前的線程數(shù) 
  4. public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量 

線程池原理

并發(fā)編程之ThreadPoolExecutor線程池原理解析

核心方法分析

由于篇幅有限,核心方法解析請閱讀文末的擴(kuò)展鏈接。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

原文地址:https://www.toutiao.com/i6903537887346819595/

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日韩码有限公司在线观看 | 日本中文字幕在线播放 | 成人欧美一区二区三区在线观看 | 一区二区三区视频在线观看 | 亚洲精品999| 犬夜叉在线观看 | 欧美不卡| 欧美色欧美亚洲另类七区 | 欧美午夜精品久久久久久蜜 | 亚洲精品男人的天堂 | 国产精品久久久久久久久久久天堂 | 婷婷五月在线视频 | 美女久久久 | 欧美第一页 | 色日韩 | 爱爱综合网 | 日日夜夜精品免费视频 | 国产aaaaav久久久一区二区 | 一区二区三区精品视频 | 97精品国产一区二区三区 | 黄色免费网 | 久久久精品观看 | 夜夜骑日日操 | 欧美日韩高清在线 | 999久久久国产999久久久 | 精品免费视频 | 成人免费毛片aaaaaa片 | 欧美综合一区二区三区 | 欧美日韩在线观看视频 | a视频在线 | 久久1区 | 欧美一级片在线 | 91成人小视频 | 国产一级纯肉体一级毛片 | 亚洲精品久久久久久久久久久久久 | 成人国产在线 | 偷拍第一页 | 精品久久久久久久久久久久久久久久久久久 | 日日日日干干干干 | 国产综合中文字幕 | 亚洲网站视频 |