線程池中ThreadGroup的坑
在Java中每一個線程都歸屬于某個線程組管理的一員,例如在主函數main()主工作流程中產生一個線程,則產生的線程屬于main這個線程組管理的一員。簡單地說,線程組(ThreadGroup)就是由線程組成的管理線程的類,這個類是java.lang.ThreadGroup類。
定義一個線程組,通過以下代碼可以實現。
1
2
|
ThreadGroup group= new ThreadGroup(“groupName”); Thread thread= new Thread(group,”the first thread of group”); |
ThreadGroup類中的某些方法,可以對線程組中的線程產生作用。例如,setMaxPriority()方法可以設定線程組中的所有線程擁有最大的優先權。
所有線程都隸屬于一個線程組。那可以是一個默認線程組(不指定group),亦可是一個創建線程時明確指定的組。在創建之初,線程被限制到一個組里,而且不能改變到一個不同的組。每個應用都至少有一個線程從屬于系統線程組。若創建多個線程而不指定一個組,它們就會自動歸屬于系統線程組。
線程組也必須從屬于其他線程組。必須在構建器里指定新線程組從屬于哪個線程組。若在創建一個線程組的時候沒有指定它的歸屬,則同樣會自動成為系統線程組的一名屬下。因此,一個應用程序中的所有線程組最終都會將系統線程組作為自己的“父”。
那么假如我們需要在線程池中實現一個帶自定義ThreadGroup的線程分組,該怎么實現呢?
我們在給線程池(ThreadPoolExecutor)提交任務的時候可以通過execute(Runnable command)來將一個線程任務加入到該線程池,那么我們是否可以通過new一個指定了ThreadGroup的Thread實例來加入線程池來達到前面說到的目的呢?
ThreadGroup是否可行
通過new Thread(threadGroup,runnable)實現線程池中任務分組
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public static void main(String[] args) { ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); final ThreadGroup group = new ThreadGroup( "Main_Test_Group" ); for ( int i = 0 ; i < 5 ; i++) { Thread thread = new Thread(group, new Runnable() { @Override public void run() { int sleep = ( int )(Math.random() * 10 ); try { Thread.sleep( 1000 * 3 ); System.out.println(Thread.currentThread().getName()+ "執行完畢" ); System.out.println( "當前線程組中的運行線程數" +group.activeCount()); } catch (InterruptedException e) { e.printStackTrace(); } } }, group.getName()+ " #" +i+ "" ); pool.execute(thread); } } |
運行結果
pool-1-thread-3執行完畢
pool-1-thread-1執行完畢
當前線程組中的運行線程數0
pool-1-thread-2執行完畢
當前線程組中的運行線程數0
當前線程組中的運行線程數0
pool-1-thread-4執行完畢
pool-1-thread-5執行完畢
當前線程組中的運行線程數0
當前線程組中的運行線程數0
運行結果中可以看到group中的線程并沒有因為線程池啟動了這個線程任務而運行起來.因此通過線程組來對線程池中的線層任務分組不可行.
從java.util.concurrent.ThreadPoolExecutor源碼中可以看到如下構造函數:
1
2
3
4
5
6
7
8
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } |
如果我們在實例化ThreadPoolExecutor時不指定ThreadFactory,那么將以默認的ThreadFactory來創建Thread.
Executors內部類DefaultThreadFactory
下面的源碼即是默認的Thread工廠
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
從唯一的構造函數可以看到DefaultThreadFactory以SecurityManager 實例中的ThreadGroup來指定線程的group,如果SecurityManager 獲取到的ThreadGroup為null才默認以當前線程的group來指定.public Thread newThread(Runnable r) 則以group來new 一個Thead.這樣我們可以在實例化ThreadPoolExecutor對象的時候在其構造函數內傳入自定義的ThreadFactory實例即可達到目的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class MyTheadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; private ThreadGroup defaultGroup; public MyTheadFactory() { SecurityManager s = System.getSecurityManager(); defaultGroup = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public MyTheadFactory(ThreadGroup group) { this .defaultGroup = group; namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(defaultGroup, null , namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
ThreadGroup的使用及手寫線程池
監聽線程異常關閉
以下代碼在window下不方便測試,需在linux 上 測試
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 以下線程如果強制關閉的話,是無法打印`線程被殺掉了` // 模擬關閉 kill PID public static void main(String[] args) { Runtime.getRuntime().addShutdownHook( new Thread( () -> { System.out.println( "線程被殺掉了" ); })); while ( true ){ System.out.println( "i am working ..." ); try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
如何拿到Thread線程中異常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public static void main(String[] args) { Thread thread = new Thread(() -> { try { Thread.sleep( 1000 ); int i = 10 / 0 ; } catch (InterruptedException e) { e.printStackTrace(); } }); thread.setUncaughtExceptionHandler((t,e)->{ System.out.println( "線程的名字" + t.getName()); System.out.println(e); }); // 通過注入接口的方式 thread.start(); } |
ThreadGroup
注意: threadGroup 設置為isDaemon 后,會隨最后一個線程結束而銷毀,如果沒有設置isDaemon ,則需要手動調用 destory()
線程池使用
自己搭建的簡單線程池實現
其中ThreadGroup 的應用沒有寫,但是我們可以觀察線程關閉后,檢查ThreadGroup 中是否還有活躍的線程等,具體參考ThreadGroup API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.stream.IntStream; /** * @Author: shengjm * @Date: 2020/2/10 9:52 * @Description: */ public class SimpleThreadPool extends Thread{ /** * 線程數量 */ private int size; private final int queueSize; /** * 默認線程隊列數量 */ private final static int DEFAULR_TASK_QUEUE_SIZE = 2000 ; private static volatile int seq = 0 ; private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_" ; private final static ThreadGroup GROUP = new ThreadGroup( "Pool_Group" ); private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); private final DiscardPolicy discardPolicy; private volatile boolean destory = false ; private int min; private int max; private int active; /** * 定義異常策略的實現 */ private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException( "線程池已經被撐爆了,后繼多余的人將丟失" ); }; /** * */ public SimpleThreadPool(){ this ( 4 , 8 , 12 ,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY); } /** * */ public SimpleThreadPool( int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) { this .min = min; this .active = active; this .max = max; this .queueSize = queueSize; this .discardPolicy = discardPolicy; init(); } /** * 初始化 */ private void init() { for ( int i = 0 ; i < min; i++){ createWorkTask(); } this .size = min; this .start(); } private void createWorkTask(){ WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++)); task.start(); THREAD_QUEUE.add(task); } /** * 線程池自動擴充 */ @Override public void run() { while (!destory){ System.out.println( this .min + " --- " + this .active+ " --- " + this .max + " --- " + this .size + " --- " + TASK_QUEUE.size()); try { Thread.sleep( 1000 ); if (TASK_QUEUE.size() > active && size < active){ for ( int i = size; i < active;i++){ createWorkTask(); } size = active; } else if (TASK_QUEUE.size() > max && size < max){ for ( int i = size; i < max;i++){ createWorkTask(); } size = max; } synchronized (THREAD_QUEUE){ if (TASK_QUEUE.isEmpty() && size > active){ int release = size - active; for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){ if (release <= 0 ){ break ; } WorkerTask task = it.next(); task.close(); task.interrupt(); it.remove(); release--; } size = active; } } } catch (InterruptedException e) { break ; } } } public void submit(Runnable runnable){ synchronized (TASK_QUEUE){ if (destory){ throw new DiscardException( "線程池已經被摧毀了..." ); } if (TASK_QUEUE.size() > queueSize){ discardPolicy.discard(); } TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } } /** * 關閉 */ public void shutdown(){ while (!TASK_QUEUE.isEmpty()){ try { Thread.sleep( 10 ); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (THREAD_QUEUE) { int initVal = THREAD_QUEUE.size(); while (initVal > 0 ) { for (WorkerTask workerTask : THREAD_QUEUE) { if (workerTask.getTaskState() == TaskState.BLOCKED) { workerTask.interrupt(); workerTask.close(); initVal--; } else { try { Thread.sleep( 10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } this .destory = true ; } } public int getSize() { return size; } public int getMin() { return min; } public int getMax() { return max; } public int getActive() { return active; } /** * 線程狀態 */ private enum TaskState{ FREE , RUNNING , BLOCKED , DEAD } /** * 自定義異常類 */ public static class DiscardException extends RuntimeException{ public DiscardException(String message){ super (message); } } /** * 定義異常策略 */ @FunctionalInterface public interface DiscardPolicy{ void discard() throws DiscardException; } private static class WorkerTask extends Thread{ private volatile TaskState taskState = TaskState.FREE; public TaskState getTaskState(){ return this .taskState; } public WorkerTask(ThreadGroup group , String name){ super (group , name); } @Override public void run(){ OUTER: while ( this .taskState != TaskState.DEAD){ Runnable runnable; synchronized (TASK_QUEUE){ while (TASK_QUEUE.isEmpty()){ try { taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { break OUTER; } } runnable = TASK_QUEUE.removeFirst(); } if (runnable != null ){ taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; } } } public void close(){ this .taskState = TaskState.DEAD; } } /** * 測試 * @param args */ public static void main(String[] args) { SimpleThreadPool simpleThreadPool = new SimpleThreadPool(); // SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY); IntStream.rangeClosed( 0 , 40 ).forEach(i -> { simpleThreadPool.submit(() -> { try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "the runnable " + i + "be servered by " + Thread.currentThread()); }); }); // try { // Thread.sleep(15000); // } catch (InterruptedException e) { // e.printStackTrace(); // } simpleThreadPool.shutdown(); } } |
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/tyBaoErGe/article/details/50196379