



// 直接默认启动scheduler scheduler = stdschedulerfactory.getdefaultscheduler();// 或者手动注入配置properties properties = new properties();properties.setproperty("org.quartz.scheduler.instancename", "myscheduler");properties.setproperty("org.quartz.threadpool.threadcount", "4");properties.setproperty("org.quartz.jobstore.class", "org.quartz.simpl.ramjobstore");stdschedulerfactory factory = new stdschedulerfactory(properties);scheduler scheduler = factory.getscheduler();// 3. 系统传参,指定配置文件位置 : org.quartz.properties=quartz.properties//4. 直接不用传参 ,在classpath下面建立一个 quartz.properties文件就行了.

你做了什么?配置是如何加载的?其实就是这里。org . quartz . impl . std scheduler factory # getdefaultscheduler


public static scheduler getdefaultscheduler() throws schedulerexception { stdschedulerfactory fact = new stdschedulerfactory(); return fact.getscheduler();}

为了组织。quartz . impl . stdschedulerfactory # instantiate()方法对比,调度器的核心方法代码有几千行,不一一展示。主要目的是初始化一堆东西,打包一堆东西。


jobstore js = null;threadpool tp = null;quartzscheduler qs = null;dbconnectionmanager dbmgr = null;


第一 properties注入全部采用的是 set方法反射注入 ,org.quartz.impl.stdschedulerfactory#setbeanprops 这个方法里, 会set注入进去.第二 全部初始化都是 initialize方法

该接口由希望为org . quartz . core . quartz scheduler提供作业和触发存储机制的类实现& # 39;的用途。



org.quartz.spi.jobstore显然是存储我们工作的东西。如果我们不告诉他使用哪一个,默认实现是org . quartz . simple . ramjobstore



ram (直接jvm进程的)terracotta (terracotta是一款由美国terracotta公司开发的著名开源ja集群平台。)jdbc (mysql 之类的 ….)

希望为org . quartz . core . quartz scheduler提供线程池的类实现的接口& # 39;的用途。







public quartzscheduler(quartzschedulerresources resources, long idlewaittime, @deprecated long dbretryinterval){….}











job store-& gt;dbconnectionmanager-& gt;connectionproviders,这可以说是一种桥接模式。有一个管理器管理连接提供者,然后jobstore可以通过他获得连接。





public interface joblistener { string getname(); // job调用前 void jobtobeexecuted(jobexecutioncontext context); // veto 否决的意思 , 由 org.quartz.triggerlistener#vetojobexecution 决定 void jobexecutionvetoed(jobexecutioncontext context); // job调用后 void jobwasexecuted(jobexecutioncontext context, jobexecutionexception jobexception);}public interface triggerlistener { string getname(); // 触发trigger void triggerfired(trigger trigger, jobexecutioncontext context); // 禁止执行jobexecution boolean vetojobexecution(trigger trigger, jobexecutioncontext context); // triggermisfired void triggermisfired(trigger trigger); void triggercomplete(trigger trigger, jobexecutioncontext context, completedexecutioninstruction triggerinstructioncode);}


触发器触发-& gt;

vetojobexecution?false-& gt;jobtobeexecuted-& gt;调用作业接口中的方法->:jobwascexecuted

?真-& gt;作业执行



org.quartz.joblistener.j1.class=com.example.springquartz.myjoblistener// 比如属性设置可以这么做, 只要实现set方法就可以// org.quartz.joblistener.l1.name=myjoblistenerorg.quartz.triggerlistener.t1.class=com.example.springquartz.mytriggerlistener







org . quartz . core . quartz scheduler # quartz scheduler是执行任务的核心,也就是心脏。然后交给->: quartzschedulerthread轮询处理

public quartzscheduler(quartzschedulerresources resources, long idlewaittime, @deprecated long dbretryinterval) throws schedulerexception { this.resources = resources; if (resources.getjobstore() instanceof joblistener) { addinternaljoblistener((joblistener)resources.getjobstore()); } // 核心处理线程 – > 这个就是整个心脏 this.schedthread = new quartzschedulerthread(this, resources); threadexecutor schedthreadexecutor = resources.getthreadexecutor(); // 心脏启动 schedthreadexecutor.execute(this.schedthread); if (idlewaittime > 0) { this.schedthread.setidlewaittime(idlewaittime); } jobmgr = new executingjobanager(); addinternaljoblistener(jobmgr); errlogger = new errorlogger(); addinternalschedulerlistener(errlogger); signaler = new schedulersignalerimpl(this, this.schedthread); getlog().info("quartz scheduler v." getversion() " created.");}

org . quartz . core . quartz scheduler thread # run-& gt;这里描述的是quartzschedulerthread的主处理循环。


@overridepublic void run() { int acquiresfailed = 0; while (!halted.get()) { try { // 因为提前开启了 , 所以需要等待真正启动了- > 调用start, 才能继续执行. paused暂停.在 // org.quartz.core.quartzscheduler#start执行. // check if we're supposed to pause… synchronized (siglock) { while (paused && !halted.get()) { try { // wait until togglepause(false) is called… siglock.wait(1000l); } catch (interruptedexception ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresfailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. db is down or restarting).. if (acquiresfailed > 1) { try { long delay = computedelayforrepeatederrors(qsrsrcs.getjobstore(), acquiresfailed); thread.sleep(delay); } catch (exception ignore) { } } // 这里是获取正在等待的threadpool中的线程 . 如果有执行,你要知道`quartzschedulerresources` 把所有东西都放进去了 int ailthreadcount = qsrsrcs.getthreadpool().blockforailablethreads(); if(ailthreadcount > 0) { // will always be true, due to semantics of blockforailablethreads… // 获取当前时刻的triggers list triggers; long now = system.currenttimemillis(); clearsignaledschengchange(); try { triggers = qsrsrcs.getjobstore().acquirenexttriggers( now idlewaittime, math.min(ailthreadcount, qsrsrcs.getmaxbatchsize()), qsrsrcs.getbatchtimewindow()); acquiresfailed = 0; if (log.isdebugenabled()) log.debug("batch acquisition of " (triggers == null ? 0 : triggers.size()) " triggers"); } catch (jobpersistenceexception jpe) { if (acquiresfailed == 0) { // 打印日志 qs.notifyschedulerlistenerserror( "an error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresfailed < integer.max_value) acquiresfailed ; continue; } catch (runtimeexception e) { if (acquiresfailed == 0) { getlog().error("quartzschedulerthreadloop: runtimeexception " e.getmessage(), e); } if (acquiresfailed < integer.max_value) acquiresfailed ; continue; } // 这里就是如果有triggers继续执行 if (triggers != null && !triggers.isempty()) { now = system.currenttimemillis(); long triggertime = triggers.get(0).getnextfiretime().gettime(); long timeuntiltrigger = triggertime – now; // 这个处理比较灵性.. 死循环, 你差多久, 我就 wait多久 while(timeuntiltrigger > 2) { synchronized (siglock) { if (halted.get()) { break; } if (!iscandidatenewtimeearlierwithinreason(triggertime, false)) { try { // wait // we could he blocked a long while // on 'synchronize', so we must recompute now = system.currenttimemillis(); timeuntiltrigger = triggertime – now; if(timeuntiltrigger >= 1) siglock.wait(timeuntiltrigger); } catch (interruptedexception ignore) { } } } if(releaseifschedulechangedsignificantly(triggers, triggertime)) { break; } now = system.currenttimemillis(); timeuntiltrigger = triggertime – now; } // this happens if releaseifschedulechangedsignificantly decided to release triggers // 防止其他发生 if(triggers.isempty()) continue; // 初始化一个 triggerfiredresult // set triggers to 'executing' list bndles = new arraylist(); boolean goahead = true; synchronized(siglock) { goahead = !halted.get(); } if(goahead) { try { list res = qsrsrcs.getjobstore().triggersfired(triggers); if(res != null) bndles = res; } catch (schedulerexception se) { qs.notifyschedulerlistenerserror( "an error occurred while firing triggers '" triggers "'", se); //qtz-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i ) { qsrsrcs.getjobstore().releaseacquiredtrigger(triggers.get(i)); } continue; } } // 循环执行 for (int i = 0; i < bndles.size(); i ) { // triggerfiredresult result = bndles.get(i); triggerfiredbundle bndle = result.gettriggerfiredbundle(); exception exception = result.getexception(); if (exception instanceof runtimeexception) { getlog().error("runtimeexception while firing trigger " triggers.get(i), exception); qsrsrcs.getjobstore().releaseacquiredtrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time… or if the scheduler was shutdown (halted) if (bndle == null) { qsrsrcs.getjobstore().releaseacquiredtrigger(triggers.get(i)); continue; } // 初始化任务 jobrunshell shell = null; try { // 初始化流程 shell = qsrsrcs.getjobrunshellfactory().createjobrunshell(bndle); shell.initialize(qs); } catch (schedulerexception se) { qsrsrcs.getjobstore().triggeredjobcomplete(triggers.get(i), bndle.getjobdetail(), completedexecutioninstruction.set_all_job_triggers_error); continue; } // 这里就是任务执行了. … 这里就涉及到 -> `org.quartz.simpl.simplethreadpool.workerthread#run()` 这里了 if (qsrsrcs.getthreadpool().runinthread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently – which the docs // say not to do… getlog().error("threadpool.runinthread() return false!"); qsrsrcs.getjobstore().triggeredjobcomplete(triggers.get(i), bndle.getjobdetail(), completedexecutioninstruction.set_all_job_triggers_error); } } continue; // while (!halted) } } else { // if(ailthreadcount > 0) // should never happen, if threadpool.blockforailablethreads() follows contract continue; // while (!halted) } long now = system.currenttimemillis(); long waittime = now getrandomizedidlewaittime(); long timeuntilcontinue = waittime – now; synchronized(siglock) { try { if(!halted.get()) { // qtz-336 a job might he been completed in the mean time and we might he // missed the scheduled changed signal by not waiting for the notify() yet // check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isschedulechanged()) { siglock.wait(timeuntilcontinue); } } } catch (interruptedexception ignore) { } } } catch(runtimeexception re) { getlog().error("runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection… qs = null; qsrsrcs = null;}


org . quartz . core . quartz scheduler # start-& gt;给你。核心是schedthread.togglepause(false),

public void start() throws schedulerexception { if (shuttingdown|| closed) { throw new schedulerexception( "the scheduler cannot be restarted after shutdown() has been called."); } // qtz-212 : calling new schedulerstarting() method on the listeners // right after entering start() notifyschedulerlistenersstarting(); if (initialstart == null) { initialstart = new date(); this.resources.getjobstore().schedulerstarted(); startplugins(); } else { resources.getjobstore().schedulerresumed(); } // 这里设置的目的就是将它继续执行 -> `org.quartz.core.quartzschedulerthread#run`252取消暂停. schedthread.togglepause(false); getlog().info( "scheduler " resources.getuniqueidentifier() " started."); notifyschedulerlistenersstarted();}

调用qssrcs。getthreadpool()。runinthread (shell)->:调用org。quartz . simple . simple thread pool # runinthread->:那么在这个下面

public boolean runinthread(runnable runnable) { if (runnable == null) { return false; } synchronized (nextrunnablelock) { handoffpending = true; // wait until a worker thread is ailable , 这里就是等 , 等有可用的线程,这里我不理解为啥不使用队列… 其实可以使用队列的. while ((ailworkers.size() < 1) && !isshutdown) { try { nextrunnablelock.wait(500); } catch (interruptedexception ignore) { } } if (!isshutdown) { // 等到了, 拿着第一个可用线程执行 workerthread wt = (workerthread)ailworkers.removefirst(); busyworkers.add(wt); wt.run(runnable); } else { // if the thread pool is going down, execute the runnable // within a new additional worker thread (no thread from the pool). workerthread wt = new workerthread(this, threadgroup, "workerthread-lastjob", prio, iakethreadsdaemons(), runnable); busyworkers.add(wt); workers.add(wt); wt.start(); } // 释放锁 nextrunnablelock.notifyall(); handoffpending = false; } // 返回ok , 这里其实是异步的 , 因为处理流程交给了子线程, 他只是返回了他已经处理了, 但是有一个问题就是线程处理时间过长, 就会影响周期性.(所以建议设置多点,但是也不好,线程一直处于不断运行阶段) return true;}

调用wt . run(runnable);-& gt;调用组织。quartz . simple . simple thread pool . worker thread # run(ja . lang . runnable)来执行run方法。

public void run(runnable newrunnable) { synchronized(lock) { if(runnable != null) { throw new illegalstateexception("already running a runnable!"); } runnable = newrunnable; lock.notifyall(); }}


org . quartz . simple . simple thread pool . worker thread # run()每个工作线程在初始化的时候都一直在这里等待。等等等等等等等等。

@overridepublic void run() { boolean ran = false; // 转 …. while (run.get()) { try { // 同步等待. 500ms , 直到有 runnable … 很可怜, 其实基本大多都是这么实现的 eventloop synchronized(lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; // 启动任务, 这里直接将runnable对象启动, 而不是new thread(runnable).start,而是直接调用的方式. 让这个线程去处理. runnable.run(); } } } catch (interruptedexception unblock) { // do nothing (loop will terminate if shutdown() was called try { getlog().error("worker thread was interrupt()'ed.", unblock); } catch(exception e) { // ignore to help with a tomcat glitch } } catch (throwable exceptioninrunnable) { try { getlog().error("error while executing the runnable: ", exceptioninrunnable); } catch(exception e) { // ignore to help with a tomcat glitch } } finally { synchronized(lock) { runnable = null; } // repair the thread in case the runnable mucked it up… if(getpriority() != tp.getthreadpriority()) { setpriority(tp.getthreadpriority()); } if (runonce) { run.set(false); clearfrombusyworkerslist(this); } else if(ran) { ran = false; makeailable(this); } } } //if (log.isdebugenabled()) try { getlog().debug("workerthread is shut down."); } catch(exception e) { // ignore to help with a tomcat glitch }}

public class demo { public static final object object = new object(); public static void main(string[] args) throws interruptedexception { long start = system.currenttimemillis(); while (true) { synchronized (object) { system.out.println("time : " (system.currenttimemillis() – start) "ms."); object.wait(1000l); } } }}


time : 0ms.time : 1000ms.time : 2001ms.time : 3002ms.time : 4002ms.time : 5003ms.





