前言
在介紹線程池之前,我們先回顧下線程的基本知識。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時線程池 ,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。
線程
線程是調(diào)度CPU資源的最小單位,線程模型分為KLT模型與ULT模型,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關(guān)系,也就是說有一個Java線程也會在操作系統(tǒng)里有一個對應(yīng)的線程。
內(nèi)核線程模型

內(nèi)核線程(KLT):系統(tǒng)內(nèi)核管理線程(KLT),內(nèi)核保存線程的狀態(tài)和上下文信息,線程阻塞不會引起進(jìn)程阻塞。在多處理器系統(tǒng)上,多線程在多處理器上并行運(yùn)行。線程的創(chuàng)建、調(diào)度和管理由內(nèi)核完成,效率比ULT要慢,比進(jìn)程操作快。
用戶線程模型

用戶線程(ULT):用戶程序?qū)崿F(xiàn),不依賴操作系統(tǒng)核心,應(yīng)用提供創(chuàng)建、同步、調(diào)度和管理線程的函數(shù)來控制用戶線程。不需要用戶態(tài)/內(nèi)核態(tài)切換,速度快。內(nèi)核對ULT無感知,線程阻塞則進(jìn)程(包括它的所有線程)阻塞。
Java線程生命狀態(tài)
Java線程有多種生命狀態(tài):
- NEW ,新建
- RUNNABLE ,運(yùn)行
- BLOCKED ,阻塞
- WAITING ,等待
- TIMED_WAITING ,超時等待
- TERMINATED,終結(jié)
狀態(tài)切換如下圖所示:

