頭文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
/*
 * Thread pool management class (declarations).
 *
 * CDoit is the task interface callers derive from; CthreadPoolManage owns a
 * list of worker threads and a queue of pending tasks, and runs two helper
 * threads: one that hands queued tasks to workers and one that retires
 * workers that have been idle for too long.
 */
#ifndef CTHREADPOOLMANAGE_H
#define CTHREADPOOLMANAGE_H

// <errno.h> is the portable header that defines EBUSY; the kernel-internal
// <asm/errno.h> only exists on Linux and is pulled in by <errno.h> there.
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>

#include <iostream>
#include <list>
#include <vector>

// Pause between two scans of the task queue; passed to usleep() (microseconds).
#define USLEEP_TIME 100
// Pause between two idle-worker checks; passed to sleep() (seconds).
#define CHECK_TIME 1

// NOTE(review): a using-directive in a header leaks into every includer.
// It is kept because the implementation file relies on unqualified
// cout / endl / list coming from this header.
using namespace std;

/**
 * Task interface executed by the pool's worker threads.
 *
 * A worker calls start(value) and then end() for every task it receives.
 * The default implementations do nothing and return 0.
 */
class CDoit
{
public:
    // Virtual destructor so a task can safely be deleted through a CDoit*.
    virtual ~CDoit() {}

    /// Runs the task; the argument is the opaque value given to putDuty().
    virtual int start(void *) { return 0; }

    /// Called by the worker right after start() has returned.
    virtual int end() { return 0; }
};

/**
 * Thread pool manager.
 *
 * Tasks are queued with putDuty(); start() creates the initial workers and
 * the two helper threads (task allocation and idle-worker check).
 */
class CthreadPoolManage
{
private:
    int _minThreads;   // minimum number of threads to keep
    int _maxThreads;   // maximum number of threads allowed
    int _waitSec;      // seconds a thread may stay idle before it is closed

    /**
     * Per-worker record. The same type is also used as the queue entry for a
     * pending task, in which case only doit / value / beginTime are used.
     */
    class threadInfo
    {
    public:
        // Every field gets a defined value so that no code path can read an
        // indeterminate pointer or timestamp from a freshly created record.
        threadInfo()
            : isbusy(false),
              doFlag(true),
              beginTime(time(NULL)),
              cThreadPid(),
              cThreadAttr(),
              doit(NULL),
              value(NULL)
        {
        }

        pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;   // guards this record
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    // signalled when the record changes
        bool isbusy;                  // true while a task is assigned / running
        bool doFlag;                  // worker keeps looping while this is true
        time_t beginTime;             // time at which the thread stopped working
        pthread_t cThreadPid;         // thread id
        pthread_attr_t cThreadAttr;   // thread attributes
        CDoit *doit;                  // task object
        void *value;                  // value passed to the task
    };

    // Worker thread entry point.
    static void *startThread(void *);

    // Lock for the task queue.
    pthread_mutex_t _duty_mutex;
    // Task queue.
    list<threadInfo *> _dutyList;

    // Lock for the thread list.
    pthread_mutex_t _thread_mutex;
    // Thread list.
    list<threadInfo *> _threadList;

    // Initialisation: creates the minimum number of threads.
    void initThread();

    // Task allocation thread.
    static void *taskAllocation(void *arg);
    pthread_t tasktPid;

    // Thread that destroys idle workers and checks worker state.
    static void *checkThread(void *arg);
    pthread_t checktPid;
    bool checkrun;   // the check thread loops while this is true

    // Cleanup handler run when a worker exits abnormally.
    static void threadCleanUp(void *arg);

    // Creates one extra worker that immediately runs the task held in ptinfo
    // and appends it to plist.
    int addThread(list<threadInfo *> *plist, threadInfo *ptinfo);

public:
    CthreadPoolManage();

    /*
     * min:     minimum number of threads to keep
     * max:     maximum number of threads
     * waitSec: idle time in seconds after which a thread is destroyed
     */
    CthreadPoolManage(int min, int max, int waitSec);

    ~CthreadPoolManage();

    // Creates the initial workers and the helper threads; 0 on success, -1 on failure.
    int start();

    // Task injector: queues a task together with the value handed to its start().
    int putDuty(CDoit *, void *);

    // Number of worker threads currently in the thread list.
    int getNowThreadNum();
};

#endif // CTHREADPOOLMANAGE_H
CPP文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
|
/*
 * Thread pool, thread management class (implementation).
 */
#include "cthreadpoolmanage.h"

/**
 * Builds a pool with the default limits: 5 threads minimum, 5 threads
 * maximum, 10 seconds of idle time before a surplus thread is closed.
 * Also initialises both list mutexes and enables the check thread's loop.
 */
CthreadPoolManage::CthreadPoolManage()
    : _minThreads(5),    // minimum number of threads to keep
      _maxThreads(5),    // maximum number of threads allowed
      _waitSec(10),      // idle seconds before a thread is closed
      checkrun(true)
{
    pthread_mutex_init(&_duty_mutex, NULL);
    pthread_mutex_init(&_thread_mutex, NULL);
}

/**
 * Builds a pool with caller-supplied limits.
 *
 * Delegates to the default constructor so the mutexes and checkrun are
 * really initialised on *this* object. (Writing `CthreadPoolManage();` as a
 * statement in the body only creates and discards a temporary.)
 *
 * @param min      minimum number of threads to keep
 * @param max      maximum number of threads allowed
 * @param waitSec  idle seconds before a surplus thread is closed
 */
CthreadPoolManage::CthreadPoolManage(int min, int max, int waitSec)
    : CthreadPoolManage()
{
    _minThreads = min;
    _maxThreads = max;
    _waitSec = waitSec;
}

/**
 * NOTE(review): nothing is torn down here. The class keeps no record of
 * whether start() ran, so the helper threads cannot safely be stopped or
 * joined from this block alone; the pool object must outlive its threads.
 */
CthreadPoolManage::~CthreadPoolManage()
{
}

/**
 * Cancellation cleanup handler for a worker thread.
 *
 * Marks the record idle, releases its mutex, destroys the thread attribute
 * object and frees the record.
 *
 * NOTE(review): the record is not removed from the pool's thread list here
 * (this static handler has no access to the pool), so a cancelled worker
 * leaves a stale pointer behind.
 *
 * @param arg  the worker's threadInfo*, as passed to pthread_cleanup_push
 */
void CthreadPoolManage::threadCleanUp(void *arg)
{
    threadInfo *tinfo = static_cast<threadInfo *>(arg);
    if (tinfo == NULL) {
        return;
    }
    tinfo->isbusy = false;
    pthread_mutex_unlock(&tinfo->mtx);
    pthread_attr_destroy(&tinfo->cThreadAttr);
    delete tinfo;
}

/**
 * Worker thread entry point.
 *
 * Loops: wait until a task is assigned (or the worker is retired), run
 * start()/end() on the task, record the time it became idle. The record's
 * mutex is held for the whole duration of a task, which is what makes the
 * dispatcher's trylock skip busy workers.
 *
 * @param arg  the worker's threadInfo*; owned (and finally deleted) by this thread
 * @return     always NULL
 */
void *CthreadPoolManage::startThread(void *arg)
{
    cout << "線程開始工作" << endl;
    threadInfo *tinfo = static_cast<threadInfo *>(arg);

    pthread_cleanup_push(threadCleanUp, arg);
    for (;;) {
        pthread_mutex_lock(&tinfo->mtx);

        // Predicate loop: condition variables may wake spuriously, and the
        // check thread wakes us with doFlag == false to retire this worker.
        bool waited = false;
        while (tinfo->doit == NULL && tinfo->doFlag) {
            if (!waited) {
                cout << "開始等待任務" << endl;
                waited = true;
            }
            pthread_cond_wait(&tinfo->cond, &tinfo->mtx);
        }

        if (tinfo->doit == NULL) {
            // Retired with nothing left to run.
            pthread_mutex_unlock(&tinfo->mtx);
            break;
        }
        if (waited) {
            cout << "有任務了" << endl;
        }

        tinfo->isbusy = true;
        tinfo->doit->start(tinfo->value);
        tinfo->doit->end();
        tinfo->doit = NULL;
        tinfo->isbusy = false;
        time(&tinfo->beginTime);
        pthread_mutex_unlock(&tinfo->mtx);
    }
    // 0: on the normal path the cleanup handler is not executed; it only
    // runs on abnormal exit.
    pthread_cleanup_pop(0);

    pthread_attr_destroy(&tinfo->cThreadAttr);
    delete tinfo;
    cout << "線程結束" << endl;
    return NULL;
}

/**
 * Creates the initial _minThreads detached workers and appends them to the
 * thread list. Stops at the first creation failure.
 */
