1、quartz任務調度的基本實現原理
quartz是opensymphony開源組織在任務調度領域的一個開源項目,完全基于java實現。作為一個優(yōu)秀的開源調度框架,quartz具有以下特點:
(1)強大的調度功能,例如支持豐富多樣的調度方法,可以滿足各種常規(guī)及特殊需求;
(2)靈活的應用方式,例如支持任務和調度的多種組合方式,支持調度數據的多種存儲方式;
(3)分布式和集群能力,terracotta收購后在原來功能基礎上作了進一步提升。本文將對該部分相加闡述。
1.1 quartz 核心元素
quartz任務調度的核心元素為:scheduler——任務調度器、trigger——觸發(fā)器、job——任務。其中trigger和job是任務調度的元數據,scheduler是實際執(zhí)行調度的控制器。
trigger是用于定義調度時間的元素,即按照什么時間規(guī)則去執(zhí)行任務。quartz中主要提供了四種類型的trigger:simpletrigger,crontirgger,dateintervaltrigger,和nthincludeddaytrigger。這四種trigger可以滿足企業(yè)應用中的絕大部分需求。
job用于表示被調度的任務。主要有兩種類型的job:無狀態(tài)的(stateless)和有狀態(tài)的(stateful)。對于同一個trigger來說,有狀態(tài)的job不能被并行執(zhí)行,只有上一次觸發(fā)的任務被執(zhí)行完之后,才能觸發(fā)下一次執(zhí)行。job主要有兩種屬性:volatility和durability,其中volatility表示任務是否被持久化到數據庫存儲,而durability表示在沒有trigger關聯的時候任務是否被保留。兩者都是在值為true的時候任務被持久化或保留。一個job可以被多個trigger關聯,但是一個trigger只能關聯一個job。
scheduler由scheduler工廠創(chuàng)建:directschedulerfactory或者stdschedulerfactory。第二種工廠stdschedulerfactory使用較多,因為directschedulerfactory使用起來不夠方便,需要作許多詳細的手工編碼設置。scheduler主要有三種:remotembeanscheduler,remotescheduler和stdscheduler。
quartz核心元素之間的關系如圖1.1所示:
圖1.1 核心元素關系圖
1.2 quartz 線程視圖
在quartz中,有兩類線程,scheduler調度線程和任務執(zhí)行線程,其中任務執(zhí)行線程通常使用一個線程池維護一組線程。
圖1.2 quartz線程視圖
scheduler調度線程主要有兩個:執(zhí)行常規(guī)調度的線程,和執(zhí)行misfiredtrigger的線程。常規(guī)調度線程輪詢存儲的所有trigger,如果有需要觸發(fā)的trigger,即到達了下一次觸發(fā)的時間,則從任務執(zhí)行線程池獲取一個空閑線程,執(zhí)行與該trigger關聯的任務。misfire線程是掃描所有的trigger,查看是否有misfiredtrigger,如果有的話根據misfire的策略分別處理(fire now or wait for the next fire)。
1.3 quartz job數據存儲
quartz中的trigger和job需要存儲下來才能被使用。quartz中有兩種存儲方式:ramjobstore,jobstoresupport,其中ramjobstore是將trigger和job存儲在內存中,而jobstoresupport是基于jdbc將trigger和job存儲到數據庫中。ramjobstore的存取速度非常快,但是由于其在系統被停止后所有的數據都會丟失,所以在集群應用中,必須使用jobstoresupport。
2、quartz集群原理2.1 quartz 集群架構
一個quartz集群中的每個節(jié)點是一個獨立的quartz應用,它又管理著其他的節(jié)點。這就意味著你必須對每個節(jié)點分別啟動或停止。quartz集群中,獨立的quartz節(jié)點并不與另一其的節(jié)點或是管理節(jié)點通信,而是通過相同的數據庫表來感知到另一quartz應用的,如圖2.1所示。
圖2.1 quartz集群架構
2.2 quartz集群相關數據庫表
因為quartz集群依賴于數據庫,所以必須首先創(chuàng)建quartz數據庫表,quartz發(fā)布包中包括了所有被支持的數據庫平臺的sql腳本。這些sql腳本存放于<quartz_home>/docs/dbtables 目錄下。這里采用的quartz 1.8.4版本,總共12張表,不同版本,表個數可能不同。數據庫為mysql,用tables_mysql.sql創(chuàng)建數據庫表。全部表如圖2.2所示,對這些表的簡要介紹如圖2.3所示。
圖2.2 quartz 1.8.4在mysql數據庫中生成的表
圖2.3 quartz數據表簡介
2.2.1 調度器狀態(tài)表(qrtz_scheduler_state)
說明:集群中節(jié)點實例信息,quartz定時讀取該表的信息判斷集群中每個實例的當前狀態(tài)。
instance_name:配置文件中org.quartz.scheduler.instanceid配置的名字,如果設置為auto,quartz會根據物理機名和當前時間產生一個名字。
last_checkin_time:上次檢入時間
checkin_interval:檢入間隔時間
2.2.2 觸發(fā)器與任務關聯表(qrtz_fired_triggers)
存儲與已觸發(fā)的trigger相關的狀態(tài)信息,以及相聯job的執(zhí)行信息。
2.2.3 觸發(fā)器信息表(qrtz_triggers)
trigger_name:trigger的名字,該名字用戶自己可以隨意定制,無強行要求
trigger_group:trigger所屬組的名字,該名字用戶自己隨意定制,無強行要求
job_name:qrtz_job_details表job_name的外鍵
job_group:qrtz_job_details表job_group的外鍵
trigger_state:當前trigger狀態(tài)設置為acquired,如果設為waiting,則job不會觸發(fā)
trigger_cron:觸發(fā)器類型,使用cron表達式
2.2.4 任務詳細信息表(qrtz_job_details)
說明:保存job詳細信息,該表需要用戶根據實際情況初始化
job_name:集群中job的名字,該名字用戶自己可以隨意定制,無強行要求。
job_group:集群中job的所屬組的名字,該名字用戶自己隨意定制,無強行要求。
job_class_name:集群中job實現類的完全包名,quartz就是根據這個路徑到classpath找到該job類的。
is_durable:是否持久化,把該屬性設置為1,quartz會把job持久化到數據庫中
job_data:一個blob字段,存放持久化job對象。
2.2.5權限信息表(qrtz_locks)
說明:tables_oracle.sql里有相應的dml初始化,如圖2.4所示。
圖2.4 quartz權限信息表中的初始化信息
2.3 quartz scheduler在集群中的啟動流程
quartz scheduler自身是察覺不到被集群的,只有配置給scheduler的jdbc jobstore才知道。當quartz scheduler啟動時,它調用jobstore的schedulerstarted()方法,它告訴jobstore scheduler已經啟動了。schedulerstarted() 方法是在jobstoresupport類中實現的。jobstoresupport類會根據quartz.properties文件中的設置來確定scheduler實例是否參與到集群中。假如配置了集群,一個新的clustermanager類的實例就被創(chuàng)建、初始化并啟動。clustermanager是在jobstoresupport類中的一個內嵌類,繼承了java.lang.thread,它會定期運行,并對scheduler實例執(zhí)行檢入的功能。scheduler也要查看是否有任何一個別的集群節(jié)點失敗了。檢入操作執(zhí)行周期在quartz.properties中配置。
2.4 偵測失敗的scheduler節(jié)點
當一個scheduler實例執(zhí)行檢入時,它會查看是否有其他的scheduler實例在到達他們所預期的時間還未檢入。這是通過檢查scheduler_state表中scheduler記錄在last_chedk_time列的值是否早于org.quartz.jobstore.clustercheckininterval來確定的。如果一個或多個節(jié)點到了預定時間還沒有檢入,那么運行中的scheduler就假定它(們) 失敗了。
2.5 從故障實例中恢復job
當一個sheduler實例在執(zhí)行某個job時失敗了,有可能由另一正常工作的scheduler實例接過這個job重新運行。要實現這種行為,配置給jobdetail對象的job可恢復屬性必須設置為true(job.setrequestsrecovery(true))。如果可恢復屬性被設置為false(默認為false),當某個scheduler在運行該job失敗時,它將不會重新運行;而是由另一個scheduler實例在下一次觸發(fā)時間觸發(fā)。scheduler實例出現故障后多快能被偵測到取決于每個scheduler的檢入間隔(即2.3中提到的org.quartz.jobstore.clustercheckininterval)。
3、quartz集群實例(quartz+spring)
3.1 spring不兼容quartz問題
spring從2.0.2開始便不再支持quartz。具體表現在quartz+spring把quartz的task實例化進入數據庫時,會產生:serializable的錯誤:
1
2
3
4
5
6
7
8
|
<bean id= "jobtask" class = "org.springframework.scheduling.quartz. methodinvokingjobdetailfactorybean " > <property name= "targetobject" > <ref bean= "quartzjob" /> </property> <property name= "targetmethod" > <value>execute</value> </property> </bean> |
這個methodinvokingjobdetailfactorybean類中的methodinvoking方法,是不支持序列化的,因此在把quartz的task序列化進入數據庫時就會拋錯。
首先解決methodinvokingjobdetailfactorybean的問題,在不修改spring源碼的情況下,可以避免使用這個類,直接調用jobdetail。但是使用jobdetail實現,需要自己實現mothodinvoking的邏輯,可以使用jobdetail的jobclass和jobdataasmap屬性來自定義一個factory(manager)來實現同樣的目的。例如,本示例中新建了一個mydetailquartzjobbean來實現這個功能。
3.2 mydetailquartzjobbean.java文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package org.lxh.mvc.jobbean; import java.lang.reflect.method; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.quartz.jobexecutioncontext; import org.quartz.jobexecutionexception; import org.springframework.context.applicationcontext; import org.springframework.scheduling.quartz.quartzjobbean; public class mydetailquartzjobbean extends quartzjobbean { protected final log logger = logfactory.getlog(getclass()); private string targetobject; private string targetmethod; private applicationcontext ctx; protected void executeinternal(jobexecutioncontext context) throws jobexecutionexception { try { logger.info( "execute [" + targetobject + "] at once>>>>>>" ); object otargetobject = ctx.getbean(targetobject); method m = null ; try { m = otargetobject.getclass().getmethod(targetmethod, new class [] {}); m.invoke(otargetobject, new object[] {}); } catch (securityexception e) { logger.error(e); } catch (nosuchmethodexception e) { logger.error(e); } } catch (exception e) { throw new jobexecutionexception(e); } } public void setapplicationcontext(applicationcontext applicationcontext){ this .ctx=applicationcontext; } public void settargetobject(string targetobject) { this .targetobject = targetobject; } public void settargetmethod(string targetmethod) { this .targetmethod = targetmethod; } } |
3.3真正的job實現類
在test類中,只是簡單實現了打印系統當前時間的功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package org.lxh.mvc.job; import java.io.serializable; import java.util.date; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; public class test implements serializable{ private log logger = logfactory.getlog(test. class ); private static final long serialversionuid = -2073310586499744415l; public void execute () { date date= new date(); system.out.println(date.tolocalestring()); } } |
3.4 配置quartz.xml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
<bean id= "test" class = "org.lxh.mvc.job.test" scope= "prototype" > </bean> <bean id= "testjobtask" class = "org.springframework.scheduling.quartz.jobdetailbean" > <property name= "jobclass" > <value>org.lxh.mvc.jobbean.mydetailquartzjobbean</value> </property> <property name= "jobdataasmap" > <map> <entry key= "targetobject" value= "test" /> <entry key= "targetmethod" value= "execute" /> </map> </property> </bean> <bean name= "testtrigger" class = "org.springframework.scheduling.quartz.crontriggerbean" > <property name= "jobdetail" ref= "testjobtask" /> <property name= "cronexpression" value= "0/1 * * * * ?" /> </bean> <bean id= "quartzscheduler" class = "org.springframework.scheduling.quartz.schedulerfactorybean" > <property name= "configlocation" value= "classpath:quartz.properties" /> <property name= "triggers" > <list> <ref bean= "testtrigger" /> </list> </property> <property name= "applicationcontextschedulercontextkey" value= "applicationcontext" /> </bean> |
3.5 測試
servera、serverb的代碼、配置完全一樣,先啟動servera,后啟動serverb,當server關斷之后,serverb會監(jiān)測到其關閉,并將servera上正在執(zhí)行的job接管,繼續(xù)執(zhí)行。
4、quartz集群實例(單獨quartz)
盡管我們已經實現了spring+quartz的集群配置,但是因為spring與quartz之間的兼容問題還是不建議使用該方式。在本小節(jié)中,我們實現了單獨用quartz配置的集群,相對spring+quartz的方式來說,簡單、穩(wěn)定。
4.1 工程結構
我們采用單獨使用quartz來實現其集群功能,代碼結構及所需的第三方jar包如圖3.1所示。其中,mysql版本:5.1.52,mysql驅動版本:mysql-connector-java-5.1.5-bin.jar(針對于5.1.52,建議采用該版本驅動,因為quartz存在bug使得其與某些mysql驅動結合時不能正常運行)。
圖4.1 quartz集群工程結構及所需第三方jar包
其中quartz.properties為quartz配置文件,放在src目錄下,若無該文件,quartz將自動加載jar包中的quartz.properties文件;simplerecoveryjob.java、simplerecoverystatefuljob.java為兩個job;clusterexample.java中編寫了調度信息、觸發(fā)機制及相應的測試main函數。
4.2 配置文件quartz.properties
默認文件名稱quartz.properties,通過設置"org.quartz.jobstore.isclustered"屬性為"true"來激活集群特性。在集群中的每一個實例都必須有一個唯一的"instance id" ("org.quartz.scheduler.instanceid" 屬性), 但是應該有相同的"scheduler instance name" ("org.quartz.scheduler.instancename"),也就是說集群中的每一個實例都必須使用相同的quartz.properties 配置文件。除了以下幾種例外,配置文件的內容其他都必須相同:
a.線程池大小。
b.不同的"org.quartz.scheduler.instanceid"屬性值(通過設定為"auto"即可)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#============================================================== #configure main scheduler properties #============================================================== org.quartz.scheduler.instancename = quartzscheduler org.quartz.scheduler.instanceid = auto #============================================================== #configure jobstore #============================================================== org.quartz.jobstore. class = org.quartz.impl.jdbcjobstore.jobstoretx org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate org.quartz.jobstore.tableprefix = qrtz_ org.quartz.jobstore.isclustered = true org.quartz.jobstore.clustercheckininterval = 10000 org.quartz.jobstore.datasource = myds #============================================================== #configure datasource #============================================================== org.quartz.datasource.myds.driver = com.mysql.jdbc.driver org.quartz.datasource.myds.url = jdbc:mysql: //192.168.31.18:3306/test?useunicode=true&characterencoding=utf-8 org.quartz.datasource.myds.user = root org.quartz.datasource.myds.password = 123456 org.quartz.datasource.myds.maxconnections = 30 #============================================================== #configure threadpool #============================================================== org.quartz.threadpool. class = org.quartz.simpl.simplethreadpool org.quartz.threadpool.threadcount = 5 org.quartz.threadpool.threadpriority = 5 org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true |
4.3 clusterexample.java文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package cluster; import java.util.date; import org.quartz.jobdetail; import org.quartz.scheduler; import org.quartz.schedulerfactory; import org.quartz.simpletrigger; import org.quartz.impl.stdschedulerfactory; public class clusterexample { public void cleanup(scheduler inscheduler) throws exception { system.out.println( "***** deleting existing jobs/triggers *****" ); // unschedule jobs string[] groups = inscheduler.gettriggergroupnames(); for ( int i = 0 ; i < groups.length; i++) { string[] names = inscheduler.gettriggernames(groups[i]); for ( int j = 0 ; j < names.length; j++) { inscheduler.unschedulejob(names[j], groups[i]); } } // delete jobs groups = inscheduler.getjobgroupnames(); for ( int i = 0 ; i < groups.length; i++) { string[] names = inscheduler.getjobnames(groups[i]); for ( int j = 0 ; j < names.length; j++) { inscheduler.deletejob(names[j], groups[i]); } } } public void run( boolean inclearjobs, boolean inschedulejobs) throws exception { // first we must get a reference to a scheduler schedulerfactory sf = new stdschedulerfactory(); scheduler sched = sf.getscheduler(); if (inclearjobs) { cleanup(sched); } system.out.println( "------- initialization complete -----------" ); if (inschedulejobs) { system.out.println( "------- scheduling jobs ------------------" ); string schedid = sched.getschedulerinstanceid(); int count = 1 ; jobdetail job = new jobdetail( "job_" + count, schedid, simplerecoveryjob. class ); // ask scheduler to re-execute this job if it was in progress when // the scheduler went down... job.setrequestsrecovery( true ); simpletrigger trigger = new simpletrigger( "triger_" + count, schedid, 200 , 1000l); trigger.setstarttime( new date(system.currenttimemillis() + 1000l)); system.out.println(job.getfullname() + " will run at: " + trigger.getnextfiretime() + " and repeat: " + trigger.getrepeatcount() + " times, every " + trigger.getrepeatinterval() / 1000 + " seconds" ); sched.schedulejob(job, trigger); count++; job = new jobdetail( "job_" + count, schedid, simplerecoverystatefuljob. class ); // ask scheduler to re-execute this job if it was in progress when // the scheduler went down... job.setrequestsrecovery( false ); trigger = new simpletrigger( "trig_" + count, schedid, 100 , 2000l); trigger.setstarttime( new date(system.currenttimemillis() + 2000l)); system.out.println(job.getfullname() + " will run at: " + trigger.getnextfiretime() + " and repeat: " + trigger.getrepeatcount() + " times, every " + trigger.getrepeatinterval() / 1000 + " seconds" ); sched.schedulejob(job, trigger); } // jobs don't start firing until start() has been called... system.out.println( "------- starting scheduler ---------------" ); sched.start(); system.out.println( "------- started scheduler ----------------" ); system.out.println( "------- waiting for one hour... ----------" ); try { thread.sleep(3600l * 1000l); } catch (exception e) { } system.out.println( "------- shutting down --------------------" ); sched.shutdown(); system.out.println( "------- shutdown complete ----------------" ); } public static void main(string[] args) throws exception { boolean clearjobs = true ; boolean schedulejobs = true ; for ( int i = 0 ; i < args.length; i++) { if (args[i].equalsignorecase( "clearjobs" )) { clearjobs = true ; } else if (args[i].equalsignorecase( "dontschedulejobs" )) { schedulejobs = false ; } } clusterexample example = new clusterexample(); example.run(clearjobs, schedulejobs); } } |
4.4 simplerecoveryjob.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package cluster; import java.io.serializable; import java.util.date; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.quartz.job; import org.quartz.jobexecutioncontext; import org.quartz.jobexecutionexception; //如果有想反復執(zhí)行的動作,作業(yè),任務就把相關的代碼寫在execute這個方法里,前提:實現job這個接口 //至于simplejob這個類什么時候實例化,execute這個方法何時被調用,我們不用關注,交給quartz public class simplerecoveryjob implements job, serializable { private static log _log = logfactory.getlog(simplerecoveryjob. class ); public simplerecoveryjob() { } public void execute(jobexecutioncontext context) throws jobexecutionexception { //這個作業(yè)只是簡單的打印出作業(yè)名字和此作業(yè)運行的時間 string jobname = context.getjobdetail().getfullname(); system.out.println( "job 1111111111111111111 simplerecoveryjob says: " + jobname + " executing at " + new date()); } } |
4.5 運行結果
server a與server b中的配置和代碼完全一樣。運行方法:運行任意主機上的clusterexample.java,將任務加入調度,觀察運行結果:
運行servera,結果如圖4.2所示。
圖4.2 servera運行結果1
開啟serverb后,servera與serverb的輸出如圖4.3、4.4所示。
圖4.3 servera運行結果2
圖4.4 serverb運行結果1
從圖4.3、4.4可以看出,serverb開啟后,系統自動實現了負責均衡,serverb接手job1。關斷servera后,serverb的運行結果如圖4.5所示。
圖4.5 serverb運行結果2
從圖4.5中可以看出,serverb可以檢測出servera丟失,將其負責的任務job2接手,并將servera丟失到server檢測出這段異常時間中需要執(zhí)行的job2重新執(zhí)行了。
5、注意事項
5.1 時間同步問題
quartz實際并不關心你是在相同還是不同的機器上運行節(jié)點。當集群放置在不同的機器上時,稱之為水平集群。節(jié)點跑在同一臺機器上時,稱之為垂直集群。對于垂直集群,存在著單點故障的問題。這對高可用性的應用來說是無法接受的,因為一旦機器崩潰了,所有的節(jié)點也就被終止了。對于水平集群,存在著時間同步問題。
節(jié)點用時間戳來通知其他實例它自己的最后檢入時間。假如節(jié)點的時鐘被設置為將來的時間,那么運行中的scheduler將再也意識不到那個結點已經宕掉了。另一方面,如果某個節(jié)點的時鐘被設置為過去的時間,也許另一節(jié)點就會認定那個節(jié)點已宕掉并試圖接過它的job重運行。最簡單的同步計算機時鐘的方式是使用某一個internet時間服務器(internet time server its)。
5.2 節(jié)點爭搶job問題
因為quartz使用了一個隨機的負載均衡算法, job以隨機的方式由不同的實例執(zhí)行。quartz官網上提到當前,還不存在一個方法來指派(釘住) 一個 job 到集群中特定的節(jié)點。
5.3 從集群獲取job列表問題
當前,如果不直接進到數據庫查詢的話,還沒有一個簡單的方式來得到集群中所有正在執(zhí)行的job列表。請求一個scheduler實例,將只能得到在那個實例上正運行job的列表。quartz官網建議可以通過寫一些訪問數據庫jdbc代碼來從相應的表中獲取全部的job信息。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:http://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html