diff --git a/frontend/src/metabase/admin/tasks/containers/JobTriggersModal.jsx b/frontend/src/metabase/admin/tasks/containers/JobTriggersModal.jsx index 0ed0609f6ab38b969357db00f256e2e898531196..ed0ea805d1969eefd94018a749d32b48d0b81e63 100644 --- a/frontend/src/metabase/admin/tasks/containers/JobTriggersModal.jsx +++ b/frontend/src/metabase/admin/tasks/containers/JobTriggersModal.jsx @@ -24,6 +24,7 @@ const renderTriggersTable = triggers => { <th>{t`End Time`}</th> <th>{t`Final Fire Time`}</th> <th>{t`May Fire Again?`}</th> + <th>{t`Misfire Instruction`}</th> </tr> </thead> <tbody> @@ -40,6 +41,7 @@ const renderTriggersTable = triggers => { <td>{trigger["end-time"]}</td> <td>{trigger["final-fire-time"]}</td> <td>{trigger["may-fire-again?"] ? t`Yes` : t`No`}</td> + <td>{trigger["misfire-instruction"]}</td> </tr> ))} </tbody> diff --git a/src/metabase/task.clj b/src/metabase/task.clj index 62b45fb451a03ace79b4f8867b0bf725306a8260..c6a44c0a40c2dd65d1e1183cfb11fb9d2e922e3a 100644 --- a/src/metabase/task.clj +++ b/src/metabase/task.clj @@ -6,7 +6,11 @@ The most appropriate way to initialize tasks in any `metabase.task.*` namespace is to implement the `task-init` function which accepts zero arguments. This function is dynamically resolved and called exactly once when the application goes through normal startup procedures. Inside this function you can do any work needed and add your - task to the scheduler as usual via `schedule-task!`. 
+ + ## Quartz JavaDoc + + Find the JavaDoc for Quartz here: http://www.quartz-scheduler.org/api/2.3.0/index.html" (:require [clojure.java.jdbc :as jdbc] [clojure.string :as str] [clojure.tools.logging :as log] @@ -18,7 +22,7 @@ [metabase.util.i18n :refer [trs]] [schema.core :as s] [toucan.db :as db]) - (:import [org.quartz JobDetail JobKey Scheduler Trigger TriggerKey])) + (:import [org.quartz CronTrigger JobDetail JobKey Scheduler Trigger TriggerKey])) ;;; +----------------------------------------------------------------------------------------------------------------+ ;;; | SCHEDULER INSTANCE | @@ -27,6 +31,7 @@ (defonce ^:private quartz-scheduler (atom nil)) +;; TODO - maybe we should make this a delay instead! (defn- scheduler "Fetch the instance of our Quartz scheduler. Call this function rather than dereffing the atom directly because there are a few places (e.g., in tests) where we swap the instance out." @@ -155,6 +160,16 @@ ;;; | SCHEDULING/DELETING TASKS | ;;; +----------------------------------------------------------------------------------------------------------------+ +(s/defn ^:private reschedule-task! + [job :- JobDetail, new-trigger :- Trigger] + (try + (when-let [scheduler (scheduler)] + (when-let [[^Trigger old-trigger] (seq (qs/get-triggers-of-job scheduler (.getKey job)))] + (log/debug (trs "Rescheduling job {0}" (-> job .getKey .getName))) + (.rescheduleJob scheduler (.getKey old-trigger) new-trigger))) + (catch Throwable e + (log/error e (trs "Error rescheduling job"))))) + (s/defn schedule-task! "Add a given job and trigger to our scheduler." [job :- JobDetail, trigger :- Trigger] @@ -162,7 +177,8 @@ (try (qs/schedule scheduler job trigger) (catch org.quartz.ObjectAlreadyExistsException _ - (log/debug (trs "Job already exists:") (-> job .getKey .getName)))))) + (log/debug (trs "Job already exists:") (-> job .getKey .getName)) + (reschedule-task! job trigger))))) (s/defn delete-task! 
"delete a task from the scheduler" @@ -202,7 +218,12 @@ :durable? (.isDurable job-detail) :requests-recovery? (.requestsRecovery job-detail)}) -(defn- trigger->info [^Trigger trigger] +(defmulti ^:private trigger->info + {:arglists '([trigger])} + class) + +(defmethod trigger->info Trigger + [^Trigger trigger] {:description (.getDescription trigger) :end-time (.getEndTime trigger) :final-fire-time (.getFinalFireTime trigger) @@ -214,6 +235,19 @@ :start-time (.getStartTime trigger) :may-fire-again? (.mayFireAgain trigger)}) +(defmethod trigger->info CronTrigger + [^CronTrigger trigger] + (merge + ((get-method trigger->info Trigger) trigger) + {:misfire-instruction + ;; `case` can't be used here: its test constants are not evaluated, so these static fields would be matched as literal symbols instead of their int values + (condp = (.getMisfireInstruction trigger) + CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY "IGNORE_MISFIRE_POLICY" + CronTrigger/MISFIRE_INSTRUCTION_SMART_POLICY "SMART_POLICY" + CronTrigger/MISFIRE_INSTRUCTION_FIRE_ONCE_NOW "FIRE_ONCE_NOW" + CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING "DO_NOTHING" + (format "UNKNOWN: %d" (.getMisfireInstruction trigger)))})) + (defn scheduler-info "Return raw data about all the scheduler and scheduled tasks (i.e. Jobs and Triggers). Primarily for debugging purposes." diff --git a/src/metabase/task/send_pulses.clj b/src/metabase/task/send_pulses.clj index 4b4fe398aefa517f47a9aedab801fcc060ce6fb2..4f0112f6ed423294933c287e6c05e12e495713e2 100644 --- a/src/metabase/task/send_pulses.clj +++ b/src/metabase/task/send_pulses.clj @@ -16,14 +16,30 @@ [pulse-channel :as pulse-channel] [setting :as setting] [task-history :as task-history]] - [metabase.util.i18n :refer [trs]])) + [metabase.util.i18n :refer [trs]] + [schema.core :as s])) ;;; ------------------------------------------------- PULSE SENDING -------------------------------------------------- (defn- log-pulse-exception [pulse-id exception] (log/error exception (trs "Error sending Pulse {0}" pulse-id))) -(defn- send-pulses! 
+(def ^:private Hour + (s/constrained + s/Int + #(and (<= 0 %) (>= 23 %)) + "valid hour")) + +(def ^:private Weekday + (s/pred pulse-channel/day-of-week? "valid day of week")) + +(def ^:private MonthDay + (s/enum :first :last :mid :other)) + +(def ^:private MonthWeek + (s/enum :first :last :other)) + +(s/defn ^:private send-pulses! "Send any `Pulses` which are scheduled to run in the current day/hour. We use the current time and determine the hour of the day and day of the week according to the defined reporting timezone, or UTC. We then find all `Pulses` that are scheduled to run and send them. The `on-error` function is called if an exception is thrown when sending @@ -31,12 +47,8 @@ `on-error` function makes it easier to test for when an error doesn't occur" ([hour weekday monthday monthweek] (send-pulses! hour weekday monthday monthweek log-pulse-exception)) - ([hour weekday monthday monthweek on-error] - {:pre [(integer? hour) - (and (<= 0 hour) (>= 23 hour)) - (pulse-channel/day-of-week? weekday) - (contains? #{:first :last :mid :other} monthday) - (contains? #{:first :last :other} monthweek)]} + + ([hour :- Hour, weekday :- Weekday, monthday :- MonthDay, monthweek :- MonthWeek, on-error] (log/info (trs "Sending scheduled pulses...")) (let [channels-by-pulse (group-by :pulse_id (pulse-channel/retrieve-scheduled-channels hour weekday monthday monthweek))] (doseq [pulse-id (keys channels-by-pulse)] @@ -103,9 +115,11 @@ (cron/schedule ;; run at the top of every hour (cron/cron-schedule "0 0 * * * ? *") - ;; If a trigger misfires (i.e., Quartz cannot run our job for one reason or another, such as all - ;; worker threads being busy), attempt to fire the triggers again ASAP. This article does a good - ;; job explaining what this means: - ;; https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html - (cron/with-misfire-handling-instruction-ignore-misfires))))] + ;; If send-pulses! misfires, don't try to re-send all the misfired Pulses. 
Retry only the most + ;; recent misfire, discarding all others. This should hopefully cover cases where a misfire + ;; happens while the system is still running; if the system goes down for an extended period of + ;; time we don't want to re-send tons of (possibly duplicate) Pulses. + ;; + ;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html + (cron/with-misfire-handling-instruction-fire-and-proceed))))] (task/schedule-task! job trigger))) diff --git a/src/metabase/task/sync_databases.clj b/src/metabase/task/sync_databases.clj index 9b0ad73317ba2bdcf49e23ba92da5815033a2785..d2867bb2d97194ba34736d1e979a3d68677d5cf8 100644 --- a/src/metabase/task/sync_databases.clj +++ b/src/metabase/task/sync_databases.clj @@ -150,11 +150,11 @@ (triggers/with-schedule (cron/schedule (cron/cron-schedule (cron-schedule database task-info)) - ;; If we miss a trigger, try again at the next opportunity, but only try it once. If we miss two triggers in a - ;; row (i.e. more than an hour goes by) then the job should still execute, but drop the additional occurrences - ;; of the same trigger (i.e. no need to run the job 3 times because it was missed three times, once is all we - ;; need) - (cron/with-misfire-handling-instruction-fire-and-proceed))))) + ;; If we miss a sync for one reason or another (such as the system being down), do not try to run the sync again. + ;; Just wait until the next sync cycle. + ;; + ;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info + (cron/with-misfire-handling-instruction-do-nothing))))) (s/defn ^:private schedule-tasks-for-db! "Schedule a new Quartz job for `database` and `task-info`." 
diff --git a/test/metabase/task/sync_databases_test.clj b/test/metabase/task/sync_databases_test.clj index 00a9bf6d8c3a1918e069de69eaf21ec853d14799..8d942efe2cdb5e571c1b998bd083baf5d848f1b0 100644 --- a/test/metabase/task/sync_databases_test.clj +++ b/test/metabase/task/sync_databases_test.clj @@ -22,7 +22,7 @@ (.isAnnotationPresent UpdateFieldValues org.quartz.DisallowConcurrentExecution)) (defn- replace-trailing-id-with-<id> [s] - (str/replace s #"\d+$" "<id>")) + (some-> s (str/replace #"\d+$" "<id>"))) (defn- replace-ids-with-<id> [current-tasks] (vec (for [task current-tasks] @@ -36,7 +36,9 @@ (update-in [:data "db-id"] replace-trailing-id-with-<id>)))))))))) (defn- current-tasks [] - (replace-ids-with-<id> (tu/scheduler-current-tasks))) + (->> (tu/scheduler-current-tasks) + (filter #(#{"metabase.task.sync-and-analyze.job" "metabase.task.update-field-values.job"} (:key %))) + replace-ids-with-<id>)) (defmacro ^:private with-scheduler-setup [& body] `(tu/with-temp-scheduler diff --git a/test/metabase/task_test.clj b/test/metabase/task_test.clj new file mode 100644 index 0000000000000000000000000000000000000000..08e1b25bf0833c72b6bed3592012d0b8966ab853 --- /dev/null +++ b/test/metabase/task_test.clj @@ -0,0 +1,79 @@ +(ns metabase.task-test + (:require [clojurewerkz.quartzite + [jobs :as jobs] + [scheduler :as qs] + [triggers :as triggers]] + [clojurewerkz.quartzite.schedule.cron :as cron] + [expectations :refer [expect]] + [metabase.task :as task] + [metabase.test.util :as tu]) + (:import [org.quartz CronTrigger JobDetail])) + +;; make sure we attempt to reschedule tasks so changes made in source are propagated to JDBC backend + +(jobs/defjob TestJob [_]) + +(defn- job ^JobDetail [] + (jobs/build + (jobs/of-type TestJob) + (jobs/with-identity (jobs/key "metabase.task-test.job")))) + +(defn- trigger-1 ^CronTrigger [] + (triggers/build + (triggers/with-identity (triggers/key "metabase.task-test.trigger")) + (triggers/start-now) + (triggers/with-schedule + 
(cron/schedule + (cron/cron-schedule "0 0 * * * ? *") ; every hour + (cron/with-misfire-handling-instruction-do-nothing))))) + +(defn- trigger-2 ^CronTrigger [] + (triggers/build + (triggers/with-identity (triggers/key "metabase.task-test.trigger")) + (triggers/start-now) + (triggers/with-schedule + (cron/schedule + (cron/cron-schedule "0 0 6 * * ? *") ; at 6 AM every day + (cron/with-misfire-handling-instruction-ignore-misfires))))) + +(defn- do-with-temp-scheduler-and-cleanup [f] + (tu/with-temp-scheduler + (try + (f) + (finally + (task/delete-task! (.getKey (job)) (.getKey (trigger-1))))))) + +(defmacro ^:private with-temp-scheduler-and-cleanup [& body] + `(do-with-temp-scheduler-and-cleanup (fn [] ~@body))) + +(defn- triggers [] + (set + (for [^CronTrigger trigger (qs/get-triggers-of-job (#'metabase.task/scheduler) (.getKey (job)))] + {:cron-expression (.getCronExpression trigger) + :misfire-instruction (.getMisfireInstruction trigger)}))) + +;; can we schedule a job? +(expect + #{{:cron-expression "0 0 * * * ? *" + :misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING}} + (with-temp-scheduler-and-cleanup + (task/schedule-task! (job) (trigger-1)) + (triggers))) + +;; does scheduling a job a second time work without throwing errors? +(expect + #{{:cron-expression "0 0 * * * ? *" + :misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_DO_NOTHING}} + (with-temp-scheduler-and-cleanup + (task/schedule-task! (job) (trigger-1)) + (task/schedule-task! (job) (trigger-1)) + (triggers))) + +;; does scheduling a job with a *new* trigger replace the original? (can we reschedule a job?) +(expect + #{{:cron-expression "0 0 6 * * ? *" + :misfire-instruction CronTrigger/MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY}} + (with-temp-scheduler-and-cleanup + (task/schedule-task! (job) (trigger-1)) + (task/schedule-task! (job) (trigger-2)) + (triggers)))