void CthreadPoolManage::initThread()
{
    for (int created = 0; created < this->_minThreads; ++created) {
        threadInfo *tinfo = new threadInfo;
        tinfo->doit = NULL;
        tinfo->value = NULL;
        tinfo->isbusy = false;
        tinfo->doFlag = true;
        // Start the idle clock now so the check thread never compares
        // against an unset timestamp.
        time(&tinfo->beginTime);

        // PTHREAD_CREATE_DETACHED (detached) vs PTHREAD_CREATE_JOINABLE (joinable):
        // workers are never joined, so they are created detached.
        pthread_attr_init(&tinfo->cThreadAttr);
        pthread_attr_setdetachstate(&tinfo->cThreadAttr, PTHREAD_CREATE_DETACHED);

        cout << "初始化了一個線程" << endl;
        if (pthread_create(&tinfo->cThreadPid, &tinfo->cThreadAttr, startThread, tinfo) != 0) {
            cout << "創建線程失敗" << endl;
            // No thread took ownership of the record: release it here.
            pthread_attr_destroy(&tinfo->cThreadAttr);
            delete tinfo;
            break;
        }
        this->_threadList.push_back(tinfo);
    }
}

/**
 * Creates one extra detached worker that starts by running the task held in
 * ptinfo, and appends the new worker to plist.
 *
 * The attribute object is initialised here because the worker always calls
 * pthread_attr_destroy on it when it exits.
 *
 * @param plist   thread list to append to (caller holds its lock)
 * @param ptinfo  queued task whose doit/value are copied; not freed here
 * @return        0 on success, -1 if the arguments are NULL or the thread
 *                could not be created
 */
int CthreadPoolManage::addThread(std::list<CthreadPoolManage::threadInfo *> *plist,
                                 CthreadPoolManage::threadInfo *ptinfo)
{
    if (plist == NULL || ptinfo == NULL) {
        return -1;
    }

    threadInfo *tinfo = new threadInfo;
    tinfo->doit = ptinfo->doit;
    tinfo->value = ptinfo->value;
    tinfo->isbusy = true;
    tinfo->doFlag = true;
    time(&tinfo->beginTime);

    pthread_attr_init(&tinfo->cThreadAttr);
    pthread_attr_setdetachstate(&tinfo->cThreadAttr, PTHREAD_CREATE_DETACHED);

    if (pthread_create(&tinfo->cThreadPid, &tinfo->cThreadAttr, startThread, tinfo) != 0) {
        cout << "創建線程失敗" << endl;
        pthread_attr_destroy(&tinfo->cThreadAttr);
        delete tinfo;
        return -1;
    }
    plist->push_back(tinfo);
    return 0;
}

/**
 * Task injector: queues a task for the allocation thread.
 *
 * @param doit   task to run; must not be NULL
 * @param value  opaque value later passed to doit->start()
 * @return       0 when queued, -1 when doit is NULL
 */
int CthreadPoolManage::putDuty(CDoit *doit, void *value)
{
    if (doit == NULL) {
        // A worker would dereference it; refuse instead of crashing later.
        return -1;
    }

    threadInfo *tinfo = new threadInfo;
    time(&tinfo->beginTime);
    tinfo->doit = doit;
    tinfo->value = value;

    pthread_mutex_lock(&_duty_mutex);
    this->_dutyList.push_back(tinfo);
    pthread_mutex_unlock(&_duty_mutex);
    return 0;
}

/**
 * Task allocation thread.
 *
 * Repeatedly walks the task queue. Each task is handed to the first idle
 * worker whose mutex can be taken without blocking; if there is none and the
 * pool is below _maxThreads, a new worker is created for it. Tasks that
 * cannot be placed stay queued for the next pass.
 *
 * @param arg  the owning CthreadPoolManage*
 * @return     never returns in practice (infinite loop)
 */
void *CthreadPoolManage::taskAllocation(void *arg)
{
    CthreadPoolManage *ptmanage = static_cast<CthreadPoolManage *>(arg);

    while (true) {
        pthread_mutex_lock(&ptmanage->_duty_mutex);
        pthread_mutex_lock(&ptmanage->_thread_mutex);

        list<threadInfo *>::iterator dutyIt = ptmanage->_dutyList.begin();
        while (dutyIt != ptmanage->_dutyList.end()) {
            threadInfo *duty = *dutyIt;
            bool dispatched = false;

            for (list<threadInfo *>::iterator workerIt = ptmanage->_threadList.begin();
                 workerIt != ptmanage->_threadList.end() && !dispatched; ++workerIt) {
                threadInfo *worker = *workerIt;
                // A worker holds its own mutex while running a task, so any
                // failure to lock means "not available right now".
                if (pthread_mutex_trylock(&worker->mtx) != 0) {
                    continue;
                }
                if (!worker->isbusy) {
                    worker->doit = duty->doit;
                    worker->value = duty->value;
                    worker->isbusy = true;
                    pthread_cond_signal(&worker->cond);
                    dispatched = true;
                }
                pthread_mutex_unlock(&worker->mtx);
            }

            if (!dispatched) {
                // >= so the pool never grows past _maxThreads.
                const bool atCapacity =
                    static_cast<int>(ptmanage->_threadList.size()) >= ptmanage->_maxThreads;
                if (atCapacity || ptmanage->addThread(&ptmanage->_threadList, duty) != 0) {
                    ++dutyIt;   // leave the task queued for the next pass
                    continue;
                }
            }

            // The task now belongs to a worker: drop the queue entry.
            delete duty;
            dutyIt = ptmanage->_dutyList.erase(dutyIt);
        }

        // Release in reverse order of acquisition.
        pthread_mutex_unlock(&ptmanage->_thread_mutex);
        pthread_mutex_unlock(&ptmanage->_duty_mutex);
        usleep(USLEEP_TIME);
    }
    return NULL;
}

/**
 * Idle-worker check thread.
 *
 * Every CHECK_TIME seconds, retires workers that are idle for more than
 * _waitSec seconds, but never shrinks the pool to _minThreads or below.
 * A retired worker is removed from the list, flagged with doFlag = false and
 * woken up so that it can exit and free its own record.
 *
 * @param arg  the owning CthreadPoolManage*
 * @return     NULL once checkrun becomes false
 */
void *CthreadPoolManage::checkThread(void *arg)
{
    CthreadPoolManage *ptmanage = static_cast<CthreadPoolManage *>(arg);

    while (ptmanage->checkrun) {
        sleep(CHECK_TIME);
        pthread_mutex_lock(&ptmanage->_thread_mutex);

        list<threadInfo *>::iterator it = ptmanage->_threadList.begin();
        // The size test is part of the loop condition, so the list mutex is
        // always released below, whatever the pool size is.
        while (it != ptmanage->_threadList.end() &&
               static_cast<int>(ptmanage->_threadList.size()) > ptmanage->_minThreads) {
            threadInfo *ptinfo = *it;
            if (pthread_mutex_trylock(&ptinfo->mtx) != 0) {
                ++it;   // busy (or not lockable): leave it alone
                continue;
            }

            time_t nowtime;
            time(&nowtime);
            const bool idle = !ptinfo->isbusy && ptinfo->doit == NULL;
            if (idle && nowtime - ptinfo->beginTime > ptmanage->_waitSec) {
                ptinfo->doFlag = false;
                it = ptmanage->_threadList.erase(it);
                // Wake the worker so it notices doFlag and exits; it deletes
                // its record, so ptinfo must not be used after the unlock.
                pthread_cond_signal(&ptinfo->cond);
                pthread_mutex_unlock(&ptinfo->mtx);
            } else {
                pthread_mutex_unlock(&ptinfo->mtx);
                ++it;
            }
        }

        pthread_mutex_unlock(&ptmanage->_thread_mutex);
    }
    return NULL;
}

/**
 * Starts the pool: creates the initial workers, then the task allocation
 * thread, then the idle-worker check thread.
 *
 * @return 0 on success, -1 if either helper thread could not be created
 */
int CthreadPoolManage::start()
{
    // Initialisation.
    this->initThread();

    // Start the task allocation thread.
    if (pthread_create(&tasktPid, NULL, taskAllocation, this) != 0) {
        cout << "創建任務分配線程失敗" << endl;
        return -1;
    }

    // Start the thread that checks worker state and retires idle workers.
    if (pthread_create(&checktPid, NULL, checkThread, this) != 0) {
        cout << "創建線程狀態分配管理線程失敗" << endl;
        return -1;
    }
    return 0;
}

/**
 * @return the number of worker threads currently in the thread list
 */
int CthreadPoolManage::getNowThreadNum()
{
    pthread_mutex_lock(&this->_thread_mutex);
    const int num = static_cast<int>(this->_threadList.size());
    pthread_mutex_unlock(&this->_thread_mutex);
    return num;
}
以上所述就是本文的全部內容了,希望大家能夠喜歡。
請您花一點時間將文章分享給您的朋友或者留下評論。我們將會由衷感謝您的支持!