Skip to content
Snippets Groups Projects
Commit 368a44b3 authored by Allen Gilliland's avatar Allen Gilliland
Browse files

use Quartzite for scheduling and running background tasks to give us some more options.

parent 77324876
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,7 @@
[cheshire "5.5.0"] ; fast JSON encoding (used by Ring JSON middleware)
[clj-http-lite "0.2.1"] ; HTTP client; lightweight version of clj-http that uses HttpURLConnection instead of Apache
[clj-time "0.10.0"] ; library for dealing with date/time
[clojurewerkz/quartzite "2.0.0"] ; scheduling library
[colorize "0.1.1" :exclusions [org.clojure/clojure]] ; string output with ANSI color codes (for logging)
[com.cemerick/friend "0.2.1"] ; auth library
[com.draines/postal "1.11.3"] ; SMTP library
......@@ -58,7 +59,8 @@
:javac-options ["-target" "1.6" "-source" "1.6"]
;; :jar-exclusions [#"\.java"] Circle CI doesn't like regexes because it's using the EDN reader and is retarded
:ring {:handler metabase.core/app
:init metabase.core/init}
:init metabase.core/init
:destroy metabase.core/destroy}
:eastwood {:exclude-namespaces [:test-paths]
:add-linters [:unused-private-vars]
:exclude-linters [:constant-test ; korma macros generate some forms with if statements that are always logically true or false
......
......@@ -17,6 +17,7 @@
(metabase [config :as config]
[db :as db]
[driver :as driver]
[events :as events]
[routes :as routes]
[setup :as setup]
[task :as task])
......@@ -25,8 +26,7 @@
[format :refer :all])
(metabase.models [setting :refer [defsetting]]
[database :refer [Database]]
[user :refer [User]])
[metabase.events :as events]))
[user :refer [User]])))
;; ## CONFIG
......@@ -85,11 +85,19 @@
setup-url
"\n\n"))))
(defn destroy
"General application shutdown function which should be called once at application shuddown."
[]
(log/info "Metabase Shutting Down ...")
(task/stop-scheduler!)
(log/info "Metabase Shutdown COMPLETE"))
(defn init
"General application initialization function which should be run once at application startup."
[]
(log/info "Metabase Initializing ... ")
;; First of all, lets register a shutdown hook that will tidy things up for us on app exit
(.addShutdownHook (Runtime/getRuntime) (Thread. destroy))
(log/debug "Using Config:\n" (with-out-str (clojure.pprint/pprint config/config-all)))
;; Bootstrap the event system
......@@ -106,6 +114,7 @@
(events/publish-event :install {}))
;; Now start the task runner
(task/start-scheduler!)
(task/start-task-runner!)
(log/info "Metabase Initialization COMPLETE")
......
;; -*- comment-column: 60; -*-
(ns metabase.task
"Function hooks; background task runner and related hooks.
Namespaces under `metabase.task.*` will be automatically loaded when the background task runner is started."
"Background task scheduling via Quartzite. Individual tasks are defined in `metabase.task.*`"
(:require clojure.java.classpath
[clojure.tools.logging :as log]
[clojure.tools.namespace.find :as ns-find]
[metabase.util :as u])
(:import java.util.Calendar))
[clojurewerkz.quartzite.scheduler :as qs]))
;; # HOOKS
;; [Just like in Emacs Lisp](http://www.gnu.org/software/emacs/manual/html_node/elisp/Hooks.html) (well, almost) <3
;;
;; Define a hook with `defhook`, add functions to it with `add-hook!`, and run those functions later with `run-hook`:
;;
;; (defhook hourly-tasks-hook \"Tasks to run hourly\") ; define a new hook.
;; (add-hook! #'hourly-tasks-hook my-fn-to-run-hourly) ; add a function to the hook
;; (run-hook #'hourly-tasks-hook :parallel) ; run the functions associated with a hook
;;
;; See the docstrs of these functions below for further discussion.
;;
;; ### Robert Hooke
;; Yes, I known [`robert-hooke`](https://github.com/technomancy/robert-hooke) exists.
;; This library is actually bascially an implementation of Emacs Lisp [`advice`](http://www.gnu.org/software/emacs/manual/html_node/elisp/Advising-Functions.html),
;; not `add-hook`. Which is what we want here. Also, the implementation below is only like ~20 LOC so no need to pull in a 3rd-party library IMO.
;;
;; ### Differences from Emacs Lisp
;;
;; (defun add-hook (hook function &optional append local)
;; ...)
;; (add-hook 'my-wacky-hook #'some-fun t t)
;;
;; 1. In Elisp, calling `add-hook` with an undefined hook will just create the hook for you. We're not allowing that here so we can do
;; some safety checking.
;; 2. You can't define a `buffer-local` hook because there's no such thing in Clojure
;; 3. Excution order of *hook functions* here are indeterminate since they're stored in a set
;; 4. We can run our *hook functions* in parallel <3
;;
(defmacro defhook
"Define a new hook.
(defhook hourly-tasks-hook \"Tasks to run hourly\")
A hook is simply an atom storing a set of functions that you can run at any time with `run-hook`."
[hook-name & [docstr?]]
{:arglists '([hook-name docstr?])}
`(defonce ~(vary-meta hook-name assoc
:doc docstr?
:type ::hook)
(atom #{})))
;; TODO Should we require that F be a var so as to avoid duplicate lambdas being added ?
(defn add-hook!
"Add function F to HOOK (hereafter, known as one of HOOK's *hook functions*).
Calling `(run-hook #'hook)` at a later time will call this function.
(add-hook! #'hourly-tasks-hook my-fn-to-run-hourly)
Note that you are expected to pass the var literal of HOOK; this is so we can check its metadata.
Hooks are tagged with `:type -> :metabase.task/hook` which is checked as a precondition so you
don't accidentally use this function the wrong way."
[hook f]
{:pre [(var? hook)
(= (:type (meta hook)) ::hook)
(fn? f)]}
(swap! (var-get hook) conj f))
;; TODO - remove-hook! function
(defn run-hook
"Run the *hook functions* associated with HOOK sequentially (by default) or in parallel if `:parallel` is
passed as the first arg. All subsequent args will be passed directly to the *hook functions* themselves.
(run-hook #'hourly-tasks-hook :parallel)
Like `add-hook!`, you are expected to pass the var literal of HOOK so we can do some safety checks for you."
{:arglists '([hook parallel? & args])}
[hook & args]
{:pre [(var? hook)
(= (:type (meta hook)) ::hook)]}
(let [[parallel args] (u/optional (partial = :parallel) args)
map-fn (if parallel pmap map)]
(dorun
(map-fn #(try (apply % args)
(catch Throwable e
(log/error (format "Caught exception when running %s function %s with args %s : %s"
(:name (meta hook)) % args e))))
@(var-get hook)))))
;; # TASK RUNNER
;; The task runner is a set of hooks like `hourly-tasks-hook` that get called at certain intervals on a background thread.
;; Just add functions to these hooks with `add-hook!` and they'll be ran at the appropriate time.
;; ## STANDARD TASK RUNNER HOOKS
(defhook hourly-tasks-hook
"Tasks to run hourly.
Functions will be passed a single function: the current hour (according to the system calendar) in 24-hour time.
(defn some-task [hour]
do-something ...)
(add-hook! #'hourly-tasks-hook some-task) ; some-task will be called on a background thread every hour")
(defhook nightly-tasks-hook
"Tasks to run nightly at midnight (according to the system calendar).
Functions will be passed no arguments.")
;; ## RUN-HOURLY-TASKS / RUN-NIGHTLY-TASKS
(defn- hour
"Current hour (0 - 23) according to the system calendar."
[]
(.get (Calendar/getInstance) Calendar/HOUR))
(defn- minute
"Current minute (0 - 59) according to the system calendar."
[]
(.get (Calendar/getInstance) Calendar/MINUTE))
(defn- minutes-until-next-hour
"Number of minutes (1 - 60) until the top of the *next* hour."
[]
(- 60 (minute)))
(defn- hourly-task-delay
"Number of milliseconds to wait before running hourly tasks the next time around.
This is the number of milliseconds until the top of the next hour; e.g., if the test runner is started
with `(start-task-runner!)` at 8:23 PM, this function will return the number of milliseconds until 9:00 PM,
which will be first time we'd want to run the `hourly-task-hook` functions.
(This is provided here so the unit tests can replace this fn so we can test the scheduling mechanism.)"
[]
(* 1000 60 (minutes-until-next-hour)))
(defn- run-hourly-tasks
"Sleep until the top of the next hour, then run the `hourly-tasks-hook` in parallel."
[]
;; Sleep first, that way we're not trying to run a ton of tasks as soon as Metabase spins up
(Thread/sleep (hourly-task-delay))
(log/info "Running hourly tasks...")
(run-hook #'hourly-tasks-hook :parallel (hour))
(recur))
(defn- run-nightly-tasks
"Run the `nightly-tasks-hook` (in parallel).
This is acutally just a *hook function* added to the `hourly-tasks-hook`;
it just checks whether the system hour is `0`, and, if so, runs the `nightly-tasks-hook`."
[hour]
(when (= hour 0)
(log/info "Running nightly tasks...")
(run-hook #'nightly-tasks-hook :parallel)))
(add-hook! #'hourly-tasks-hook run-nightly-tasks)
;; ## COLLECT TASKS IN METABASE.TASK.* NAMESPACES
(defonce ^:private quartz-scheduler
(atom nil))
(defn- find-and-load-tasks
"Search JARs + files in the classpath for Clojure namespaces that start with `metabase.task.`, then `require` them so tasks will be loaded as needed."
"Search Classpath for namespaces that start with `metabase.tasks.`, then `require` them so initialization can happen."
[]
(->> (ns-find/find-namespaces (clojure.java.classpath/classpath))
(filter (fn [ns-symb]
(re-find #"^metabase\.task\." (name ns-symb))))
set
(map (fn [task-ns]
(log/info "Loading tasks from namespace" task-ns "...")
(require task-ns)))
(map (fn [events-ns]
(log/info "\tloading tasks namespace: " events-ns)
(require events-ns)))
dorun))
;; ## START/STOP TASK RUNNER
(defonce ^:private task-runner
(atom nil))
(defn start-task-runner!
"Start a background thread that will run tasks on the `hourly-tasks-hook` and `nightly-tasks-hook` every hour / every night, respectively.
This also loads all namespaces under `metabase.task.*`, so tasks can defined in them without needing to load them elsewhere."
(defn start-scheduler!
"Start our Quartzite scheduler which allows jobs to be submitted and triggers to begin executing."
[]
(when-not @task-runner
(find-and-load-tasks)
(log/info "Starting task runner...")
(reset! task-runner (future (run-hourly-tasks)))))
(when-not @quartz-scheduler
(log/debug "Starting Quartz Scheduler")
;; keep a reference to our scheduler
(reset! quartz-scheduler (-> (qs/initialize) qs/start))
;; look for job/trigger definitions
(find-and-load-tasks)))
(defn stop-task-runner!
"Stop the task runner."
(defn stop-scheduler!
"Stop our Quartzite scheduler and shutdown any running executions."
[]
(when @task-runner
(log/info "Stopping task runner...")
(future-cancel @task-runner)
(reset! task-runner nil)))
(log/debug "Stopping Quartz Scheduler")
;; tell quartz to stop everything
(qs/shutdown @quartz-scheduler)
;; reset our scheduler reference
(reset! quartz-scheduler nil))
(defn schedule-task!
"Add a given job and trigger to our scheduler."
[job trigger]
(when @quartz-scheduler
(qs/schedule @quartz-scheduler job trigger)))
(ns metabase.task.sync-databases
(:require [clojure.tools.logging :as log]
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.triggers :as triggers]
[clojurewerkz.quartzite.schedule.cron :as cron]
[metabase.config :as config]
[metabase.db :as db]
[metabase.driver :as driver]
[metabase.models.database :refer [Database]]
[metabase.task :as task]))
(def sync-databases-job-key "metabase.task.sync-databases.job")
(def sync-databases-trigger-key "metabase.task.sync-databases.trigger")
(defonce ^:private sync-databases-job (atom nil))
(defonce ^:private sync-databases-trigger (atom nil))
;; simple job which looks up all databases and runs a sync on them
;; TODO - skip the sample dataset?
(jobs/defjob SyncDatabases
[ctx]
(dorun
(for [database (db/sel :many Database)]
(future (driver/sync-database! database)))))
;; this is what actually adds our task to the scheduler
(when (config/is-prod?)
(log/info "Submitting sync-database task to scheduler")
;; build our job
(reset! sync-databases-job (jobs/build
(jobs/of-type SyncDatabases)
(jobs/with-identity (jobs/key sync-databases-job-key))))
;; build our trigger
(reset! sync-databases-trigger (triggers/build
(triggers/with-identity (triggers/key sync-databases-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run at midnight daily
(cron/schedule (cron/cron-schedule "0 0 0 * * ? *")))))
;; submit ourselves to the scheduler
(task/schedule-task! @sync-databases-job @sync-databases-trigger))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment