Skip to content
Snippets Groups Projects
Unverified Commit fa126498 authored by Cam Saul's avatar Cam Saul Committed by GitHub
Browse files

Merge pull request #9961 from metabase/reschedule-tasks

Reschedule Quartz tasks on launch
parents 61d07b5f 407f0650
No related merge requests found
......@@ -24,6 +24,7 @@ const renderTriggersTable = triggers => {
<th>{t`End Time`}</th>
<th>{t`Final Fire Time`}</th>
<th>{t`May Fire Again?`}</th>
<th>{t`Misfire Instruction`}</th>
</tr>
</thead>
<tbody>
......@@ -40,6 +41,7 @@ const renderTriggersTable = triggers => {
<td>{trigger["end-time"]}</td>
<td>{trigger["final-fire-time"]}</td>
<td>{trigger["may-fire-again?"] ? t`Yes` : t`No`}</td>
<td>{trigger["misfire-instruction"]}</td>
</tr>
))}
</tbody>
......
......@@ -6,7 +6,11 @@
The most appropriate way to initialize tasks in any `metabase.task.*` namespace is to implement the `task-init`
function which accepts zero arguments. This function is dynamically resolved and called exactly once when the
application goes through normal startup procedures. Inside this function you can do any work needed and add your
task to the scheduler as usual via `schedule-task!`."
task to the scheduler as usual via `schedule-task!`.
## Quartz JavaDoc
Find the JavaDoc for Quartz here: http://www.quartz-scheduler.org/api/2.3.0/index.html"
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[clojure.tools.logging :as log]
......@@ -18,7 +22,7 @@
[metabase.util.i18n :refer [trs]]
[schema.core :as s]
[toucan.db :as db])
(:import [org.quartz JobDetail JobKey Scheduler Trigger TriggerKey]))
(:import [org.quartz CronTrigger JobDetail JobKey Scheduler Trigger TriggerKey]))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | SCHEDULER INSTANCE |
......@@ -27,6 +31,7 @@
(defonce ^:private quartz-scheduler
(atom nil))
;; TODO - maybe we should make this a delay instead!
(defn- scheduler
"Fetch the instance of our Quartz scheduler. Call this function rather than dereffing the atom directly because there
are a few places (e.g., in tests) where we swap the instance out."
......@@ -155,6 +160,16 @@
;;; | SCHEDULING/DELETING TASKS |
;;; +----------------------------------------------------------------------------------------------------------------+
(s/defn ^:private reschedule-task!
  "Swap the trigger currently registered for `job` with `new-trigger`. The triggers of the job are looked up by the
  job's key and the first one found is the one that gets rescheduled. Nothing happens when no scheduler instance is
  available or when the job has no triggers. Anything thrown along the way is logged as an error, not rethrown."
  [job :- JobDetail, new-trigger :- Trigger]
  (try
    (when-let [sched (scheduler)]
      (let [^JobKey job-key (.getKey job)]
        (when-let [[^Trigger old-trigger] (seq (qs/get-triggers-of-job sched job-key))]
          (log/debug (trs "Rescheduling job {0}" (.getName job-key)))
          ;; `rescheduleJob` takes the key of the trigger being replaced plus the replacement trigger
          (.rescheduleJob sched (.getKey old-trigger) new-trigger))))
    (catch Throwable e
      (log/error e (trs "Error rescheduling job")))))
(s/defn schedule-task!
"Add a given job and trigger to our scheduler."
[job :- JobDetail, trigger :- Trigger]
......@@ -162,7 +177,8 @@
(try
(qs/schedule scheduler job trigger)
(catch org.quartz.ObjectAlreadyExistsException _
(log/debug (trs "Job already exists:") (-> job .getKey .getName))))))
(log/debug (trs "Job already exists:") (-> job .getKey .getName))
(reschedule-task! job trigger)))))
(s/defn delete-task!
"delete a task from the scheduler"
......@@ -202,7 +218,12 @@
:durable? (.isDurable job-detail)
:requests-recovery? (.requestsRecovery job-detail)})
(defn- trigger->info [^Trigger trigger]
(defmulti ^:private trigger->info
{:arglists '([trigger])}
class)
(defmethod trigger->info Trigger
[^Trigger trigger]
{:description (.getDescription trigger)
:end-time (.getEndTime trigger)
:final-fire-time (.getFinalFireTime trigger)
......@@ -214,6 +235,19 @@
:start-time (.getStartTime trigger)
:may-fire-again? (.mayFireAgain trigger)})
(defmethod trigger->info CronTrigger
  [^CronTrigger trigger]
  ;; Start from the info map produced by the generic `Trigger` method, then add a human-readable name for the
  ;; trigger's misfire instruction.
  (let [base-info   ((get-method trigger->info Trigger) trigger)
        instruction (.getMisfireInstruction trigger)]
    (assoc base-info
           :misfire-instruction
           ;; `case` cannot be used here: its test constants are not evaluated and must be compile-time literals, so
           ;; `CronTrigger/MISFIRE_INSTRUCTION_...` would be matched as a symbol rather than as the value of the
           ;; static field. `condp =` evaluates each test expression, so it compares against the actual constants.
           (condp = instruction
             CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY "IGNORE_MISFIRE_POLICY"
             CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY          "SMART_POLICY"
             CronTrigger/MISFIRE_INSTRUCTION_FIRE_ONCE_NOW         "FIRE_ONCE_NOW"
             CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING            "DO_NOTHING"
             (format "UNKNOWN: %d" instruction)))))
(defn scheduler-info
"Return raw data about all the scheduler and scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging
purposes."
......
......@@ -16,14 +16,30 @@
[pulse-channel :as pulse-channel]
[setting :as setting]
[task-history :as task-history]]
[metabase.util.i18n :refer [trs]]))
[metabase.util.i18n :refer [trs]]
[schema.core :as s]))
;;; ------------------------------------------------- PULSE SENDING --------------------------------------------------
(defn- log-pulse-exception [pulse-id exception]
(log/error exception (trs "Error sending Pulse {0}" pulse-id)))
(defn- send-pulses!
(def ^:private Hour
  "Schema for an hour of the day: an integer from 0 through 23, inclusive."
  (s/constrained s/Int #(<= 0 % 23) "valid hour"))

(def ^:private Weekday
  "Schema for any value that satisfies `pulse-channel/day-of-week?`."
  (s/pred pulse-channel/day-of-week? "valid day of week"))

(def ^:private MonthDay
  "Schema for the day-of-month bucket: one of `:first`, `:last`, `:mid`, or `:other`."
  (s/enum :first :last :mid :other))

(def ^:private MonthWeek
  "Schema for the week-of-month bucket: one of `:first`, `:last`, or `:other`."
  (s/enum :first :last :other))
(s/defn ^:private send-pulses!
"Send any `Pulses` which are scheduled to run in the current day/hour. We use the current time and determine the
hour of the day and day of the week according to the defined reporting timezone, or UTC. We then find all `Pulses`
that are scheduled to run and send them. The `on-error` function is called if an exception is thrown when sending
......@@ -31,12 +47,8 @@
`on-error` function makes it easier to test for when an error doesn't occur"
([hour weekday monthday monthweek]
(send-pulses! hour weekday monthday monthweek log-pulse-exception))
([hour weekday monthday monthweek on-error]
{:pre [(integer? hour)
(and (<= 0 hour) (>= 23 hour))
(pulse-channel/day-of-week? weekday)
(contains? #{:first :last :mid :other} monthday)
(contains? #{:first :last :other} monthweek)]}
([hour :- Hour, weekday :- Weekday, monthday :- MonthDay, monthweek :- MonthWeek, on-error]
(log/info (trs "Sending scheduled pulses..."))
(let [channels-by-pulse (group-by :pulse_id (pulse-channel/retrieve-scheduled-channels hour weekday monthday monthweek))]
(doseq [pulse-id (keys channels-by-pulse)]
......@@ -103,9 +115,11 @@
(cron/schedule
;; run at the top of every hour
(cron/cron-schedule "0 0 * * * ? *")
;; If a trigger misfires (i.e., Quartz cannot run our job for one reason or another, such as all
;; worker threads being busy), attempt to fire the triggers again ASAP. This article does a good
;; job explaining what this means:
;; https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html
(cron/with-misfire-handling-instruction-ignore-misfires))))]
;; If send-pulses! misfires, don't try to re-send all the misfired Pulses. Retry only the most
;; recent misfire, discarding all others. This should hopefully cover cases where a misfire
;; happens while the system is still running; if the system goes down for an extended period of
;; time we don't want to re-send tons of (possibly duplicate) Pulses.
;;
;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html
(cron/with-misfire-handling-instruction-fire-and-proceed))))]
(task/schedule-task! job trigger)))
......@@ -150,11 +150,11 @@
(triggers/with-schedule
(cron/schedule
(cron/cron-schedule (cron-schedule database task-info))
;; If we miss a trigger, try again at the next opportunity, but only try it once. If we miss two triggers in a
;; row (i.e. more than an hour goes by) then the job should still execute, but drop the additional occurrences
;; of the same trigger (i.e. no need to run the job 3 times because it was missed three times, once is all we
;; need)
(cron/with-misfire-handling-instruction-fire-and-proceed)))))
;; if we miss a sync for one reason or another (such as system being down) do not try to run the sync again.
;; Just wait until the next sync cycle.
;;
;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info
(cron/with-misfire-handling-instruction-do-nothing)))))
(s/defn ^:private schedule-tasks-for-db!
"Schedule a new Quartz job for `database` and `task-info`."
......
......@@ -22,7 +22,7 @@
(.isAnnotationPresent UpdateFieldValues org.quartz.DisallowConcurrentExecution))
(defn- replace-trailing-id-with-<id> [s]
(str/replace s #"\d+$" "<id>"))
(some-> s (str/replace #"\d+$" "<id>")))
(defn- replace-ids-with-<id> [current-tasks]
(vec (for [task current-tasks]
......@@ -36,7 +36,9 @@
(update-in [:data "db-id"] replace-trailing-id-with-<id>))))))))))
(defn- current-tasks []
(replace-ids-with-<id> (tu/scheduler-current-tasks)))
(->> (tu/scheduler-current-tasks)
(filter #(#{"metabase.task.sync-and-analyze.job" "metabase.task.update-field-values.job"} (:key %)))
replace-ids-with-<id>))
(defmacro ^:private with-scheduler-setup [& body]
`(tu/with-temp-scheduler
......
(ns metabase.task-test
(:require [clojurewerkz.quartzite
[jobs :as jobs]
[scheduler :as qs]
[triggers :as triggers]]
[clojurewerkz.quartzite.schedule.cron :as cron]
[expectations :refer [expect]]
[metabase.task :as task]
[metabase.test.util :as tu])
(:import [org.quartz CronTrigger JobDetail]))
;; make sure we attempt to reschedule tasks so changes made in source are propagated to the JDBC backend
(jobs/defjob TestJob [_])
(defn- job
  "Build a new `JobDetail` of type `TestJob` identified by the job key `metabase.task-test.job`. Every call builds a
  fresh instance with that same key."
  ^JobDetail []
  (jobs/build
   (jobs/of-type TestJob)
   (jobs/with-identity (jobs/key "metabase.task-test.job"))))
(defn- trigger-1
  "Build a cron trigger with the key `metabase.task-test.trigger` that uses the hourly cron expression
  `0 0 * * * ? *` and the 'do nothing' misfire handling instruction."
  ^CronTrigger []
  (triggers/build
   (triggers/with-identity (triggers/key "metabase.task-test.trigger"))
   (triggers/start-now)
   (triggers/with-schedule
     (cron/schedule
      (cron/cron-schedule "0 0 * * * ? *") ; every hour
      (cron/with-misfire-handling-instruction-do-nothing)))))
(defn- trigger-2
  "Build a cron trigger with the same key as `trigger-1` (`metabase.task-test.trigger`) but a different schedule
  (`0 0 6 * * ? *`) and the 'ignore misfires' misfire handling instruction. Used to check that rescheduling replaces
  the original trigger."
  ^CronTrigger []
  (triggers/build
   (triggers/with-identity (triggers/key "metabase.task-test.trigger"))
   (triggers/start-now)
   (triggers/with-schedule
     (cron/schedule
      (cron/cron-schedule "0 0 6 * * ? *") ; at 6 AM every day
      (cron/with-misfire-handling-instruction-ignore-misfires)))))
(defn- do-with-temp-scheduler-and-cleanup
  "Call the zero-argument function `f` inside `tu/with-temp-scheduler`. Whether or not `f` throws, the test job and
  test trigger are deleted afterwards via `task/delete-task!`."
  [f]
  (tu/with-temp-scheduler
    (try
      (f)
      (finally
        ;; `trigger-1` and `trigger-2` share the same trigger key, so the key of `trigger-1` covers both
        (let [job-key     (.getKey (job))
              trigger-key (.getKey (trigger-1))]
          (task/delete-task! job-key trigger-key))))))
(defmacro ^:private with-temp-scheduler-and-cleanup
  "Macro form of `do-with-temp-scheduler-and-cleanup`: wraps `body` in a zero-argument function and passes it to that
  function."
  [& body]
  `(do-with-temp-scheduler-and-cleanup (fn [] ~@body)))
(defn- triggers
  "Return a set of maps, one per trigger currently registered for the test job, each holding the trigger's
  `:cron-expression` and `:misfire-instruction`."
  []
  (let [job-triggers (qs/get-triggers-of-job (#'metabase.task/scheduler) (.getKey (job)))]
    (set
     (map (fn [^CronTrigger cron-trigger]
            {:cron-expression     (.getCronExpression cron-trigger)
             :misfire-instruction (.getMisfireInstruction cron-trigger)})
          job-triggers))))
;; can we schedule a job?
(expect
#{{:cron-expression "0 0 * * * ? *"
:misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING}}
(with-temp-scheduler-and-cleanup
(task/schedule-task! (job) (trigger-1))
(triggers)))
;; does scheduling a job a second time work without throwing errors?
(expect
#{{:cron-expression "0 0 * * * ? *"
:misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING}}
(with-temp-scheduler-and-cleanup
(task/schedule-task! (job) (trigger-1))
(task/schedule-task! (job) (trigger-1))
(triggers)))
;; does scheduling a job with a *new* trigger replace the original? (can we reschedule a job?)
(expect
#{{:cron-expression "0 0 6 * * ? *"
:misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY}}
(with-temp-scheduler-and-cleanup
(task/schedule-task! (job) (trigger-1))
(task/schedule-task! (job) (trigger-2))
(triggers)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment