国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - ThreadPoolExecutor線程池原理及其execute方法(詳解)

ThreadPoolExecutor線程池原理及其execute方法(詳解)

2020-11-19 10:47Java教程網 Java教程

下面小編就為大家帶來一篇ThreadPoolExecutor線程池原理及其execute方法(詳解)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

jdk1.7.0_79

對于線程池大部分人可能會用,也知道為什么用。無非就是任務需要異步執行,再者就是線程需要統一管理起來。對于從線程池中獲取線程,大部分人可能只知道,我現在需要一個線程來執行一個任務,那我就把任務丟到線程池里,線程池里有空閑的線程就執行,沒有空閑的線程就等待。實際上對于線程池的執行原理遠遠不止這么簡單。

Java并發包中提供了線程池類——ThreadPoolExecutor,實際上更多的我們可能用到的是Executors工廠類為我們提供的線程池newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,這三個線程池并不是ThreadPoolExecutor的子類,關于這幾者之間的關系,我們先來查看ThreadPoolExecutor,查看源碼發現其一共有4個構造方法。

?
1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

首先就從這幾個參數開始來了解線程池ThreadPoolExecutor的執行原理

corePoolSize:核心線程池的線程數量

maximumPoolSize:最大的線程池線程數量

keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。

unit:線程活動保持時間的單位。

workQueue:指定任務隊列所使用的阻塞隊列

corePoolSizemaximumPoolSize都在指定線程池中的線程數量,好像平時用到線程池的時候最多就只需要傳遞一個線程池大小的參數就能創建一個線程池啊,Java為我們提供了一些常用的線程池類就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,當然如果我們想要自己發揮創建自定義的線程池就得自己來“配置”有關線程池的一些參數。

當把一個任務交給線程池來處理的時候,線程池的執行原理如下圖所示參考自《Java并發編程的藝術》

ThreadPoolExecutor線程池原理及其execute方法(詳解)

首先會判斷核心線程池里是否有線程可執行,有空閑線程則創建一個線程來執行任務。

②當核心線程池里已經沒有線程可執行的時候,此時將任務丟到任務隊列中去。

③如果任務隊列(有界)也已經滿了的話,但運行的線程數小于最大線程池的數量的時候,此時將會新建一個線程用于執行任務,但如果運行的線程數已經達到最大線程池的數量的時候,此時將無法創建線程執行任務。

所以實際上對于線程池不僅是單純地將任務丟到線程池,線程池中有線程就執行任務,沒線程就等待。

為鞏固一下線程池的原理,現在再來了解上面提到的常用的3個線程池:

Executors.newFixedThreadPool創建一個固定數量線程的線程池。

?
1
2
3
4
// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

可以看到newFixedThreadPool中調用的是ThreadPoolExecutor類,傳遞的參數corePoolSize= maximumPoolSize=nThread。回顧線程池的執行原理,當一個任務提交到線程池中,首先判斷核心線程池里有沒有空閑線程,有則創建線程,沒有則將任務放到任務隊列(這里是有界阻塞隊列LinkedBlockingQueue)中,如果任務隊列已經滿了的話,對于newFixedThreadPool來說,它的最大線程池數量=核心線程池數量,此時任務隊列也滿了,將不能擴展創建新的線程來執行任務。

Executors.newSingleThreadExecutor:創建只包含一個線程的線程池。  

?
1
2
3
4
//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

只有一個線程的線程池好像有點奇怪,并且并沒有直接將返回ThreadPoolExecutor,甚至也沒有直接將線程池數量1傳遞給newFixedThreadPool返回。那就說明這個只含有一個線程的線程池,或許并沒有只包含一個線程那么簡單。在其源碼注釋中這么寫到:創建只有一個工作線程的線程池用于操作一個無界隊列(如果由于前驅節點的執行被終止結束了,一個新的線程將會繼續執行后繼節點線程)任務得以繼續執行,不同于newFixedThreadPool(1)不會有額外的線程來重新繼續執行后繼節點。也就是說newSingleThreadExecutor自始至終都只有一個線程在執行,這和newFixedThreadPool一樣,但如果線程終止結束過后newSingleThreadExecutor則會重新創建一個新的線程來繼續執行任務隊列中的線程,而newFixedThreaPool則不會。

Executors.newCachedThreadPool:根據需要創建新線程的線程池。

?
1
2
3
4
//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

可以看到newCachedThread返回的是ThreadPoolExecutor,其參數核心線程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,這也就是說當任務被提交到newCachedThread線程池時,將會直接把任務放到SynchronousQueue任務隊列中,maximumPool從任務隊列中獲取任務。注意SynchronousQueue是一個沒有容量的隊列,也就是說每個入隊操作必須等待另一個線程的對應出隊操作,如果主線程提交任務的速度高于maximumPool中線程處理任務的速度時,newCachedThreadPool會不斷創建線程,線程多并不是一件好事,嚴重會耗盡CPU和內存資源。

題外話:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,這三者都直接或間接調用了ThreadPoolExecutor,為什么它們三者沒有直接是其子類,而是通過Executors來實例化呢?這是所采用的靜態工廠方法,在java.util.Connections接口中同樣也是采用的靜態工廠方法來創建相關的類。這樣有很多好處,靜態工廠方法是用來產生對象的,產生什么對象沒關系,只要返回原返回類型或原返回類型的子類型都可以,降低API數目和使用難度,在《Effective Java》中的第1條就是靜態工廠方法。

回到ThreadPoolExecutor,首先來看它的繼承關系:

ThreadPoolExecutor線程池原理及其execute方法(詳解)

ThreadPoolExecutor它的頂級父類是Executor接口,只包含了一個方法——execute,這個方法也就是線程池的“執行”。

?
1
2
3
4
//Executor#execute
public interface Executor {
 void execute(Runnable command);
}

Executor#execute的實現則是在ThreadPoolExecutor中實現的:

?
1
2
3
4
5
6
7
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null)
  throw new NullPointerException();
  int c = ctl.get();
 
}

一來就碰到個不知所云的ctl變量它的定義:

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

這個變量使用來干嘛的呢?它的作用有點類似我們在《ReadWriteLock接口及其實現ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態,而AQS則只提供了state一個int型變量,此時將state高16位表示為讀狀態,低16位表示為寫狀態。這里的clt同樣也是,它表示了兩個概念:

workerCount:當前有效的線程數

runState:當前線程池的五種狀態,Running、Shutdown、Stop、Tidying、Terminate。

int型變量一共有32位,線程池的五種狀態runState至少需要3位來表示,故workCount只能有29位,所以代碼中規定線程池的有效線程數最多為229-1。

?
1
2
3
4
5
6
7
8
9
//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3//32-3=29,線程數量所占位數
private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大線程數,229-1
//五種線程池狀態
private static final int RUNNING = -1 << COUNT_BITS; /int型變量高3位(含符號位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000
private static final int STOP = 1 << COUNT_BITS; //高3位001
private static final int TIDYING = 2 << COUNT_BITS; //高3位010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位011

再次回到ThreadPoolExecutor#execute方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
   int c = ctl.get(); //由它可以獲取到當前有效的線程數和線程池的狀態
/*1.獲取當前正在運行線程數是否小于核心線程池,是則新創建一個線程執行任務,否則將任務放到任務隊列中*/
 if (workerCountOf(c) < corePoolSize){
  if (addWorker(command, tre))  //在addWorker中創建工作線程執行任務
   return ;
  c = ctl.get();
 }
/*2.當前核心線程池中全部線程都在運行workerCountOf(c) >= corePoolSize,所以此時將線程放到任務隊列中*/
 if (isRunning(c) && workQueue.offer(command)) { //線程池是否處于運行狀態,且是否任務插入任務隊列成功
  int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))  //線程池是否處于運行狀態,如果不是則使剛剛的任務出隊
       reject(command); //拋出RejectedExceptionException異常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
/*3.插入隊列不成功,且當前線程數數量小于最大線程池數量,此時則創建新線程執行任務,創建失敗拋出異常*/
  else if (!addWorker(command, false)){
    reject(command); //拋出RejectedExceptionException異常
  }
}

上面代碼注釋第7行的即判斷當前核心線程池里是否有空閑線程,有則通過addWorker方法創建工作線程執行任務。addWorker方法較長,篩選出重要的代碼來解析。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查線程池是否處于運行狀態,核心線程池中是否還有空閑線程,都滿足條件過后則會調用compareAndIncrementWorkerCount先將正在運行的線程數+1,數量自增成功則跳出循環,自增失敗則繼續從頭繼續循環*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
/*正在運行的線程數自增成功后則將線程封裝成工作線程Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;  //全局鎖
    w = new Woker(firstTask);  //將線程封裝為Worker工作線程
    final Thread t = w.thread;
    if (t != null) {
      mainLock.lock(); //獲取全局鎖
/*當持有了全局鎖的時候,還需要再次檢查線程池的運行狀態等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);  //線程池運行狀態
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){  //線程池處于運行狀態,或者線程池關閉且任務線程為空
          if (t.isAlive()) //線程處于活躍狀態,即線程已經開始執行或者還未死亡,正確的應線程在這里應該是還未開始執行的
            throw new IllegalThreadStateException();
          workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含線程池中所有的工作線程,只有在獲取了全局的時候才能訪問它。將新構造的工作線程加入到工作線程集合中
          int s = worker.size(); //工作線程數量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true; //新構造的工作線程加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start(); //在被構造為Worker工作線程,且被加入到工作線程集合中后,執行線程任務,注意這里的start實際上執行Worker中run方法,所以接下來分析Worker的run方法
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted) //未能成功創建執行工作線程
      addWorkerFailed(w); //在啟動工作線程失敗后,將工作線程從集合中移除
  }
  return workerStarted;
}

在上面第35代碼中,工作線程被成功添加到工作線程集合中后,則開始start執行,這里start執行的是Worker工作線程中的run方法。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1); //設置AQS的同步狀態為-1,禁止中斷,直到調用runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //通過線程工廠來創建一個線程,將自身作為Runnable傳遞傳遞
  }
  public void run() {
    runWorker(this); //運行工作線程
  }
}

ThreadPoolExecutor#runWorker,在此方法中,Worker在執行完任務后,還會循環獲取任務隊列里的任務執行(其中的getTask方法),也就是說Worker不僅僅是在執行完給它的任務就釋放或者結束,它不會閑著,而是繼續從任務隊列中獲取任務,直到任務隊列中沒有任務可執行時,它才退出循環完成任務。理解了以上的源碼過后,往后線程池執行原理的第二步、第三步的理解實則水到渠成。

以上這篇ThreadPoolExecutor線程池原理及其execute方法(詳解)就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 91精品国产综合久久久久久丝袜 | 日本激情网 | 一区二区日韩精品 | 亚洲深深色噜噜狠狠网站 | 亚洲色图一区二区三区 | 蜜桃一区二区 | 毛片久久久久久 | 黄色国产视频 | 国产成人精品一区二区在线 | 日韩影院一区 | 欧美视频免费看 | 中文字幕在线免费视频 | 北条麻妃99 | 伊人网站| 久久精品麻豆 | 精品国产欧美一区二区 | 日韩精品色| 一级做a爰片久久毛片免费陪 | 中文字幕日产乱码六区小草 | 久久亚洲精品中文字幕 | 91精品国产综合久久久久久丝袜 | 激情久久av一区av二区av三区 | 在线观看免费黄色 | 午夜电影网址 | 国产精品久久国产精品 | 国产中文视频 | av手机在线播放 | 日韩综合一区二区 | 日本精品一区二区三区在线观看视频 | 亚洲精品国产一区 | 日日操日日操 | 中文在线一区 | 91在线视频观看 | 黄网页在线观看 | 91久久精品国产91久久 | 欧美激情综合五月色丁香小说 | 在线看一区 | 久久久成人免费 | 国产91视频在线观看 | 日韩在线资源 | 亚洲精品视频在线观看免费视频 |