#为什么使用Quartz?
Java 中已经有一个 timer 类可以用来进行执行计划,定时任务。我们所要做的只是继承 java.util.TimerTask 类。如下所示:
import java.util.Calendar;import java.util.Timer;import java.util.TimerTask;public class ReportGenerator extends TimerTask { public void run() { System.out.println("Generating report"); } public static void main(String[] args) { Timer timer = new Timer(); Calendar date = Calendar.getInstance(); date.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY); date.set(Calendar.HOUR, 0); date.set(Calendar.MINUTE, 0); date.set(Calendar.SECOND, 0); date.set(Calendar.MILLISECOND, 0); // Schedule to run every Sunday in midnight timer.schedule(new ReportGenerator(), // TimerTask date.getTime(), // Timer 1000 * 60 * 60 * 24 * 7);// delay }}
这里有几个问题,我们的类继承了TimerTask ,而TimerTask 也是实现了 java.lang.Runnable 接口。我们所要做的只是在我们自己的类里重置 run()方法。所以我们的TimerTask类其实是一种线程,但线程的调度往往不是按照我们希望来实现的,因为一些垃圾收集等原因,我们计划的时间点,却没有执行必要的任务。这样会产生一些问题。虽然,Timer 类也提供了scheduleAtFixedRate()方法用来在垃圾收集后能够快速的追上任务进度,但这个不一定是我们所需要的。特别是在 一些 J2EE 服务器上 Timer 是无法控制的,因为它不在容器的权责范围内。另外,这个任务调度也缺乏一些企业级所需要的特殊日期定制的功能,以及修改、查找任务的功能。
这里我们要介绍的是一个开源项目:Quartz 。Quartz 定义了两种基本接口 Job 和 Trigger,看名字也就知道,我们的任务必须实现 Job, 我们的时间触发器定义在 Trigger 内。 看一个例子也许能更快的了解他的使用方法:
import org.quartz.*;import org.quartz.impl.JobDetailImpl;import org.quartz.impl.StdSchedulerFactory;import org.quartz.impl.triggers.CronTriggerImpl;public class QuartzReport implements Job { public void execute(JobExecutionContext cntxt) throws JobExecutionException { System.out.println("Generating report - " + cntxt.getJobDetail().getJobDataMap().get("type")); } @SuppressWarnings("deprecation") public static void main(String[] args) { try { SchedulerFactory schedFact = new StdSchedulerFactory(); Scheduler sched = schedFact.getScheduler(); sched.start(); JobDetail jobDetail = new JobDetailImpl("Income Report", // 任务名 "Report Generation", // 任务组 QuartzReport.class //任务执行的类 ); jobDetail.getJobDataMap().put("type", "FULL"); CronTriggerImpl trigger = new CronTriggerImpl("Income Report", // 触发器名 "Report Generation" // 触发器组 ); trigger.setCronExpression( // 触发器时间设定 "0 0 12 ? * SUN"); sched.scheduleJob(jobDetail, trigger); // 执行任务 } catch (Exception e) { e.printStackTrace(); } }}
当然这个例子比较简单,也没有跟Spring集成,网上有很多Quartz的具体示例,这里我们着重讨论它的实现原理。
#Quartz Scheduler 开源框架
Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者可以到 Quartz 的发布版本及其源代码。笔者在产品开发中使用的是版本 2.2.1,因此本文内容基于该版本。
作为一个优秀的开源调度框架,Quartz 具有以下特点:
-
强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求。
-
灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式。
-
分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。本文不讨论该部分内容。
另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。
下面是本文中用到的一些专用词汇,在此声明:
-
scheduler:任务调度器
-
trigger:触发器,用于定义任务调度时间规则
-
job:任务,即被调度的任务
-
misfire:错过的,指本来应该被执行但实际没有被执行的任务调度
#Quartz 任务调度的基本实现原理
##核心元素
Quartz 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任务调度的元数据, scheduler 是实际执行调度的控制器。
在 Quartz 中,trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。这四种 trigger 可以满足企业应用中的绝大部分需求。我们将在企业应用一节中进一步讨论四种 trigger 的功能。
在 Quartz 中,job 用于表示被调度的任务。主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来说,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job 主要有两种属性:volatility 和 durability,其中 volatility 表示任务是否被持久化到数据库存储,而 durability 表示在没有 trigger 关联的时候任务是否被保留。两者都是在值为 true 的时候任务被持久化或保留。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。
在 Quartz 中, scheduler 由 scheduler 工厂创建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,因为 DirectSchedulerFactory 使用起来不够方便,需要作许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。
Quartz 核心元素之间的关系如下图所示:
##数据存储
Quartz 中的 trigger 和 job 需要存储下来才能被使用。Quartz 中有两种存储方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是将 trigger 和 job 存储在内存中,而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度非常快,但是其在系统被停止后所有的数据都会丢失,用户按照实际场景选用。
##以线程等待的方式实现按时间调度
Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:
其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:
public void run() { boolean lastAcquireFailed = false; while (!halted) { try { // check if we're supposed to pause... synchronized (pauseLock) { while (paused && !halted) { try { // wait until togglePause(false) is called... pauseLock.wait(100L); } catch (InterruptedException ignore) { } } if (halted) { break; } } ...... } }
以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:
Trigger trigger = null; long now = System.currentTimeMillis(); signaled = false; try { trigger = qsRsrcs.getJobStore().acquireNextTrigger( ctxt, now + idleWaitTime); lastAcquireFailed = false; } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occured while scanning for the next trigger to fire.", jpe); } lastAcquireFailed = true; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed = true; }
这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:
now = System.currentTimeMillis(); long triggerTime = trigger.getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; long spinInterval = 10; int numPauses = (int) (timeUntilTrigger / spinInterval); while (numPauses >= 0 && !signaled) { try { Thread.sleep(spinInterval); } catch (InterruptedException ignore) { } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; numPauses = (int) (timeUntilTrigger / spinInterval); } if (signaled) { try { qsRsrcs.getJobStore().releaseAcquiredTrigger( ctxt, trigger); } catch (JobPersistenceException jpe) { qs.notifySchedulerListenersError( "An error occured while releasing trigger '" + trigger.getFullName() + "'", jpe); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(trigger); } catch (RuntimeException e) { getLog().error( "releaseTriggerRetryLoop: RuntimeException " +e.getMessage(), e); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(trigger); } signaled = false; continue; }
此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:
JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell(); shell.initialize(qs, bndle); } catch (SchedulerException se) { try { qsRsrcs.getJobStore().triggeredJobComplete(ctxt, trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occured while placing job's triggers in error state '" + trigger.getFullName() + "'", se2); // db connection must have failed... keep retrying // until it's up... errorTriggerRetryLoop(bndle); } continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { try { getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(ctxt, trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occured while placing job's triggers in error state '" + trigger.getFullName() + "'", se2); // db connection must have failed... keep retrying // until it's up... releaseTriggerRetryLoop(trigger); } }
此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。
之后的代码就是清扫战场,就不在累述。
#分布式任务调度
quartz自带的分布式调度,是用数据库锁来实现的,可能不满足某些高并发场景下的需求,为此自己研发了适合自身场景的去中心化的分布式任务调度组件: