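1. Preface
Almost every project needs scheduled tasks for jobs that have to run at particular times. This article walks through how Spring Boot's scheduled tasks work internally and how to create them dynamically.
Last week I got a requirement at work: sync account-cancellation data from several provinces and unbind the corresponding WeChat followers. Each province periodically drops its cancellation data onto an SFTP server, and I had to write scheduled tasks that parse those files. Because several provinces are involved, the servers, file-naming rules, and data rules may all differ, so making this configurable is not trivial. The data rules simply have to be unified; the servers and file-naming rules can be read from the configuration center. Whenever a new province's configuration is added, the backend detects it and creates a scheduled task dynamically.
2. Core configuration Spring Boot uses to enable scheduled tasks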
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {

    }

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {

        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }

    }
Next, let's look at the core post-processor: ScheduledAnnotationBeanPostProcessor.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            } else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
1. It processes the @Scheduled annotation and registers the scheduled tasks through ScheduledTaskRegistrar.
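The scan performed by the post-processor can be pictured with plain reflection. The sketch below is a JDK-only illustration, not Spring's implementation: the `Every` annotation, `ReportJob`, and `scan` are hypothetical names standing in for `@Scheduled`, an application bean, and the `selectMethods`/`processScheduled` pair.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ScheduledScanSketch {

    /** Hypothetical stand-in for Spring's @Scheduled; not the real annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** Sample bean with one annotated method and one plain method. */
    static class ReportJob {
        @Every(millis = 5000)
        public void syncFiles() { }

        public void helper() { }
    }

    /** Returns "method@interval" for every annotated method, sorted for stable output. */
    static List<String> scan(Object bean) {
        List<String> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                // At this point Spring would build a task and hand it to the registrar.
                found.add(method.getName() + "@" + every.millis());
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(new ReportJob())); // prints [syncFiles@5000]
    }
}
```

Unlike this sketch, the real post-processor also resolves proxies to their target class, supports repeatable annotations, and caches classes that carry no annotation.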
    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // Search for TaskScheduler bean...
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            } catch (NoUniqueBeanDefinitionException ex) {
                logger.trace("Could not find unique TaskScheduler bean", ex);
                try {
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                } catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            } catch (NoSuchBeanDefinitionException ex) {
                logger.trace("Could not find default TaskScheduler bean", ex);
                // Search for ScheduledExecutorService bean next...
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                } catch (NoUniqueBeanDefinitionException ex2) {
                    logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                    try {
                        this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    } catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                } catch (NoSuchBeanDefinitionException ex2) {
                    logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }

        this.registrar.afterPropertiesSet();
    }
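1. All SchedulingConfigurer beans get a chance to configure the ScheduledTaskRegistrar dynamically.
2. A TaskScheduler is registered with the ScheduledTaskRegistrar (it schedules Runnable tasks and supports several trigger rules).
3. registrar.afterPropertiesSet() is where all the registered tasks are actually scheduled.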
    protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
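1. TriggerTask: a dynamic scheduled task. Trigger#nextExecutionTime determines the next execution time from the given trigger context.
2. CronTask: a dynamic scheduled task and a subclass of TriggerTask. A cron expression determines when the next execution fires.
3. IntervalTask: a task that runs periodically after an initial delay.
4. If no TaskScheduler has been set, the default is a ConcurrentTaskScheduler backed by a single-threaded ScheduledExecutor.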
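The practical consequence of point 4 is that, with the default scheduler, every task shares one thread. The JDK-only sketch below illustrates that; `SingleThreadSchedulerDemo` and `threadsUsed` are hypothetical names, and the 100 ms sleep simply simulates work so that the jobs would overlap if more threads were available.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SingleThreadSchedulerDemo {

    /** Runs `tasks` short jobs on a pool of `poolSize` threads and returns the names of the threads used. */
    static Set<String> threadsUsed(int poolSize, int tasks) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.schedule(() -> {
                    names.add(Thread.currentThread().getName());
                    sleepQuietly(100); // simulate work
                    done.countDown();
                }, 0, TimeUnit.MILLISECONDS);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return names;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // One thread: all four jobs run one after another on the same thread.
        System.out.println("pool of 1 used " + threadsUsed(1, 4).size() + " thread(s)");
        // Several threads: the jobs can overlap, so more than one thread is normally used.
        System.out.println("pool of 4 used " + threadsUsed(4, 4).size() + " thread(s)");
    }
}
```

A slow task on the single-threaded default therefore delays every other scheduled task, which is the motivation for supplying a larger pool in section 4.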
3. How CronTask works
    // ScheduledTaskRegistrar.java
    @Nullable
    public ScheduledTask scheduleCronTask(CronTask task) {
        ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
        boolean newTask = false;
        if (scheduledTask == null) {
            scheduledTask = new ScheduledTask(task);
            newTask = true;
        }
        if (this.taskScheduler != null) {
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
        } else {
            addCronTask(task);
            this.unresolvedTasks.put(task, scheduledTask);
        }
        return (newTask ? scheduledTask : null);
    }

    // ConcurrentTaskScheduler.java
    @Override
    @Nullable
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        try {
            if (this.enterpriseConcurrentScheduler) {
                return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
            } else {
                ErrorHandler errorHandler =
                        (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
                return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
            }
        } catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
        }
    }

    // ReschedulingRunnable.java
    @Nullable
    public ScheduledFuture<?> schedule() {
        synchronized (this.triggerContextMonitor) {
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
            long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }

    private ScheduledFuture<?> obtainCurrentFuture() {
        Assert.state(this.currentFuture != null, "No scheduled future");
        return this.currentFuture;
    }

    @Override
    public void run() {
        Date actualExecutionTime = new Date();
        super.run();
        Date completionTime = new Date();
        synchronized (this.triggerContextMonitor) {
            Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
            if (!obtainCurrentFuture().isCancelled()) {
                schedule();
            }
        }
    }
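1. In the end, both the task and the trigger are wrapped in a ReschedulingRunnable.
2. ReschedulingRunnable implements repeated scheduling: schedule() hands the object itself to the executor, the executor later calls run(), and run() calls schedule() again.
3. schedule() in ReschedulingRunnable is synchronized, so only one thread at a time can compute the next execution time and submit it to the executor.
4. Different ReschedulingRunnable objects do not affect one another as long as the thread pool has enough threads. In other words, given enough pool capacity, tasks submitted by separate calls to TaskScheduler's schedule method can run interleaved.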
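The self-rescheduling loop described in points 1 to 3 can be reproduced with the JDK alone. This is a simplified sketch under stated assumptions, not Spring's class: `SelfReschedulingSketch` is a hypothetical name, and an `IntFunction<Long>` that maps the number of completed runs to the next delay (or `null` to stop) stands in for Spring's `Trigger`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

class SelfReschedulingSketch implements Runnable {
    private final Runnable delegate;
    // Hypothetical stand-in for Trigger: completed run count -> next delay in ms, null means stop.
    private final IntFunction<Long> nextDelayMillis;
    private final ScheduledExecutorService executor;
    private final Object monitor = new Object();
    private int completedRuns = 0;

    SelfReschedulingSketch(Runnable delegate, IntFunction<Long> nextDelayMillis,
                           ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    /** Asks the "trigger" for the next delay and hands this object back to the executor. */
    void schedule() {
        synchronized (monitor) {
            Long delay = nextDelayMillis.apply(completedRuns);
            if (delay == null) {
                return; // the trigger is exhausted, so the chain ends here
            }
            executor.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void run() {
        delegate.run();
        synchronized (monitor) {
            completedRuns++;
            schedule(); // a one-shot task re-arms itself after each run
        }
    }

    /** Runs a counter task until the trigger stops after maxRuns executions. */
    static int runUntilTriggerStops(int maxRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(maxRuns);
        new SelfReschedulingSketch(
                () -> { counter.incrementAndGet(); finished.countDown(); },
                runs -> runs < maxRuns ? Long.valueOf(20L) : null,
                executor).schedule();
        try {
            finished.await(5, TimeUnit.SECONDS);
            Thread.sleep(100); // leave room for an unexpected extra run to show up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntilTriggerStops(3)); // prints 3
    }
}
```

Because each execution is submitted as a one-shot delayed task, the next run time is recomputed after every run, which is what lets a cron expression produce irregular intervals.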
    // ScheduledThreadPoolExecutor.java
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
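The figure below shows how ScheduledFutureTask works (I was too lazy to draw one, so it is borrowed).
1. Each ScheduledFutureTask is placed in a blocking priority queue, ScheduledThreadPoolExecutor.DelayedWorkQueue, which is implemented as a binary min-heap.
2. The Thread objects in the figure above are ThreadPoolExecutor.Worker instances, which implement the Runnable interface.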
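The effect of the delay-ordered queue can be observed from the outside: tasks come out in order of trigger time, not submission order. A small JDK-only sketch follows; `DelayOrderDemo` and `executionOrder` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayOrderDemo {

    /** Submits tasks with delays 300, 100, 200 ms and records the order in which they run. */
    static List<Integer> executionOrder() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        int[] delays = {300, 100, 200};
        CountDownLatch done = new CountDownLatch(delays.length);
        try {
            for (int delay : delays) {
                // Each task lands in the executor's delay queue, keyed by its trigger time.
                executor.schedule(() -> {
                    order.add(delay);
                    done.countDown();
                }, delay, TimeUnit.MILLISECONDS);
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(executionOrder()); // prints [100, 200, 300]
    }
}
```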
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
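1. A Worker holds a Thread object, and that Thread's Runnable is the Worker itself.
2. ThreadPoolExecutor#addWorker creates a Worker, takes the Thread instance out of it, and starts it. That is how one thread of the pool comes into being.
3. Worker's run method calls ThreadPoolExecutor#runWorker, which is where tasks are finally executed. It works roughly as follows:
(1) The worker first runs the task passed in. If that task is null, then as long as the pool is running it fetches tasks from the workQueue through getTask. ThreadPoolExecutor's execute method offers tasks to the workQueue when it cannot create another core thread.
When getTask takes a task from the queue, the pool's configuration decides whether it blocks and for how long. If getTask returns null, the runWorker loop ends and processWorkerExit runs.
At that point the thread has done its job and "disappears" from the pool.
(2) Before a task runs, the worker's lock method is called so that the task is not interrupted while it is executing. This is ensured by calling clearInterruptsForTaskRun (worth a look later), which leaves the thread without a pending interrupt of its own.
(3) beforeExecute and afterExecute let you run custom logic before and after a task. Both are empty in ThreadPoolExecutor and are left for subclasses to fill in.
If beforeExecute throws, the task is not run, and the loop exits with completedAbruptly set to true, meaning the worker died due to a user exception; decrementWorkerCount then adjusts the worker count (wc).
(4) Because Runnable's run method cannot throw arbitrary Throwables, the exception is rewrapped here and rethrown. The rethrown exception kills the current thread; afterExecute can be used to handle it.
(5) afterExecute can also throw, which can likewise kill the current thread.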
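The two hooks from point (3) and the exception path from point (4) can be tried out with a small subclass. The sketch below is only an illustration: `HookedPool` and `demo` are hypothetical names, and tasks are submitted with `execute` because with `submit` the exception would be captured in the returned Future instead of being passed to afterExecute.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is non-null when the task submitted via execute() threw.
        events.add(t == null ? "after:ok" : "after:" + t.getMessage());
    }

    /** Runs one normal task and one failing task, then returns the recorded hook events. */
    static List<String> demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(pool.events);
    }

    public static void main(String[] args) {
        // prints [before, task, after:ok, before, after:boom]
        System.out.println(demo());
    }
}
```

The failing task also terminates its worker thread, so the JVM's default uncaught-exception handler prints a stack trace for "boom" on stderr; that is the thread death described in point (4).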
4. Creating scheduled tasks dynamically
TaskConfiguration: the configuration class
    @Configuration
    @EnableScheduling
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class TaskConfiguration {

        @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledExecutorService scheduledAnnotationProcessor() {
            return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
        }

        private static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                        Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                        "-schedule-";
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                        namePrefix + threadNumber.getAndIncrement(),
                        0);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != Thread.NORM_PRIORITY) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        }
    }
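1. This makes ConcurrentTaskScheduler use a thread pool with corePoolSize=5 instead of the default single-threaded ScheduledExecutor.
2. A custom thread factory gives the pool's threads recognizable names.
DynamicTask: the dynamic scheduled task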
    @Configuration
    public class DynamicTask implements SchedulingConfigurer {
        private static final Logger logger = LoggerFactory.getLogger(DynamicTask.class);

        private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                new DynamicTaskConsumeThreadFactory());

        private volatile ScheduledTaskRegistrar registrar;
        private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

        // Copy-on-write, so callers can edit the list while the scheduler thread iterates over it
        private volatile List<TaskConstant> taskConstants = new CopyOnWriteArrayList<>();

        @Override
        public void configureTasks(ScheduledTaskRegistrar registrar) {
            this.registrar = registrar;
            this.registrar.addTriggerTask(
                    () -> {
                        if (!CollectionUtils.isEmpty(taskConstants)) {
                            logger.info("Checking the dynamic scheduled task list...");
                            List<TimingTask> tts = new ArrayList<>();
                            taskConstants
                                    .forEach(taskConstant -> {
                                        TimingTask tt = new TimingTask();
                                        tt.setExpression(taskConstant.getCron());
                                        tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                                        tts.add(tt);
                                    });
                            this.refreshTasks(tts);
                        }
                    }
                    , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
        }

        public List<TaskConstant> getTaskConstants() {
            return taskConstants;
        }

        private void refreshTasks(List<TimingTask> tasks) {
            // Cancel tasks whose configuration has been deleted, and drop them from both caches
            // so that the same task id can be scheduled again later
            Set<String> taskIds = scheduledFutures.keySet();
            for (String taskId : taskIds) {
                if (!exists(tasks, taskId)) {
                    scheduledFutures.remove(taskId).cancel(false);
                    cronTasks.remove(taskId);
                }
            }
            for (TimingTask tt : tasks) {
                String expression = tt.getExpression();
                if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                    logger.error("DynamicTask: invalid cron expression: " + expression);
                    continue;
                }
                // Same configuration as before: no need to re-create the task
                if (scheduledFutures.containsKey(tt.getTaskId())
                        && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                    continue;
                }
                // The schedule has changed: cancel the task that is currently running
                if (scheduledFutures.containsKey(tt.getTaskId())) {
                    scheduledFutures.remove(tt.getTaskId()).cancel(false);
                    cronTasks.remove(tt.getTaskId());
                }
                CronTask task = new CronTask(tt, expression);
                ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
                cronTasks.put(tt.getTaskId(), task);
                scheduledFutures.put(tt.getTaskId(), future);
            }
        }

        private boolean exists(List<TimingTask> tasks, String taskId) {
            for (TimingTask task : tasks) {
                if (task.getTaskId().equals(taskId)) {
                    return true;
                }
            }
            return false;
        }

        @PreDestroy
        public void destroy() {
            this.registrar.destroy();
        }

        public static class TaskConstant {
            private String cron;
            private String taskId;

            public String getCron() {
                return cron;
            }

            public void setCron(String cron) {
                this.cron = cron;
            }

            public String getTaskId() {
                return taskId;
            }

            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }
        }

        private class TimingTask implements Runnable {
            private String expression;
            private String taskId;

            public String getTaskId() {
                return taskId;
            }

            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }

            @Override
            public void run() {
                logger.info("Current CronTask: " + this);
                // Bounded queue with capacity 3
                DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
                es.submit(() -> {
                    while (!queue.isDone() || !queue.isEmpty()) {
                        try {
                            String content = queue.poll(500, TimeUnit.MILLISECONDS);
                            if (StringUtils.isBlank(content)) {
                                return;
                            }
                            logger.info("DynamicBlockingQueue consumed: " + content);
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });

                // Put data into the queue
                for (int i = 0; i < 5; ++i) {
                    try {
                        queue.put(String.valueOf(i));
                        logger.info("DynamicBlockingQueue produced: " + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.setDone(true);
            }

            public String getExpression() {
                return expression;
            }

            public void setExpression(String expression) {
                this.expression = expression;
            }

            @Override
            public String toString() {
                return ReflectionToStringBuilder.toString(this
                        , ToStringStyle.JSON_STYLE
                        , false
                        , false
                        , TimingTask.class);
            }
        }

        /**
         * Thread factory for the queue-consumer threads
         */
        private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            DynamicTaskConsumeThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                        Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                        "-dynamic-task-";
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                        namePrefix + threadNumber.getAndIncrement(),
                        0);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != Thread.NORM_PRIORITY) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        }

        private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
            DynamicBlockingQueue(int capacity) {
                super(capacity);
            }

            private volatile boolean done = false;

            public boolean isDone() {
                return done;
            }

            public void setDone(boolean done) {
                this.done = done;
            }
        }
    }
1. taskConstants is the list of dynamic tasks.
2. ScheduledTaskRegistrar#addTriggerTask adds a periodic trigger task that watches the dynamic task list for changes.
    CronTask task = new CronTask(tt, expression);
    ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
    cronTasks.put(tt.getTaskId(), task);
    scheduledFutures.put(tt.getTaskId(), future);
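3. The cron task is created dynamically, and the resulting ScheduledFuture instance is cached.
4. When the task list is refreshed, the cached ScheduledFuture and CronTask instances are used to decide whether a stale dynamic task should be cancelled and removed.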
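The reconcile step from points 3 and 4 boils down to comparing the desired configuration with two caches. The following JDK-only sketch shows that pattern in isolation; `RefreshSketch` and its methods are hypothetical names, and a fixed-rate period in milliseconds is swapped in for the cron expression so that the example needs no Spring classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class RefreshSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    private final Map<String, Long> periods = new ConcurrentHashMap<>();

    /** Brings the running tasks in line with `desired` (task id -> period in ms). */
    void refresh(Map<String, Long> desired, Runnable job) {
        // 1. Cancel and forget tasks that are no longer configured.
        for (String id : new ArrayList<>(futures.keySet())) {
            if (!desired.containsKey(id)) {
                futures.remove(id).cancel(false);
                periods.remove(id);
            }
        }
        // 2. Create new tasks, and re-create tasks whose period has changed.
        for (Map.Entry<String, Long> entry : desired.entrySet()) {
            String id = entry.getKey();
            Long period = entry.getValue();
            if (period.equals(periods.get(id))) {
                continue; // unchanged configuration, keep the running task
            }
            ScheduledFuture<?> old = futures.remove(id);
            if (old != null) {
                old.cancel(false);
            }
            futures.put(id, executor.scheduleAtFixedRate(job, period, period, TimeUnit.MILLISECONDS));
            periods.put(id, period);
        }
    }

    Set<String> activeIds() {
        return new TreeSet<>(futures.keySet());
    }

    /** Adds two tasks, then replaces one of them, and reports what is left. */
    static String demo() {
        RefreshSketch sketch = new RefreshSketch();
        Map<String, Long> config = new HashMap<>();
        config.put("test1", 1000L);
        config.put("test2", 1000L);
        sketch.refresh(config, () -> { });
        ScheduledFuture<?> test2 = sketch.futures.get("test2");

        config.remove("test2");
        config.put("test3", 1000L);
        sketch.refresh(config, () -> { });

        String result = sketch.activeIds() + " test2Cancelled=" + test2.isCancelled();
        sketch.executor.shutdownNow();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [test1, test3] test2Cancelled=true
    }
}
```

Removing a cancelled entry from both caches matters: if the id stayed in the cache, adding the same task back with the same schedule would be mistaken for an unchanged configuration and never rescheduled.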
DynamicTaskTest: test class for the dynamic scheduled task
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DynamicTaskTest {

        @Autowired
        private DynamicTask dynamicTask;

        @Test
        public void test() throws InterruptedException {
            List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
            DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
            taskConstant.setCron("0/5 * * * * ?");
            taskConstant.setTaskId("test1");
            taskConstans.add(taskConstant);

            DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
            taskConstant1.setCron("0/5 * * * * ?");
            taskConstant1.setTaskId("test2");
            taskConstans.add(taskConstant1);

            DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
            taskConstant2.setCron("0/5 * * * * ?");
            taskConstant2.setTaskId("test3");
            taskConstans.add(taskConstant2);

            TimeUnit.SECONDS.sleep(40);
            // Remove one configuration and add a new one
            taskConstans.remove(taskConstans.size() - 1);
            DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
            taskConstant3.setCron("0/5 * * * * ?");
            taskConstant3.setTaskId("test4");
            taskConstans.add(taskConstant3);

            // Keep the test alive so the refreshed task list can be observed in the logs
            TimeUnit.MINUTES.sleep(50);
        }
    }
Original article: https://www.cnblogs.com/hujunzheng/p/10353390.html