博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Quartz原理
阅读量:6580 次
发布时间:2019-06-24

本文共 9452 字,大约阅读时间需要 31 分钟。

hot3.png

#为什么使用Quartz?

Java 中已经有一个 timer 类可以用来进行执行计划,定时任务。我们所要做的只是继承 java.util.TimerTask 类。如下所示:

import java.util.Calendar;import java.util.Timer;import java.util.TimerTask;public class ReportGenerator extends TimerTask {    public void run() {        System.out.println("Generating report");    }    public static void main(String[] args) {        Timer timer = new Timer();        Calendar date = Calendar.getInstance();        date.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY);        date.set(Calendar.HOUR, 0);        date.set(Calendar.MINUTE, 0);        date.set(Calendar.SECOND, 0);        date.set(Calendar.MILLISECOND, 0);        // Schedule to run every Sunday in midnight        timer.schedule(new ReportGenerator(), // TimerTask                date.getTime(), // Timer                1000 * 60 * 60 * 24 * 7);// delay    }}

这里有几个问题,我们的类继承了TimerTask ,而TimerTask 也是实现了 java.lang.Runnable 接口。我们所要做的只是在我们自己的类里重置 run()方法。所以我们的TimerTask类其实是一种线程,但线程的调度往往不是按照我们希望来实现的,因为一些垃圾收集等原因,我们计划的时间点,却没有执行必要的任务。这样会产生一些问题。虽然,Timer 类也提供了scheduleAtFixedRate()方法用来在垃圾收集后能够快速的追上任务进度,但这个不一定是我们所需要的。特别是在 一些 J2EE 服务器上 Timer 是无法控制的,因为它不在容器的权责范围内。另外,这个任务调度也缺乏一些企业级所需要的特殊日期定制的功能,以及修改、查找任务的功能。

这里我们要介绍的是一个开源项目:Quartz 。Quartz 定义了两种基本接口 Job 和 Trigger,看名字也就知道,我们的任务必须实现 Job, 我们的时间触发器定义在 Trigger 内。 看一个例子也许能更快的了解他的使用方法:

import org.quartz.*;import org.quartz.impl.JobDetailImpl;import org.quartz.impl.StdSchedulerFactory;import org.quartz.impl.triggers.CronTriggerImpl;public class QuartzReport implements Job {    public void execute(JobExecutionContext cntxt) throws JobExecutionException {        System.out.println("Generating report - " + cntxt.getJobDetail().getJobDataMap().get("type"));    }    @SuppressWarnings("deprecation")    public static void main(String[] args) {        try {            SchedulerFactory schedFact = new StdSchedulerFactory();            Scheduler sched = schedFact.getScheduler();            sched.start();            JobDetail jobDetail = new JobDetailImpl("Income Report",      // 任务名                    "Report Generation", // 任务组                    QuartzReport.class    //任务执行的类                    );            jobDetail.getJobDataMap().put("type", "FULL");            CronTriggerImpl trigger = new CronTriggerImpl("Income Report", // 触发器名                    "Report Generation" // 触发器组            );            trigger.setCronExpression( // 触发器时间设定            "0 0 12 ? * SUN");            sched.scheduleJob(jobDetail, trigger); // 执行任务        } catch (Exception e) {            e.printStackTrace();        }    }}

当然这个例子比较简单,也没有跟Spring集成,网上有很多Quartz的具体示例,这里我们着重讨论它的实现原理。

#Quartz Scheduler 开源框架

Quartz 是 OpenSymphony 开源组织在任务调度领域的一个开源项目,完全基于 Java 实现。该项目于 2009 年被 Terracotta 收购,目前是 Terracotta 旗下的一个项目。读者可以到 Quartz 的发布版本及其源代码。笔者在产品开发中使用的是版本 2.2.1,因此本文内容基于该版本。

作为一个优秀的开源调度框架,Quartz 具有以下特点:

  1. 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求。

  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式。

  3. 分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。本文不讨论该部分内容。

另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。

下面是本文中用到的一些专用词汇,在此声明:

  • scheduler:任务调度器

  • trigger:触发器,用于定义任务调度时间规则

  • job:任务,即被调度的任务

  • misfire:错过的,指本来应该被执行但实际没有被执行的任务调度

#Quartz 任务调度的基本实现原理

##核心元素

Quartz 任务调度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任务调度的元数据, scheduler 是实际执行调度的控制器。

在 Quartz 中,trigger 是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz 中主要提供了四种类型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。这四种 trigger 可以满足企业应用中的绝大部分需求。我们将在企业应用一节中进一步讨论四种 trigger 的功能。

在 Quartz 中,job 用于表示被调度的任务。主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。对于同一个 trigger 来说,有状态的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job 主要有两种属性:volatility 和 durability,其中 volatility 表示任务是否被持久化到数据库存储,而 durability 表示在没有 trigger 关联的时候任务是否被保留。两者都是在值为 true 的时候任务被持久化或保留。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。

在 Quartz 中, scheduler 由 scheduler 工厂创建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,因为 DirectSchedulerFactory 使用起来不够方便,需要作许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 为例讲解。这也是笔者在项目中所使用的 scheduler 类。

Quartz 核心元素之间的关系如下图所示:

Quartz 核心元素关系图

##数据存储

Quartz 中的 trigger 和 job 需要存储下来才能被使用。Quartz 中有两种存储方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是将 trigger 和 job 存储在内存中,而 JobStoreSupport 是基于 jdbc 将 trigger 和 job 存储到数据库中。RAMJobStore 的存取速度非常快,但是其在系统被停止后所有的数据都会丢失,用户按照实际场景选用。

##以线程等待的方式实现按时间调度

Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:

时序图

其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:

public void run() {              boolean lastAcquireFailed = false;                            while (!halted) {                  try {                      // check if we're supposed to pause...                      synchronized (pauseLock) {                          while (paused && !halted) {                              try {                                  // wait until togglePause(false) is called...                                  pauseLock.wait(100L);                              } catch (InterruptedException ignore) {                              }                          }                                    if (halted) {                              break;                          }                      }                 ......            }      }

以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:

Trigger trigger = null;      long now = System.currentTimeMillis();      signaled = false;      try {          trigger = qsRsrcs.getJobStore().acquireNextTrigger(                  ctxt, now + idleWaitTime);          lastAcquireFailed = false;      } catch (JobPersistenceException jpe) {          if(!lastAcquireFailed) {              qs.notifySchedulerListenersError(                  "An error occured while scanning for the next trigger to fire.",                  jpe);          }          lastAcquireFailed = true;      } catch (RuntimeException e) {          if(!lastAcquireFailed) {              getLog().error("quartzSchedulerThreadLoop: RuntimeException "                      +e.getMessage(), e);          }          lastAcquireFailed = true;      }

这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:

now = System.currentTimeMillis();      long triggerTime = trigger.getNextFireTime().getTime();      long timeUntilTrigger = triggerTime - now;      long spinInterval = 10;      int numPauses = (int) (timeUntilTrigger / spinInterval);      while (numPauses >= 0 && !signaled) {          try {              Thread.sleep(spinInterval);          } catch (InterruptedException ignore) {          }          now = System.currentTimeMillis();          timeUntilTrigger = triggerTime - now;          numPauses = (int) (timeUntilTrigger / spinInterval);      }      if (signaled) {          try {              qsRsrcs.getJobStore().releaseAcquiredTrigger(                      ctxt, trigger);          } catch (JobPersistenceException jpe) {              qs.notifySchedulerListenersError(                      "An error occured while releasing trigger '"                              + trigger.getFullName() + "'",                      jpe);              // db connection must have failed... keep              // retrying until it's up...              releaseTriggerRetryLoop(trigger);          } catch (RuntimeException e) {              getLog().error(                  "releaseTriggerRetryLoop: RuntimeException "                  +e.getMessage(), e);              // db connection must have failed... keep              // retrying until it's up...              releaseTriggerRetryLoop(trigger);          }          signaled = false;          continue;      }

此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:

JobRunShell shell = null;  try {      shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();      shell.initialize(qs, bndle);  } catch (SchedulerException se) {      try {          qsRsrcs.getJobStore().triggeredJobComplete(ctxt,                  trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);      } catch (SchedulerException se2) {          qs.notifySchedulerListenersError(                  "An error occured while placing job's triggers in error state '"                          + trigger.getFullName() + "'", se2);          // db connection must have failed... keep retrying          // until it's up...          errorTriggerRetryLoop(bndle);      }      continue;  }  if (qsRsrcs.getThreadPool().runInThread(shell) == false) {      try {          getLog().error("ThreadPool.runInThread() return false!");          qsRsrcs.getJobStore().triggeredJobComplete(ctxt,                  trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);      } catch (SchedulerException se2) {          qs.notifySchedulerListenersError(                  "An error occured while placing job's triggers in error state '"                          + trigger.getFullName() + "'", se2);          // db connection must have failed... keep retrying          // until it's up...          releaseTriggerRetryLoop(trigger);      }  }

此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。

之后的代码就是清扫战场,就不在累述。

#分布式任务调度

quartz自带的分布式调度,是用数据库锁来实现的,可能不满足某些高并发场景下的需求,为此自己研发了适合自身场景的去中心化的分布式任务调度组件:

转载于:https://my.oschina.net/zhaoyi1/blog/895751

你可能感兴趣的文章
WinForm 之 程序启动不显示主窗体
查看>>
【Network】Calico, Flannel, Weave and Docker Overlay Network 各种网络模型之间的区别
查看>>
【转】Oracle索引的类型
查看>>
FragmentTransaction.replace() 你不知道的坑
查看>>
分布式消息队列 Kafka
查看>>
模拟退火算法
查看>>
Solr 按照得分score跟指定字段相乘排序
查看>>
MySQL数据库如何去掉数据库中重复记录
查看>>
【原创】如何写一篇“用户友好”的随笔
查看>>
【16】成对使用new和delete时要采取相同形式
查看>>
POJ 2352 Stars
查看>>
SharpRush中的AOP实现
查看>>
[摘自DbC原则与实践]DbC的一些优点和限制
查看>>
配置错误定义了重复的“system.web.extensions/scripting/scriptResourceHandler” 解决办法...
查看>>
.net平台下开源(免费)三维 GIS (地形,游戏)平台资料
查看>>
大公司 or 小公司
查看>>
.h和.cpp文件的区别(zt)
查看>>
SQLSERVER中的锁资源类型RID KEY PAG EXT TAB DB FIL
查看>>
将Datagridview中的数据导出至Excel中
查看>>
c++下面的一个单例
查看>>