Java線程實(shí)現(xiàn)方式
Java線程實(shí)現(xiàn)方式主要有四種:
- 繼承Thread類
- 實(shí)現(xiàn)Runnable接口、
- 實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程、
- 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程。
其中前兩種方式線程執(zhí)行完后都沒有返回值,后兩種是帶返回值的。
繼承Thread類創(chuàng)建線程
Thread類本質(zhì)上是實(shí)現(xiàn)了Runnable接口的一個實(shí)例,代表一個線程的實(shí)例。啟動線程的唯一方法就是通過Thread類的start()實(shí)例方法。start()方法是一個native方法,它將啟動一個新線程,并執(zhí)行run()方法。這種方式實(shí)現(xiàn)多線程很簡單,通過自己的類直接extend Thread,并復(fù)寫run()方法,就可以啟動新線程并執(zhí)行自己定義的run()方法。例如:
- public class MyThread extends Thread {
- public void run() {
- System.out.println("關(guān)注一角錢技術(shù),獲取Java架構(gòu)資料");
- }
- }
- MyThread myThread1 = new MyThread();
- MyThread myThread2 = new MyThread();
- myThread1.start();
- myThread2.start();
實(shí)現(xiàn)Runnable接口創(chuàng)建線程
如果自己的類已經(jīng)extends另一個類,就無法直接extends Thread,此時,可以實(shí)現(xiàn)一個Runnable接口,如下:
- // 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行,表示一個基本的任務(wù)
- public interface Runnable {
- // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù)
- public abstract void run();
- }
- public class MyThread implements Runnable {
- public void run() {
- System.out.println("關(guān)注一角錢技術(shù),獲取Java架構(gòu)資料");
- }
- }
為了啟動MyThread,需要首先實(shí)例化一個Thread,并傳入自己的MyThread實(shí)例:
- MyThread myThread = new MyThread();
- Thread thread = new Thread(myThread);
- thread.start();
事實(shí)上,當(dāng)傳入一個Runnable target參數(shù)給Thread后,Thread的run()方法就會調(diào)用target.run(),參考JDK源代碼:
- public void run() {
- if (target != null) {
- target.run();
- }
- }
實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程
Callable接口(也只有一個方法)定義如下:
- public interface Callable<V> {
- V call() throws Exception;
- }
- //Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型,同時它執(zhí)行任務(wù)后帶有返回內(nèi)容
- public class SomeCallable<V> implements Callable<V> {
- // 相對于run方法的帶有返回值的call方法
- @Override
- public V call() throws Exception {
- // TODO Auto-generated method stub
- return null;
- }
- }
- Callable<V> oneCallable = new SomeCallable<V>();
- //由Callable<Integer>創(chuàng)建一個FutureTask<Integer>對象:
- FutureTask<V> oneTask = new FutureTask<V>(oneCallable);
- //注釋:FutureTask<Integer>是一個包裝器,它通過接受Callable<Integer>來創(chuàng)建,它同時實(shí)現(xiàn)了Future和Runnable接口。
- //由FutureTask<Integer>創(chuàng)建一個Thread對象:
- Thread oneThread = new Thread(oneTask);
- oneThread.start();
- //至此,一個線程就創(chuàng)建完成了。
使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程
ExecutorService、Callable、Future三個接口實(shí)際上都是屬于Executor框架。返回結(jié)果的線程是在JDK1.5中引入的新特征,有了這種特征就不需要再為了得到返回值而大費(fèi)周折了。而且自己實(shí)現(xiàn)了也可能漏洞百出。(下部分來講線程池了)
- 可返回值的任務(wù)必須實(shí)現(xiàn)Callable接口。
- 類似的,無返回值的任務(wù)必須實(shí)現(xiàn)Runnable接口。
執(zhí)行Callable任務(wù)后,可以獲取一個Future的對象,在該對象上調(diào)用get就可以獲取到Callable任務(wù)返回的Object了。
- 注意:get方法是阻塞的,即:線程無返回結(jié)果,get方法會一直等待。
再結(jié)合線程池接口ExecutorService就可以實(shí)現(xiàn)傳說中有返回結(jié)果的多線程了。
下面提供了一個完整的有返回結(jié)果的多線程測試?yán)印4a如下:
- package com.niuh.thread.v4;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- /**
- * <p>
- * 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程
- * </p>
- */
- public class MyThread {
- public static void main(String[] args) throws ExecutionException,
- InterruptedException {
- System.out.println(("----程序開始運(yùn)行----"));
- Date date1 = new Date();
- int taskSize = 5;
- // 創(chuàng)建一個線程池
- ExecutorService pool = Executors.newFixedThreadPool(taskSize);
- // 創(chuàng)建多個有返回值的任務(wù)
- List<Future> list = new ArrayList<Future>();
- for (int i = 0; i < taskSize; i++) {
- Callable c = new MyCallable(i + " ");
- // 執(zhí)行任務(wù)并獲取Future對象
- Future f = pool.submit(c);
- // System.out.println(">>>" + f.get().toString());
- list.add(f);
- }
- // 關(guān)閉線程池
- pool.shutdown();
- // 獲取所有并發(fā)任務(wù)的運(yùn)行結(jié)果
- for (Future f : list) {
- // 從Future對象上獲取任務(wù)的返回值,并輸出到控制臺
- System.out.println(">>>" + f.get().toString());
- }
- Date date2 = new Date();
- System.out.println("----程序結(jié)束運(yùn)行----,程序運(yùn)行時間【"
- + (date2.getTime() - date1.getTime()) + "毫秒】");
- }
- }
- class MyCallable implements Callable<Object> {
- private String taskNum;
- MyCallable(String taskNum) {
- this.taskNum = taskNum;
- }
- public Object call() throws Exception {
- System.out.println(">>>" + taskNum + "任務(wù)啟動");
- Date dateTmp1 = new Date();
- Thread.sleep(1000);
- Date dateTmp2 = new Date();
- long time = dateTmp2.getTime() - dateTmp1.getTime();
- System.out.println(">>>" + taskNum + "任務(wù)終止");
- return taskNum + "任務(wù)返回運(yùn)行結(jié)果,當(dāng)前任務(wù)時間【" + time + "毫秒】";
- }
- }
協(xié)程
協(xié)程(纖程,用戶級線程),目的是為了追求最大力度的發(fā)揮硬件性能和提升軟件的速度,協(xié)程基本原理是:在某個點(diǎn)掛起當(dāng)前的任務(wù),并且保存棧信息,去執(zhí)行另一個任務(wù);等完成或達(dá)到某個條件時,再還原原來的棧信息并繼續(xù)執(zhí)行(整個過程不需要上下文切換)。
協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用。
協(xié)程的目的:當(dāng)我們在使用多線程的時候,如果存在長時間的I/O操作。這個時候線程一直處于阻塞狀態(tài),如果線程很多的時候,會存在很多線程處于空閑狀態(tài),造成了資源應(yīng)用不徹底。相對的協(xié)程不一樣了,在單線程中多個任務(wù)來回執(zhí)行如果出現(xiàn)長時間的I/O操作,讓其讓出目前的協(xié)程調(diào)度,執(zhí)行下一個任務(wù)。當(dāng)然可能所有任務(wù),全部卡在同一個點(diǎn)上,但是這只是針對于單線程而言,當(dāng)所有數(shù)據(jù)正常返回時,會同時處理當(dāng)前的I/O操作。
Java原生不支持協(xié)程,在純java代碼里需要使用協(xié)程的話需要引入第三方包,如:quasar
- <dependency>
- <groupId>co.paralleluniverse</groupId>
- <artifactId>quasar-core</artifactId>
- <version>0.8.0</version>
- <classifier>jdk8</classifier>
- </dependency>
線程池
“線程池”,顧名思義就是一個線程緩存,線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,因此 Java 中提供線程池對線程進(jìn)行同一分配、調(diào)優(yōu)和監(jiān)控。
線程池介紹
在web開發(fā)中,服務(wù)器需要接受并處理請求,所以會為一個請求分配一個線程來進(jìn)行處理。如果每次請求都創(chuàng)建一個線程的話實(shí)現(xiàn)起來非常簡單,但是存在一個問題:如果并發(fā)的請求數(shù)量非常多,但每個線程執(zhí)行的時間很短,這樣就會頻繁的創(chuàng)建和銷毀線程,如此一來會大大降低系統(tǒng)的效率。可能出現(xiàn)服務(wù)器在為每個請求創(chuàng)建新線程和銷毀線程上花費(fèi)的時間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請求的時間和資源更多。
那么有沒有一種辦法使執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?
這就是線程池的目的。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€任務(wù)上。
什么時候使用線程池?
- 單個任務(wù)處理時間比較短;
- 需要處理的任務(wù)數(shù)量很大。
線程池優(yōu)勢
- 重用存在的線程。減少線程黃金、消亡的開銷,提高性能;
- 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行;
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行同一的分配、調(diào)優(yōu)和監(jiān)控。
Executor框架
Executor接口是線程池框架中最基礎(chǔ)的部分,定義來一個用于執(zhí)行 Runnable 的 execute 方法。下面為它的繼承與實(shí)現(xiàn)

ExecutorService接口
從圖中可以看出 Executor 下有一個重要的子接口 ExecutorService ,其中定義來線程池的具體行為

- execute(Runnable command):履行Ruannable類型的任務(wù);
- submit(task):可用來提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future對象;
- shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù);
- shutdownNow():停止所有正在履行的任務(wù)并封閉辦事;
- isTerminated():測試是否所有任務(wù)都履行完畢了;
- isShutdown():測試是否該ExecutorService已被關(guān)閉;
- awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個參數(shù),用于設(shè)定超時時間及單位。當(dāng)?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用;
- invokeAll :作用是等待所有的任務(wù)執(zhí)行完成后統(tǒng)一返回;
- invokeAny :將第一個得到的結(jié)果作為返回值,然后立刻終止所有的線程。如果設(shè)置了超時時間,未超時完成則正常返回結(jié)果,如果超時未完成則報超時異常。
AbstractExcutorService抽象類
此類的定義并沒有特殊的意義僅僅是實(shí)現(xiàn)了ExecutorService接口

- public abstract class AbstractExecutorService implements ExecutorService {
- //此方法很簡單就是對runnable保證,將其包裝為一個FutureTask
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
- //包裝callable為FutureTask
- //FutureTask其實(shí)就是對Callable的一個封裝
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new FutureTask<T>(callable);
- }
- //提交一個Runnable類型的任務(wù)
- public Future<?> submit(Runnable task) {
- //如果為null則拋出NPE
- if (task == null) throw new NullPointerException();
- //包裝任務(wù)為一個Future
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- //將任務(wù)丟給執(zhí)行器,而此處會拋出拒絕異常,在講述ThreadPoolExecutor的時候有講述,不記得的讀者可以去再看看
- execute(ftask);
- return ftask;
- }
- //與上方方法相同只不過指定了返回結(jié)果
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- //與上方方法相同只是換成了callable
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
- //執(zhí)行集合tasks結(jié)果是最后一個執(zhí)行結(jié)束的任務(wù)結(jié)果
- //可以設(shè)置超時 timed為true并且nanos是未來的一個時間
- //任何一個任務(wù)完成都將會返回結(jié)果
- private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
- boolean timed, long nanos)
- throws InterruptedException, ExecutionException, TimeoutException {
- //傳入的任務(wù)集合不能為null
- if (tasks == null)
- throw new NullPointerException();
- //傳入的任務(wù)數(shù)不能是0
- int ntasks = tasks.size();
- if (ntasks == 0)
- throw new IllegalArgumentException();
- //滿足上面的校驗后將任務(wù)分裝到一個ArrayList中
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
- //并且創(chuàng)建一個執(zhí)行器傳入this
- //這里簡單講述他的執(zhí)行原理,傳入this會使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù),當(dāng)submit提交任務(wù)的時候回將任務(wù)
- //封裝為一個內(nèi)部的Future并且重寫他的done而此方法就是在future完成的時候調(diào)用的,而他的寫法則是將當(dāng)前完成的future添加到esc
- //維護(hù)的結(jié)果隊列中
- ExecutorCompletionService<T> ecs =
- new ExecutorCompletionService<T>(this);
- try {
- //創(chuàng)建一個執(zhí)行異常,以便后面拋出
- ExecutionException ee = null;
- //如果開啟了超時則計算死線時間如果時間是0則代表沒有開啟執(zhí)行超時
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- //獲取任務(wù)的迭代器
- Iterator<? extends Callable<T>> it = tasks.iterator();
- //先獲取迭代器中的第一個任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器
- futures.add(ecs.submit(it.next()));
- //前面記錄的任務(wù)數(shù)減一
- --ntasks;
- //當(dāng)前激活數(shù)為1
- int active = 1;
- //進(jìn)入死循環(huán)
- for (;;) {
- //獲取剛才提價的任務(wù)是否完成如果完成則f不是null否則為null
- Future<T> f = ecs.poll();
- //如果為null則代表任務(wù)還在繼續(xù)
- if (f == null) {
- //如果當(dāng)前任務(wù)大于0 說明除了剛才的任務(wù)還有別的任務(wù)存在
- if (ntasks > 0) {
- //則任務(wù)數(shù)減一
- --ntasks;
- //并且再次提交新的任務(wù)
- futures.add(ecs.submit(it.next()));
- //當(dāng)前的存活的執(zhí)行任務(wù)加一
- ++active;
- }
- //如果當(dāng)前存活任務(wù)數(shù)是0則代表沒有任務(wù)在執(zhí)行了從而跳出循環(huán)
- else if (active == 0)
- break;
- //如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時時間
- else if (timed) {
- //則設(shè)置指定的超時時間獲取
- f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
- //等待執(zhí)行超時還沒有獲取到則拋出超時異常
- if (f == null)
- throw new TimeoutException();
- //否則使用當(dāng)前時間計算剩下的超時時間用于下一個循環(huán)使用
- nanos = deadline - System.nanoTime();
- }
- //如果沒有設(shè)置超時則直接獲取任務(wù)
- else
- f = ecs.take();
- }
- //如果獲取到了任務(wù)結(jié)果f!=null
- if (f != null) {
- //激活數(shù)減一
- --active;
- try {
- //返回獲取到的結(jié)果
- return f.get();
- //如果獲取結(jié)果出錯則包裝異常
- } catch (ExecutionException eex) {
- ee = eex;
- } catch (RuntimeException rex) {
- ee = new ExecutionException(rex);
- }
- }
- }
- //如果異常不是null則拋出如果是則創(chuàng)建一個
- if (ee == null)
- ee = new ExecutionException();
- throw ee;
- } finally {
- //其他任務(wù)則設(shè)置取消
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
- //對上方方法的封裝
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- try {
- return doInvokeAny(tasks, false, 0);
- } catch (TimeoutException cannotHappen) {
- assert false;
- return null;
- }
- }
- //對上方法的封裝
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return doInvokeAny(tasks, true, unit.toNanos(timeout));
- }
- //相對于上一個方法執(zhí)行成功任何一個則返回結(jié)果而此方法是全部執(zhí)行完然后統(tǒng)一返回結(jié)果
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- //傳入的任務(wù)集合不能是null
- if (tasks == null)
- throw new NullPointerException();
- //創(chuàng)建一個集合用來保存獲取到的執(zhí)行future
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
- //任務(wù)是否執(zhí)行完成
- boolean done = false;
- try {
- //遍歷傳入的任務(wù)并且調(diào)用執(zhí)行方法將創(chuàng)建的future添加到集合中
- for (Callable<T> t : tasks) {
- RunnableFuture<T> f = newTaskFor(t);
- futures.add(f);
- execute(f);
- }
- //遍歷獲取到的future
- for (int i = 0, size = futures.size(); i < size; i++) {
- Future<T> f = futures.get(i);
- //如果當(dāng)前任務(wù)沒有成功則進(jìn)行f.get方法等待此方法執(zhí)行成功,如果方法執(zhí)行異常或者被取消將忽略異常
- if (!f.isDone()) {
- try {
- f.get();
- } catch (CancellationException ignore) {
- } catch (ExecutionException ignore) {
- }
- }
- }
- //到這一步則代表所有的任務(wù)都已經(jīng)有了確切的結(jié)果
- done = true;
- //返回任務(wù)結(jié)果集合
- return futures;
- } finally {
- //如果不是true是false 則代表執(zhí)行過程中被中斷了則需要對任務(wù)進(jìn)行取消操作,如果正常完成則不會被取消
- if (!done)
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
- //與上方方法的區(qū)別在于對于任務(wù)集合可以設(shè)置超時時間
- //這里會針對差異進(jìn)行講解
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException {
- if (tasks == null)
- throw new NullPointerException();
- //計算設(shè)置時長的納秒時間
- long nanos = unit.toNanos(timeout);
- ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
- boolean done = false;
- try {
- for (Callable<T> t : tasks)
- futures.add(newTaskFor(t));
- //計算最終計算的確切時間點(diǎn),運(yùn)行時長不能超過此時間也就是時間死線
- //這里是個細(xì)節(jié)future創(chuàng)建的時間并沒有算作執(zhí)行時間
- final long deadline = System.nanoTime() + nanos;
- //獲取當(dāng)前結(jié)果數(shù)
- final int size = futures.size();
- //遍歷將任務(wù)進(jìn)行執(zhí)行
- for (int i = 0; i < size; i++) {
- execute((Runnable)futures.get(i));
- //并且每次都計算死線
- nanos = deadline - System.nanoTime();
- //如果時間已經(jīng)超過則返回結(jié)果
- if (nanos <= 0L)
- return futures;
- }
- //否則遍歷future確定每次執(zhí)行都獲取到了結(jié)果
- for (int i = 0; i < size; i++) {
- Future<T> f = futures.get(i);
- if (!f.isDone()) {
- //如果在等待過程中已經(jīng)超時則返回當(dāng)前等待結(jié)合
- if (nanos <= 0L)
- return futures;
- try {
- //如果沒有超過死線則設(shè)置從future中獲取結(jié)果的時間如果超過則會派出timeout
- f.get(nanos, TimeUnit.NANOSECONDS);
- } catch (CancellationException ignore) {
- } catch (ExecutionException ignore) {
- } catch (TimeoutException toe) {
- //拋出了異常則會返回當(dāng)前的列表
- return futures;
- }
- //計算最新的超時時間
- nanos = deadline - System.nanoTime();
- }
- }
- //之前的返回都沒有設(shè)置為true所以在finally中都會設(shè)置為取消唯獨(dú)正常執(zhí)行完成到此處返回的結(jié)果才是最終的結(jié)果
- done = true;
- return futures;
- } finally {
- if (!done)
- for (int i = 0, size = futures.size(); i < size; i++)
- futures.get(i).cancel(true);
- }
- }
- }
線程池的具體實(shí)現(xiàn)

- ThreadPoolExecutor 默認(rèn)線程池
- ScheduledThreadPoolExecutor 定時線程池 (下篇再做介紹)
ThreadPoolExecutor
線程池重點(diǎn)屬性
- //用來標(biāo)記線程池狀態(tài)(高3位),線程個數(shù)(低29位)
- //默認(rèn)是RUNNING狀態(tài),線程個數(shù)為0
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- //線程個數(shù)掩碼位數(shù),并不是所有平臺int類型是32位,所以準(zhǔn)確說是具體平臺下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個數(shù),
- private static final int COUNT_BITS = Integer.SIZE - 3;
- //線程最大個數(shù)(低29位)000 11111111111111111111111111111
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl 是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。
ctl相關(guān)方法
- runStateOf:獲取運(yùn)行狀態(tài);
- workerCountOf:獲取活動線程數(shù);
- ctlOf:獲取運(yùn)行狀態(tài)和活動線程數(shù)的值。
- / 獲取高三位 運(yùn)行狀態(tài)
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- //獲取低29位 線程個數(shù)
- private static int workerCountOf(int c) { return c & CAPACITY; }
- //計算ctl新值,線程狀態(tài) 與 線程個數(shù)
- private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池存在5種狀態(tài)
- //運(yùn)行中 111 00000000000000000000000000000
- private static final int RUNNING = -1 << COUNT_BITS;
- //關(guān)閉 000 00000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- //停止 001 00000000000000000000000000000
- private static final int STOP = 1 << COUNT_BITS;
- //整理 010 00000000000000000000000000000
- private static final int TIDYING = 2 << COUNT_BITS;
- //終止 011 00000000000000000000000000000
- private static final int TERMINATED = 3 << COUNT_BITS;
使用一個整形,前3位表示狀態(tài),后29位表示線程容量,也就是說線程最多有 230−1 個

也可以看出當(dāng)ctl小于零表示線程池仍在運(yùn)行
RUNNING
- 狀態(tài)說明:線程池處在RUNNING狀態(tài)時,能夠接收新任務(wù),以及對已添加的任務(wù)進(jìn)行處理。
- 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!
SHUTDOWN
- 狀態(tài)說明:線程池處在SHUTDOWN狀態(tài)時,不接收新任務(wù),但能處理已添加的任務(wù)。
- 狀態(tài)切換:調(diào)用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
STOP
- 狀態(tài)說明:線程池處在STOP狀態(tài)時,不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。
- 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
TIDYING
- 狀態(tài)說明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進(jìn)行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實(shí)現(xiàn)。
- 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊列為空并且線程池中執(zhí)行的任務(wù)也為空時,就會由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時,就會由STOP -> TIDYING。
TERMINATED
- 狀態(tài)說明:線程池徹底終止,就變成TERMINATED狀態(tài)。
- 狀態(tài)切換:線程池處在TIDYING狀態(tài)時,執(zhí)行完terminated()之后,就會由 TIDYING -> TERMINATED。
進(jìn)入TERMINATED的條件如下:
- 線程池不是RUNNING狀態(tài);
- 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
- 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
- workerCount為0;
- 設(shè)置TIDYING狀態(tài)成功。

線程池參數(shù)
corePoolSize
線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;
keepAliveTim
線程池維護(hù)線程所允許的空閑時間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
unit
keepAliveTime的單位;
workQueue
用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊列:
1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);
2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列;
threadFactory
它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護(hù)線程,同時也設(shè)置了線程的名稱。
handler
線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
- AbortPolicy:直接拋出異常,默認(rèn)策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù);
上面的4種策略都是ThreadPoolExecutor的內(nèi)部類。

當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。
線程池的創(chuàng)建
有四個構(gòu)造函數(shù),其他三個都是調(diào)用下面代碼中的這個構(gòu)造函數(shù)
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
線程池監(jiān)控
- public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
- public long getCompletedTaskCount() //已完成的任務(wù)數(shù)
- public int getPoolSize() //線程池當(dāng)前的線程數(shù)
- public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量
線程池原理

核心方法分析
由于篇幅有限,核心方法解析請閱讀文末的擴(kuò)展鏈接。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git
原文地址:https://www.toutiao.com/i6903537887346819595/