Skip to content
Snippets Groups Projects
Commit 37feaa8c authored by Allen Gilliland's avatar Allen Gilliland
Browse files

Merge pull request #1033 from metabase/schedule_with_quartzite

Schedule background tasks with Quartzite
parents 77324876 f58953a2
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,7 @@
[cheshire "5.5.0"] ; fast JSON encoding (used by Ring JSON middleware)
[clj-http-lite "0.2.1"] ; HTTP client; lightweight version of clj-http that uses HttpURLConnection instead of Apache
[clj-time "0.10.0"] ; library for dealing with date/time
[clojurewerkz/quartzite "2.0.0"] ; scheduling library
[colorize "0.1.1" :exclusions [org.clojure/clojure]] ; string output with ANSI color codes (for logging)
[com.cemerick/friend "0.2.1"] ; auth library
[com.draines/postal "1.11.3"] ; SMTP library
......@@ -58,7 +59,8 @@
:javac-options ["-target" "1.6" "-source" "1.6"]
;; :jar-exclusions [#"\.java"] Circle CI doesn't like regexes because it's using the EDN reader and is retarded
:ring {:handler metabase.core/app
:init metabase.core/init}
:init metabase.core/init
:destroy metabase.core/destroy}
:eastwood {:exclude-namespaces [:test-paths]
:add-linters [:unused-private-vars]
:exclude-linters [:constant-test ; korma macros generate some forms with if statements that are always logically true or false
......
......@@ -17,6 +17,7 @@
(metabase [config :as config]
[db :as db]
[driver :as driver]
[events :as events]
[routes :as routes]
[setup :as setup]
[task :as task])
......@@ -25,8 +26,7 @@
[format :refer :all])
(metabase.models [setting :refer [defsetting]]
[database :refer [Database]]
[user :refer [User]])
[metabase.events :as events]))
[user :refer [User]])))
;; ## CONFIG
......@@ -85,11 +85,19 @@
setup-url
"\n\n"))))
(defn destroy
"General application shutdown function which should be called once at application shuddown."
[]
(log/info "Metabase Shutting Down ...")
(task/stop-scheduler!)
(log/info "Metabase Shutdown COMPLETE"))
(defn init
"General application initialization function which should be run once at application startup."
[]
(log/info "Metabase Initializing ... ")
;; First of all, lets register a shutdown hook that will tidy things up for us on app exit
(.addShutdownHook (Runtime/getRuntime) (Thread. destroy))
(log/debug "Using Config:\n" (with-out-str (clojure.pprint/pprint config/config-all)))
;; Bootstrap the event system
......@@ -106,16 +114,14 @@
(events/publish-event :install {}))
;; Now start the task runner
(task/start-task-runner!)
(task/start-scheduler!)
(log/info "Metabase Initialization COMPLETE")
true)
;; TODO - uh, when do we *stop* the task runner ?
;; ## Jetty (Web) Server
(def ^:private jetty-instance
(atom nil))
......
;; -*- comment-column: 60; -*-
(ns metabase.task
"Function hooks; background task runner and related hooks.
Namespaces under `metabase.task.*` will be automatically loaded when the background task runner is started."
"Background task scheduling via Quartzite. Individual tasks are defined in `metabase.task.*`"
(:require clojure.java.classpath
[clojure.tools.logging :as log]
[clojure.tools.namespace.find :as ns-find]
[metabase.util :as u])
(:import java.util.Calendar))
[clojurewerkz.quartzite.scheduler :as qs]))
;; # HOOKS
;; [Just like in Emacs Lisp](http://www.gnu.org/software/emacs/manual/html_node/elisp/Hooks.html) (well, almost) <3
;;
;; Define a hook with `defhook`, add functions to it with `add-hook!`, and run those functions later with `run-hook`:
;;
;; (defhook hourly-tasks-hook \"Tasks to run hourly\") ; define a new hook.
;; (add-hook! #'hourly-tasks-hook my-fn-to-run-hourly) ; add a function to the hook
;; (run-hook #'hourly-tasks-hook :parallel) ; run the functions associated with a hook
;;
;; See the docstrs of these functions below for further discussion.
;;
;; ### Robert Hooke
;; Yes, I known [`robert-hooke`](https://github.com/technomancy/robert-hooke) exists.
;; This library is actually bascially an implementation of Emacs Lisp [`advice`](http://www.gnu.org/software/emacs/manual/html_node/elisp/Advising-Functions.html),
;; not `add-hook`. Which is what we want here. Also, the implementation below is only like ~20 LOC so no need to pull in a 3rd-party library IMO.
;;
;; ### Differences from Emacs Lisp
;;
;; (defun add-hook (hook function &optional append local)
;; ...)
;; (add-hook 'my-wacky-hook #'some-fun t t)
;;
;; 1. In Elisp, calling `add-hook` with an undefined hook will just create the hook for you. We're not allowing that here so we can do
;; some safety checking.
;; 2. You can't define a `buffer-local` hook because there's no such thing in Clojure
;; 3. Excution order of *hook functions* here are indeterminate since they're stored in a set
;; 4. We can run our *hook functions* in parallel <3
;;
(defmacro defhook
"Define a new hook.
(defhook hourly-tasks-hook \"Tasks to run hourly\")
A hook is simply an atom storing a set of functions that you can run at any time with `run-hook`."
[hook-name & [docstr?]]
{:arglists '([hook-name docstr?])}
`(defonce ~(vary-meta hook-name assoc
:doc docstr?
:type ::hook)
(atom #{})))
;; TODO Should we require that F be a var so as to avoid duplicate lambdas being added ?
(defn add-hook!
"Add function F to HOOK (hereafter, known as one of HOOK's *hook functions*).
Calling `(run-hook #'hook)` at a later time will call this function.
(add-hook! #'hourly-tasks-hook my-fn-to-run-hourly)
Note that you are expected to pass the var literal of HOOK; this is so we can check its metadata.
Hooks are tagged with `:type -> :metabase.task/hook` which is checked as a precondition so you
don't accidentally use this function the wrong way."
[hook f]
{:pre [(var? hook)
(= (:type (meta hook)) ::hook)
(fn? f)]}
(swap! (var-get hook) conj f))
;; TODO - remove-hook! function
(defn run-hook
"Run the *hook functions* associated with HOOK sequentially (by default) or in parallel if `:parallel` is
passed as the first arg. All subsequent args will be passed directly to the *hook functions* themselves.
(run-hook #'hourly-tasks-hook :parallel)
Like `add-hook!`, you are expected to pass the var literal of HOOK so we can do some safety checks for you."
{:arglists '([hook parallel? & args])}
[hook & args]
{:pre [(var? hook)
(= (:type (meta hook)) ::hook)]}
(let [[parallel args] (u/optional (partial = :parallel) args)
map-fn (if parallel pmap map)]
(dorun
(map-fn #(try (apply % args)
(catch Throwable e
(log/error (format "Caught exception when running %s function %s with args %s : %s"
(:name (meta hook)) % args e))))
@(var-get hook)))))
;; # TASK RUNNER
;; The task runner is a set of hooks like `hourly-tasks-hook` that get called at certain intervals on a background thread.
;; Just add functions to these hooks with `add-hook!` and they'll be ran at the appropriate time.
;; ## STANDARD TASK RUNNER HOOKS
(defhook hourly-tasks-hook
"Tasks to run hourly.
Functions will be passed a single function: the current hour (according to the system calendar) in 24-hour time.
(defn some-task [hour]
do-something ...)
(add-hook! #'hourly-tasks-hook some-task) ; some-task will be called on a background thread every hour")
(defhook nightly-tasks-hook
"Tasks to run nightly at midnight (according to the system calendar).
Functions will be passed no arguments.")
;; ## RUN-HOURLY-TASKS / RUN-NIGHTLY-TASKS
(defn- hour
"Current hour (0 - 23) according to the system calendar."
[]
(.get (Calendar/getInstance) Calendar/HOUR))
(defn- minute
"Current minute (0 - 59) according to the system calendar."
[]
(.get (Calendar/getInstance) Calendar/MINUTE))
(defn- minutes-until-next-hour
"Number of minutes (1 - 60) until the top of the *next* hour."
[]
(- 60 (minute)))
(defn- hourly-task-delay
"Number of milliseconds to wait before running hourly tasks the next time around.
This is the number of milliseconds until the top of the next hour; e.g., if the test runner is started
with `(start-task-runner!)` at 8:23 PM, this function will return the number of milliseconds until 9:00 PM,
which will be first time we'd want to run the `hourly-task-hook` functions.
(This is provided here so the unit tests can replace this fn so we can test the scheduling mechanism.)"
[]
(* 1000 60 (minutes-until-next-hour)))
(defn- run-hourly-tasks
"Sleep until the top of the next hour, then run the `hourly-tasks-hook` in parallel."
[]
;; Sleep first, that way we're not trying to run a ton of tasks as soon as Metabase spins up
(Thread/sleep (hourly-task-delay))
(log/info "Running hourly tasks...")
(run-hook #'hourly-tasks-hook :parallel (hour))
(recur))
(defn- run-nightly-tasks
"Run the `nightly-tasks-hook` (in parallel).
This is acutally just a *hook function* added to the `hourly-tasks-hook`;
it just checks whether the system hour is `0`, and, if so, runs the `nightly-tasks-hook`."
[hour]
(when (= hour 0)
(log/info "Running nightly tasks...")
(run-hook #'nightly-tasks-hook :parallel)))
(add-hook! #'hourly-tasks-hook run-nightly-tasks)
;; ## COLLECT TASKS IN METABASE.TASK.* NAMESPACES
(defonce ^:private quartz-scheduler
(atom nil))
(defn- find-and-load-tasks
"Search JARs + files in the classpath for Clojure namespaces that start with `metabase.task.`, then `require` them so tasks will be loaded as needed."
"Search Classpath for namespaces that start with `metabase.tasks.`, then `require` them so initialization can happen."
[]
(->> (ns-find/find-namespaces (clojure.java.classpath/classpath))
(filter (fn [ns-symb]
(re-find #"^metabase\.task\." (name ns-symb))))
set
(map (fn [task-ns]
(log/info "Loading tasks from namespace" task-ns "...")
(require task-ns)))
(map (fn [events-ns]
(log/info "\tloading tasks namespace: " events-ns)
(require events-ns)))
dorun))
;; ## START/STOP TASK RUNNER
(defonce ^:private task-runner
(atom nil))
(defn start-task-runner!
"Start a background thread that will run tasks on the `hourly-tasks-hook` and `nightly-tasks-hook` every hour / every night, respectively.
This also loads all namespaces under `metabase.task.*`, so tasks can defined in them without needing to load them elsewhere."
(defn start-scheduler!
"Start our Quartzite scheduler which allows jobs to be submitted and triggers to begin executing."
[]
(when-not @task-runner
(find-and-load-tasks)
(log/info "Starting task runner...")
(reset! task-runner (future (run-hourly-tasks)))))
(when-not @quartz-scheduler
(log/debug "Starting Quartz Scheduler")
;; keep a reference to our scheduler
(reset! quartz-scheduler (-> (qs/initialize) qs/start))
;; look for job/trigger definitions
(find-and-load-tasks)))
(defn stop-task-runner!
"Stop the task runner."
(defn stop-scheduler!
"Stop our Quartzite scheduler and shutdown any running executions."
[]
(when @task-runner
(log/info "Stopping task runner...")
(future-cancel @task-runner)
(reset! task-runner nil)))
(log/debug "Stopping Quartz Scheduler")
;; tell quartz to stop everything
(qs/shutdown @quartz-scheduler)
;; reset our scheduler reference
(reset! quartz-scheduler nil))
(defn schedule-task!
"Add a given job and trigger to our scheduler."
[job trigger]
(when @quartz-scheduler
(qs/schedule @quartz-scheduler job trigger)))
(ns metabase.task.sync-databases
(:require [clojure.tools.logging :as log]
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.triggers :as triggers]
[clojurewerkz.quartzite.schedule.cron :as cron]
[metabase.config :as config]
[metabase.db :as db]
[metabase.driver :as driver]
[metabase.models.database :refer [Database]]
[metabase.task :as task]))
(def sync-databases-job-key "metabase.task.sync-databases.job")
(def sync-databases-trigger-key "metabase.task.sync-databases.trigger")
(defonce ^:private sync-databases-job (atom nil))
(defonce ^:private sync-databases-trigger (atom nil))
;; simple job which looks up all databases and runs a sync on them
;; TODO - skip the sample dataset?
(jobs/defjob SyncDatabases
[ctx]
(dorun
(for [database (db/sel :many Database)]
(try
;; NOTE: this happens synchronously for now to avoid excessive load if there are lots of databases
(driver/sync-database! database)
(catch Exception e
(log/error "Error syncing database: " (:id database) e))))))
;; this is what actually adds our task to the scheduler
(when (config/is-prod?)
(log/info "Submitting sync-database task to scheduler")
;; build our job
(reset! sync-databases-job (jobs/build
(jobs/of-type SyncDatabases)
(jobs/with-identity (jobs/key sync-databases-job-key))))
;; build our trigger
(reset! sync-databases-trigger (triggers/build
(triggers/with-identity (triggers/key sync-databases-trigger-key))
(triggers/start-now)
(triggers/with-schedule
;; run at midnight daily
(cron/schedule (cron/cron-schedule "0 0 0 * * ? *")))))
;; submit ourselves to the scheduler
(task/schedule-task! @sync-databases-job @sync-databases-trigger))
(ns metabase.task-test
(:require [expectations :refer :all]
(metabase [task :refer :all]
[test-setup :refer :all]))
(:import java.util.Calendar))
(defhook task-test-hook "Hook for test purposes.")
(def task-test-atom-counter
(atom 0))
(defn- inc-task-test-atom-counter []
(swap! task-test-atom-counter inc))
(defn- inc-task-test-atom-counter-twice []
(swap! task-test-atom-counter (partial + 2)))
;; ## HOOK TESTS
(expect
[0 ; (1)
1 ; (2)
3 ; (3)
6 ; (4)
9] ; (5)
[;; (1) get initial value
(do (reset! task-test-atom-counter 0) ; reset back to 0
(run-hook #'task-test-hook)
@task-test-atom-counter)
;; (2) now add a hook function. Should increment the counter once
(do (add-hook! #'task-test-hook inc-task-test-atom-counter)
(run-hook #'task-test-hook)
@task-test-atom-counter)
;; (3) ok, run the hook twice. Should increment counter twice
(do (run-hook #'task-test-hook)
(run-hook #'task-test-hook)
@task-test-atom-counter)
;; (4) add another hook function that increments counter twice on each call (for a total of + 3)
(do (add-hook! #'task-test-hook inc-task-test-atom-counter-twice)
(run-hook #'task-test-hook)
@task-test-atom-counter)
;; (5) check that we can't add duplicate hooks - should still be just +3
(do (add-hook! #'task-test-hook inc-task-test-atom-counter-twice)
(run-hook #'task-test-hook)
@task-test-atom-counter)])
;; ## TASK RUNNER TESTS
(defn- system-hour []
(.get (Calendar/getInstance) Calendar/HOUR))
(defn- inc-task-test-atom-counter-by-system-hour [hour]
(swap! task-test-atom-counter (partial + (system-hour))))
(defhook mock-hourly-tasks-hook
"Hook that will replace the actual hourly-tasks-hook in our unit test.")
(expect [[0
(system-hour) ; we can also check that the `hourly-tasks-hook` is passing the correct param to its functions
(* 3 (system-hour))
:stopped]
:restarted]
[(do
(stop-task-runner!)
(with-redefs [metabase.task/hourly-task-delay (constantly 200)
metabase.task/hourly-tasks-hook mock-hourly-tasks-hook]
(add-hook! #'hourly-tasks-hook inc-task-test-atom-counter-by-system-hour)
(reset! task-test-atom-counter 0)
(start-task-runner!)
[@task-test-atom-counter ; should be 0, since not enough time has elaspsed for the hook to be executed
(do (Thread/sleep 300)
@task-test-atom-counter) ; should have been called once (~100ms ago)
(do (Thread/sleep 400)
@task-test-atom-counter) ; should have been called two more times
(do (stop-task-runner!)
:stopped)]))
(do (start-task-runner!)
:restarted)])
......@@ -6,7 +6,6 @@
[expectations :refer :all]
(metabase [core :as core]
[db :as db]
[task :as task]
[util :as u])
(metabase.models [table :refer [Table]])
[metabase.test.data.datasets :as datasets]))
......@@ -83,16 +82,13 @@
(let [setup-db (future (time (do (log/info "Setting up test DB and running migrations...")
(db/setup-db :auto-migrate true)
(load-test-datasets)
(metabase.models.setting/set :site-name "Metabase Test"))))
start-task-runner! (future (task/start-task-runner!))]
(metabase.models.setting/set :site-name "Metabase Test"))))]
(core/start-jetty)
@setup-db
@start-task-runner!))
@setup-db))
(defn test-teardown
{:expectations-options :after-run}
[]
(log/info "Shutting down Metabase unit test runner")
(task/stop-task-runner!)
(core/stop-jetty))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment