Asymmetrical View

Exploring Quartz from Clojure

From the Quartz website:

Quartz is a job scheduling system that can be integrated with, or used along side virtually any other software system. The term “job scheduler” seems to conjure different ideas for different people…in short, a job scheduler is a system that is responsible for executing (or notifying) other software components when a pre-determined (scheduled) time arrives.

I wanted to be able to explore how Quartz worked from Clojure. Quartz executes Jobs. You do not schedule job instances in Quartz though, instead you pass the scheduler a factory, specifically a JobDetail. The JobDetail will specify the Class of the class implementing the job, which must support a no-arg constructor. This meant I couldn’t use Clojure’s proxy to implement the Job instance – since it needed to be constructible via a call like Class.newInstance. Looking briefly at the Quartz source it appears to be the technique used in at least the SimpleJobFactory.

ClojureJob.java

To enable the calling of Clojure from a Quartz Job I implemented a very basic ClojureJob class:

package com.github.kyleburton.sandbox.quartz;

import clojure.lang.Namespace;
import clojure.lang.RT;
import clojure.lang.Symbol;
import clojure.lang.Var;
import clojure.lang.IFn;

import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Quartz Job class for executing a clojure function.  The clojure
 * function will take a single argument, the JobExecutionContext which
 * is passed to this Job's execute method.
 *
 * NB: There is no way to capture or propagate an error from the
 * called function back to the context where the job was scheduled.
 *
 * http://www.opensymphony.com/quartz/
 *
 * @author Kyle R. Burton <kyle.burton@gmail.com>
 */
public class ClojureJob implements Job {
  /** JobExecutionContext/JobDetail/JobDataMap Parameter for the namespace of the function that will be called. */
  public  static final String NAMESPACE_PARAMETER     = "job.clojure.namespace";
  /** JobExecutionContext/JobDetail/JobDataMap Parameter for the name of the function that will be called. */
  public  static final String FUNCTION_NAME_PARAMETER = "job.clojure.function";
  /** */
  public  static final String FUNCTION_PARAMETER = "job.clojure.fn";
  private static final Class CLASS = ClojureJob.class;

  /**
   * Quartz required no-arg constructor.  Does nothing.
   */
  public ClojureJob() {
  }

  /**
   * Execute implementation, look up the clojure function specified in
   * the JobDataMap, invoke it with the JobExecutionContext.
   * @param context the JobExecutionContext passed in by quartz
   * @throws JobExecutionContext if the function can not be looked up,
   * or if the function throws an exception.
   */
  @Override
  public void execute(JobExecutionContext context) throws JobExecutionException {
    if ( null != contextParameter(context,FUNCTION_PARAMETER) ) {
      executeFunction(context,(IFn)contextParameter(context,FUNCTION_PARAMETER));
      return;
    }

    executeVar(context);
  }

  private void executeVar( JobExecutionContext context) throws JobExecutionException {
    Var fn = lookupClojureFunction(context);
    if ( null == fn ) {
      throw new JobExecutionException(
        String.format(CLASS.getName() + ".execute: unable to find the specified function, namespace=%s; function=%s", 
                      contextParameterString(context,NAMESPACE_PARAMETER),
                      contextParameterString(context,FUNCTION_NAME_PARAMETER)));
    }

    try {
      fn.invoke(context);
    }
    catch(Exception ex) {
      throw new JobExecutionException(ex);
    }
  }

  private void executeFunction( JobExecutionContext context, IFn fn) throws JobExecutionException {
    try {
      fn.invoke(context);
    }
    catch(Exception ex) {
      throw new JobExecutionException(ex);
    }
  }

  /**
   * Helper function for pulling parameters from the JobDataMap
   *
   * @param JobExecutionException the context to pull the parameter from
   * @param name the parameter to pull from the JobDataMap
   * @return the string value from the JobDataMap
   */
  private String contextParameterString(JobExecutionContext context, String name) {
    return context.getJobDetail().getJobDataMap().getString(name);
  }

  private Object contextParameter(JobExecutionContext context, String name) {
    return context.getJobDetail().getJobDataMap().get(name);
  }

  /**
   * Helper function for looking up the clojure function specified in
   * the quartz job.
   *
   * @param JobExecutionException the context passed in from quartz
   * @return the clojure Var which is the function
   */
  private Var lookupClojureFunction(JobExecutionContext context) { 
    String namespaceName = contextParameterString(context,NAMESPACE_PARAMETER);
    String functionName  = contextParameterString(context,FUNCTION_NAME_PARAMETER);
    Symbol symNamespace  = Symbol.create(namespaceName);
    Namespace namespace  = Namespace.findOrCreate(symNamespace);
    return Var.intern(namespace,Symbol.create(functionName));
  }
}

This class calls a clojure function that is either looked up or one that is passed in via the JobDetails of the JobExecutionContext. In the case when an actual function (Clojure IFn) is passed, it is called without checking to see if a named function is passed. The JobExecutionContext is passed to the invoke call So that parameters can be passed through to the Clojure function in either case.

quartz.clj

This allowed me to then create and schedule jobs which are backed by clojure functions. Here is quartz.clj

(ns com.github.kyleburton.sandbox.quartz
  (:import (org.quartz SchedulerFactory Scheduler TriggerUtils JobDetail)
           (org.quartz.impl StdSchedulerFactory)
           (com.github.kyleburton.sandbox.quartz ClojureJob)))

(def *schedule-factory* (StdSchedulerFactory.))

(def *scheduler* (atom nil))

(defn ensure-scheduler-started []
  (if (or (not @*scheduler*)
          (.isShutdown @*scheduler*)
          (not (.isStarted @*scheduler*)))
    (do
      (reset! *scheduler* (.getScheduler *schedule-factory*))
      (.start @*scheduler*)
      true)
    nil))

(defn stop-scheduler []
  (if (and @*scheduler*
           (.isStarted @*scheduler*))
    (.shutdown @*scheduler*)))

(defn schedule-job [job-detail trigger]
  (ensure-scheduler-started)
  (.scheduleJob @*scheduler* job-detail trigger))

(defn delete-job [job-detail]
  (.deleteJob @*scheduler*
              (.getName job-detail)
              (.getGroup job-detail)))

(defn job-exists? [job-detail]
  (not (nil? (.getJobDetail @*scheduler*
                            (.getName job-detail)
                            (.getGroup job-detail)))))

(defn testfn [context]
  (prn (format "testfn: context=%s time=%s" 
               context
               (java.util.Date.))))

(defn quartz-test []
  (let [job-detail (JobDetail. "myJob" nil ClojureJob)
        trigger (doto (TriggerUtils/makeSecondlyTrigger 10)
                  (.setStartTime (TriggerUtils/getEvenSecondDate (java.util.Date.)))
                  (.setName "My Second Trigger"))]
    (.put (.getJobDataMap job-detail) ClojureJob/NAMESPACE_PARAMETER "com.github.kyleburton.sandbox.quartz")
    (.put (.getJobDataMap job-detail) ClojureJob/FUNCTION_NAME_PARAMETER "testfn")
    (schedule-job job-detail trigger)))

(defn quartz-test-fn [fn]
  (let [job-detail (JobDetail. "myJob" nil ClojureJob)
        trigger (doto (TriggerUtils/makeSecondlyTrigger 10)
                  (.setStartTime (TriggerUtils/getEvenSecondDate (java.util.Date.)))
                  (.setName "My Second Trigger"))]
    (.put (.getJobDataMap job-detail) ClojureJob/FUNCTION_PARAMETER fn)
    (schedule-job job-detail trigger)))


;; (quartz-test)
;; (stop-scheduler)
;; (def *count* (atom 0))
;; (quartz-test-fn (fn [context] 
;;                   (reset! *count* (inc @*count*))
;;                   (prn (format "anon scheduled function! context=%s called %d times!" context @*count*))))
;; (stop-scheduler)

To play with these pull down my sandbox run ant in the clojure-utils sub directory with the fetch-deps and jar targets.

kyle@indigo64 ~/personal/projects/sandbox/clojure-utils[master]$ ant fetch-deps jar
Buildfile: build.xml

fetch-deps:
      [get] Getting: http://asymmetrical-view.com/personal/repo//ant-1.7.0.jar
      [get] To: /home/mortis/personal/projects/sandbox/clojure-utils/lib/ant-1.7.0.jar
      ...and lots more...

compile:
    [mkdir] Created dir: /home/mortis/personal/projects/sandbox/clojure-utils/target/classes
    [javac] Compiling 1 source file to /home/mortis/personal/projects/sandbox/clojure-utils/target/classes

jar:  
      [jar] Building jar: /home/mortis/personal/projects/sandbox/clojure-utils/target/krb-clojure-utils-0.1.jar

BUILD SUCCESSFUL
Total time: 23 seconds
kyle@indigo64 ~/personal/projects/sandbox/clojure-utils[master]$ ant repl

You can use ant repl to run a Clojure REPL with the necessary dependencies on the classpath. Paste or type in the example functions to see them in action:

(quartz-test)
;; wait a bit to see it run (it should run every 10s)
;; stop the scheduler to remove all tasks
(stop-scheduler)

;; define an atom to hold the count
(def *count* (atom 0))
;; schedule an anonymous function
(quartz-test-fn (fn [context] 
                  (reset! *count* (inc @*count*))
                  (prn (format "anon scheduled function! context=%s called %d times!" context @*count*))))
;; wait a bit to see it run (every 10s)
;; stop and exit the JVM
(stop-scheduler)
(System/exit 0)

The next thing I’d like to do is implement a prototype scheduler service exposing Quartz over AMQP (RabbitMQ).

Kyle Burton, 19 May 2009 – Wayne PA

Tags: clojure,java,quartz