Commit b94938e4 authored by Ryan Senior

Change database sync and field values job scheduling [ci drivers]

Previously, each database had its own trigger and job instances for
sync+analyze and for field values scanning. We were using the default
thread count and a "do nothing" policy on misfires. This resulted in
the race condition described below.

By default, sync and field-values scans are scheduled at 50 minutes
past the hour (two separate jobs). As an example, assume we have 10
databases configured. This would result in 20 jobs firing at 50
minutes past the hour. The order of execution of these jobs is not
defined. By default, we have 4 threads to execute these jobs. If all 4
threads are busy when a trigger fires, the trigger will wait up to one
minute (the default) for a thread. If it can't get a thread within
that minute, it misfires. The policy we had set up discards the job
when it misfires. The end result is a race condition where sync could
be skipped for many databases while 4 scans or syncs run at the same
time, potentially causing memory issues.
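
The arithmetic of that example can be checked with a small model. This is only a sketch in plain Clojure, not Quartz code: `simulate-firing` and its option keys are hypothetical names, every job is assumed to take the same number of minutes, and the pool is assumed to have at least one thread.

```clojure
(defn simulate-firing
  "Model `n-jobs` triggers that all fire at minute 0 on a pool of `n-threads`.
  Assumption: every job takes `job-minutes`. A trigger that would have to wait
  longer than `misfire-threshold` minutes for a thread is treated as misfired
  and, mimicking a \"do nothing\" policy, is discarded.
  Returns a map with the number of jobs that ran and that were dropped."
  [{:keys [n-jobs n-threads job-minutes misfire-threshold]}]
  (loop [remaining n-jobs
         ;; minute at which each thread next becomes free
         free-at   (vec (repeat n-threads 0))
         ran       0
         dropped   0]
    (if (zero? remaining)
      {:ran ran :dropped dropped}
      (let [[earliest & others] (sort free-at)]
        (if (<= earliest misfire-threshold)
          ;; a thread frees up in time: run the job on it
          (recur (dec remaining)
                 (conj (vec others) (+ earliest job-minutes))
                 (inc ran)
                 dropped)
          ;; no thread within the threshold: the job is discarded
          (recur (dec remaining) free-at ran (inc dropped)))))))

;; 10 databases x 2 jobs, 4 threads, 5-minute jobs, 1-minute threshold
(simulate-firing {:n-jobs 20 :n-threads 4 :job-minutes 5 :misfire-threshold 1})
;; => {:ran 4, :dropped 16}
```

Under these assumptions only the first 4 jobs run. Which 4 is arbitrary, since the execution order is not defined.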

This commit switches to a single job for sync and a single job for
field value scans. Every database will have a trigger for sync and a
trigger for field values, but only one sync and one field values scan
will run at a time. If a trigger misfires, its job is executed once,
as soon as it is able to run.
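
The intended effect of the new arrangement can be approximated with a rough model. It is a plain-Clojure sketch, not a description of Quartz internals: `simulate-single-job`, `enqueue-once` and the option keys are hypothetical, all triggers are assumed to fire together every `period` minutes, and each run is assumed to take `job-minutes`. A trigger that cannot run stays queued at most once, so a missed firing runs later instead of being dropped.

```clojure
(defn- enqueue-once
  "Append `db` to the `pending` queue unless it is already waiting, so repeated
  missed firings of the same trigger collapse into a single run."
  [pending db]
  (if (some #{db} pending) pending (conj pending db)))

(defn simulate-single-job
  "Minute-by-minute model of one non-concurrent job shared by `n-dbs` triggers.
  Returns a map of database index -> number of runs started before `horizon`."
  [{:keys [n-dbs job-minutes period horizon]}]
  (loop [minute     0
         busy-until 0   ; minute at which the single running job finishes
         pending    []  ; FIFO queue of databases waiting for their run
         runs       {}]
    (if (>= minute horizon)
      runs
      (let [pending (if (zero? (mod minute period))
                      ;; all triggers fire; each database is queued at most once
                      (reduce enqueue-once pending (range n-dbs))
                      pending)]
        (if (and (>= minute busy-until) (seq pending))
          ;; the job is idle: start the next waiting database
          (recur (inc minute)
                 (+ minute job-minutes)
                 (subvec pending 1)
                 (update runs (first pending) (fnil inc 0)))
          ;; the job is busy or nothing is waiting: advance time
          (recur (inc minute) busy-until pending runs))))))

;; 10 databases, 5-minute runs, one firing per hour, observed for one hour
(count (simulate-single-job {:n-dbs 10 :job-minutes 5 :period 60 :horizon 60}))
;; => 10
```

In this model every database gets its run and at most one run is in progress at any minute, at the cost of later databases starting after their scheduled time.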
parent edf6efa3
......@@ -3,3 +3,8 @@ org.quartz.threadPool.threadCount = 4
# Don't phone home
org.quartz.scheduler.skipUpdateCheck: true
# Useful for debugging when Quartz jobs run and when they misfire
#org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingTriggerHistoryPlugin
#org.quartz.plugin.triggHistory.triggerFiredMessage = Trigger \{1\}.\{0\} fired job \{6\}.\{5\} at: \{4, date, HH:mm:ss MM/dd/yyyy}
#org.quartz.plugin.triggHistory.triggerCompleteMessage = Trigger \{1\}.\{0\} completed firing job \{6\}.\{5\} at \{4, date, HH:mm:ss MM/dd/yyyy\}.
......@@ -84,3 +84,21 @@
(when-let [scheduler (scheduler)]
(qs/delete-trigger scheduler trigger-key)
(qs/delete-job scheduler job-key)))
(s/defn add-job!
"Add a job separately from a trigger, replace if the job is already there"
[job :- JobDetail]
(when-let [scheduler (scheduler)]
(qs/add-job scheduler job true)))
(s/defn add-trigger!
"Add a trigger. Assumes the trigger is already associated to a job (i.e. `trigger/for-job`)"
[trigger :- Trigger]
(when-let [scheduler (scheduler)]
(qs/add-trigger scheduler trigger)))
(s/defn delete-trigger!
"Remove `trigger-key` from the scheduler"
[trigger-key :- TriggerKey]
(when-let [scheduler (scheduler)]
(qs/delete-trigger scheduler trigger-key)))
......@@ -18,7 +18,7 @@
[schema.core :as s]
[toucan.db :as db])
(:import metabase.models.database.DatabaseInstance
[org.quartz CronTrigger JobDetail JobKey TriggerKey]))
[org.quartz CronTrigger DisallowConcurrentExecution JobDetail JobKey TriggerKey]))
;;; +------------------------------------------------------------------------------------------------------------------------+
;;; | JOB LOGIC |
......@@ -29,21 +29,20 @@
[job-context]
(Database (u/get-id (get (qc/from-job-data job-context) "db-id"))))
(jobs/defjob SyncAndAnalyzeDatabase [job-context]
;; The DisallowConcurrentExecution on the two defrecords below attaches an annotation to the generated class that will
;; constrain the job execution to only be one at a time. Other triggers wanting the job to run will misfire.
(jobs/defjob ^{org.quartz.DisallowConcurrentExecution true} SyncAndAnalyzeDatabase [job-context]
(let [database (job-context->database job-context)]
(sync-metadata/sync-db-metadata! database)
;; only run analysis if this is a "full sync" database
(when (:is_full_sync database)
(analyze/analyze-db! database))))
(jobs/defjob UpdateFieldValues [job-context]
(jobs/defjob ^{org.quartz.DisallowConcurrentExecution true} UpdateFieldValues [job-context]
(let [database (job-context->database job-context)]
(when (:is_full_sync database)
(field-values/update-field-values! (job-context->database job-context)))))
;;; +------------------------------------------------------------------------------------------------------------------------+
;;; | TASK INFO AND GETTER FUNCTIONS |
;;; +------------------------------------------------------------------------------------------------------------------------+
......@@ -54,23 +53,23 @@
:db-schedule-column s/Keyword
:job-class Class})
(s/def ^:private sync-analyze-task-info :- TaskInfo
{:key :sync-and-analyze
:db-schedule-column :metadata_sync_schedule
:job-class SyncAndAnalyzeDatabase})
(def ^:private task-infos
"Maps containing info about the different independent sync tasks we schedule for each DB."
[{:key :sync-and-analyze
:db-schedule-column :metadata_sync_schedule
:job-class SyncAndAnalyzeDatabase}
{:key :update-field-values
:db-schedule-column :cache_field_values_schedule
:job-class UpdateFieldValues}])
(s/def ^:private field-values-task-info :- TaskInfo
{:key :update-field-values
:db-schedule-column :cache_field_values_schedule
:job-class UpdateFieldValues})
;; These getter functions are not strictly necessary but are provided primarily so we can get some extra validation by using them
(s/defn ^:private job-key :- JobKey
"Return an appropriate string key for the job described by TASK-INFO for DATABASE-OR-ID."
[database :- DatabaseInstance, task-info :- TaskInfo]
(jobs/key (format "metabase.task.%s.job.%d" (name (:key task-info)) (u/get-id database))))
[task-info :- TaskInfo]
(jobs/key (format "metabase.task.%s.job" (name (:key task-info)))))
(s/defn ^:private trigger-key :- TriggerKey
"Return an appropriate string key for the trigger for TASK-INFO and DATABASE-OR-ID."
......@@ -87,85 +86,101 @@
[task-info :- TaskInfo]
(:job-class task-info))
(s/defn ^:private description :- s/Str
(s/defn ^:private trigger-description :- s/Str
"Return an appropriate description string for a job/trigger for Database described by TASK-INFO."
[database :- DatabaseInstance, task-info :- TaskInfo]
(format "%s Database %d" (name (:key task-info)) (u/get-id database)))
(s/defn ^:private job-description :- s/Str
"Return an appropriate description string for a job"
[task-info :- TaskInfo]
(format "%s for all databases" (name (:key task-info))))
;;; +------------------------------------------------------------------------------------------------------------------------+
;;; | DELETING TASKS FOR A DB |
;;; +------------------------------------------------------------------------------------------------------------------------+
(s/defn ^:private delete-task!
"Cancel a single sync job for DATABASE-OR-ID and TASK-INFO."
"Cancel a single sync task for DATABASE-OR-ID and TASK-INFO."
[database :- DatabaseInstance, task-info :- TaskInfo]
(let [job-key (job-key database task-info)
trigger-key (trigger-key database task-info)]
(log/debug (u/format-color 'red "Unscheduling task for Database %d: job: %s; trigger: %s" (u/get-id database) (.getName job-key) (.getName trigger-key)))
(task/delete-task! job-key trigger-key)))
(let [trigger-key (trigger-key database task-info)]
(log/debug (u/format-color 'red "Unscheduling task for Database %d: trigger: %s" (u/get-id database) (.getName trigger-key)))
(task/delete-trigger! trigger-key)))
(s/defn unschedule-tasks-for-db!
"Cancel *all* scheduled sync and FieldValues caching tasks for DATABASE-OR-ID."
[database :- DatabaseInstance]
(doseq [task-info task-infos]
(delete-task! database task-info)))
(delete-task! database sync-analyze-task-info)
(delete-task! database field-values-task-info))
;;; +------------------------------------------------------------------------------------------------------------------------+
;;; | (RE)SCHEDULING TASKS FOR A DB |
;;; +------------------------------------------------------------------------------------------------------------------------+
(s/defn ^:private job :- JobDetail
"Build a Quartz Job for DATABASE and TASK-INFO."
[database :- DatabaseInstance, task-info :- TaskInfo]
"Build a durable Quartz Job for TASK-INFO. Durable in Quartz allows the job to exist even if there are no triggers
for it."
[task-info :- TaskInfo]
(jobs/build
(jobs/with-description (description database task-info))
(jobs/with-description (job-description task-info))
(jobs/of-type (job-class task-info))
(jobs/using-job-data {"db-id" (u/get-id database)})
(jobs/with-identity (job-key database task-info))))
(jobs/with-identity (job-key task-info))
(jobs/store-durably)))
(s/def ^:private sync-analyze-job (job sync-analyze-task-info))
(s/def ^:private field-values-job (job field-values-task-info))
(s/defn ^:private trigger :- CronTrigger
"Build a Quartz Trigger for DATABASE and TASK-INFO."
[database :- DatabaseInstance, task-info :- TaskInfo]
(triggers/build
(triggers/with-description (description database task-info))
(triggers/with-description (trigger-description database task-info))
(triggers/with-identity (trigger-key database task-info))
(triggers/using-job-data {"db-id" (u/get-id database)})
(triggers/for-job (job-key task-info))
(triggers/start-now)
(triggers/with-schedule
(cron/schedule
(cron/cron-schedule (cron-schedule database task-info))
;; drop tasks if they start to back up
(cron/with-misfire-handling-instruction-do-nothing)))))
;; If we miss a trigger, try again at the next opportunity, but only try it once. If we miss two triggers in a
;; row (i.e. more than an hour goes by) then the job should still execute, but drop the additional occurrences
;; of the same trigger (i.e. no need to run the job 3 times because it was missed three times, once is all we
;; need)
(cron/with-misfire-handling-instruction-fire-and-proceed)))))
(s/defn ^:private schedule-task-for-db!
(s/defn ^:private schedule-tasks-for-db!
"Schedule a new Quartz job for DATABASE and TASK-INFO."
[database :- DatabaseInstance, task-info :- TaskInfo]
(let [job (job database task-info)
trigger (trigger database task-info)]
(log/debug (u/format-color 'green "Scheduling task for Database %d: job: %s; trigger: %s" (u/get-id database) (.getName (.getKey job)) (.getName (.getKey trigger))))
(task/schedule-task! job trigger)))
[database :- DatabaseInstance]
(let [sync-trigger (trigger database sync-analyze-task-info)
fv-trigger (trigger database field-values-task-info)]
;; unschedule any tasks that might already be scheduled
(unschedule-tasks-for-db! database)
(s/defn schedule-tasks-for-db!
"Schedule all the different sync jobs we have for DATABASE.
Unschedules any existing jobs."
[database :- DatabaseInstance]
;; unschedule any tasks that might already be scheduled
(unschedule-tasks-for-db! database)
;; now (re)schedule all the tasks
(doseq [task-info task-infos]
(schedule-task-for-db! database task-info)))
(log/debug (u/format-color 'green "Scheduling sync/analyze and field-values task for database %d: trigger: %s and trigger: %s"
(u/get-id database) (.getName (.getKey sync-trigger))
(.getName (.getKey fv-trigger))))
;; now (re)schedule all the tasks
(task/add-trigger! sync-trigger)
(task/add-trigger! fv-trigger)))
;;; +------------------------------------------------------------------------------------------------------------------------+
;;; | TASK INITIALIZATION |
;;; +------------------------------------------------------------------------------------------------------------------------+
(defn- job-init
"Separated from `task-init` primarily as it's useful in testing. Adds the sync and field-values job that all of the
triggers will use"
[]
(task/add-job! sync-analyze-job)
(task/add-job! field-values-job))
(defn task-init
"Automatically called during startup; start the jobs for syncing/analyzing and updating FieldValues for all
Databases."
[]
(job-init)
(doseq [database (db/select Database)]
(schedule-tasks-for-db! database)))
......@@ -4,7 +4,7 @@
(:require [clojure.string :as str]
[expectations :refer :all]
[metabase.models.database :refer [Database]]
metabase.task.sync-databases
[metabase.task.sync-databases :as sync-db]
[metabase.test.util :as tu]
[metabase.util :as u]
[toucan.db :as db]
......@@ -19,48 +19,51 @@
(-> task
(update :description replace-trailing-id-with-<id>)
(update :key replace-trailing-id-with-<id>)
(update-in [:data "db-id"] class)
(update :triggers (fn [triggers]
(vec (for [trigger triggers]
(update trigger :key replace-trailing-id-with-<id>)))))))))
(-> trigger
(update :key replace-trailing-id-with-<id>)
(update-in [:data "db-id"] replace-trailing-id-with-<id>))))))))))
(defn- current-tasks []
(replace-ids-with-<id> (tu/scheduler-current-tasks)))
(defmacro ^:private with-scheduler-setup [& body]
`(tu/with-temp-scheduler
(#'sync-db/job-init)
~@body))
(def ^:private sync-job
{:description "sync-and-analyze for all databases"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job"
:data {}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 50 * * * ? *"
:data {"db-id" "<id>"}}]})
(def ^:private fv-job
{:description "update-field-values for all databases"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job"
:data {}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 50 0 * * ? *"
:data {"db-id" "<id>"}}]})
;; Check that a newly created database automatically gets scheduled
(expect
[{:description "sync-and-analyze Database <id>"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 50 * * * ? *"}]}
{:description "update-field-values Database <id>"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 50 0 * * ? *"}]}]
(tu/with-temp-scheduler
[sync-job fv-job]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres}]
(current-tasks))))
;; Check that a custom schedule is respected when creating a new Database
(expect
[{:description "sync-and-analyze Database <id>"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 30 4,16 * * ? *"}]}
{:description "update-field-values Database <id>"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 15 10 ? * 6#3"}]}]
(tu/with-temp-scheduler
[(assoc-in sync-job [:triggers 0 :cron-schedule] "0 30 4,16 * * ? *")
(assoc-in fv-job [:triggers 0 :cron-schedule] "0 15 10 ? * 6#3")]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres
:metadata_sync_schedule "0 30 4,16 * * ? *" ; 4:30 AM and PM daily
:cache_field_values_schedule "0 15 10 ? * 6#3"}] ; 10:15 on the 3rd Friday of the Month
......@@ -69,67 +72,38 @@
;; Check that a deleted database gets unscheduled
(expect
[]
(tu/with-temp-scheduler
[(update sync-job :triggers empty)
(update fv-job :triggers empty)]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres}]
(db/delete! Database :id (u/get-id database))
(current-tasks))))
;; Check that changing the schedule column(s) for a DB properly updates the scheduled tasks
(expect
[{:description "sync-and-analyze Database <id>"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 15 10 ? * MON-FRI"}]}
{:description "update-field-values Database <id>"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 11 11 11 11 ?"}]}]
(tu/with-temp-scheduler
[(assoc-in sync-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")
(assoc-in fv-job [:triggers 0 :cron-schedule] "0 11 11 11 11 ?")]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:metadata_sync_schedule "0 15 10 ? * MON-FRI" ; 10:15 AM every weekday
:cache_field_values_schedule "0 11 11 11 11 ?") ; Every November 11th at 11:11 AM
:cache_field_values_schedule "0 11 11 11 11 ?") ; Every November 11th at 11:11 AM
(current-tasks))))
;; Check that changing one schedule doesn't affect the other
(expect
[{:description "sync-and-analyze Database <id>"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 50 * * * ? *"}]}
{:description "update-field-values Database <id>"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 15 10 ? * MON-FRI"}]}]
(tu/with-temp-scheduler
[sync-job
(assoc-in fv-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:cache_field_values_schedule "0 15 10 ? * MON-FRI")
(current-tasks))))
(expect
[{:description "sync-and-analyze Database <id>"
:class SyncAndAnalyzeDatabase
:key "metabase.task.sync-and-analyze.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.sync-and-analyze.trigger.<id>"
:cron-schedule "0 15 10 ? * MON-FRI"}]}
{:description "update-field-values Database <id>"
:class UpdateFieldValues
:key "metabase.task.update-field-values.job.<id>"
:data {"db-id" Integer}
:triggers [{:key "metabase.task.update-field-values.trigger.<id>"
:cron-schedule "0 50 0 * * ? *"}]}]
(tu/with-temp-scheduler
[(assoc-in sync-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")
fv-job]
(with-scheduler-setup
(tt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:metadata_sync_schedule "0 15 10 ? * MON-FRI")
......@@ -171,7 +145,7 @@
(with-redefs [metabase.sync.sync-metadata/sync-db-metadata! (fn [& _] (swap! sync-db-metadata-counter inc))
metabase.sync.analyze/analyze-db! (fn [& _] (swap! analyze-db-counter inc))
metabase.sync.field-values/update-field-values! (fn [& _] (swap! update-field-values-counter inc))]
(tu/with-temp-scheduler
(with-scheduler-setup
(tt/with-temp Database [database db-info]
;; give tasks some time to run
(Thread/sleep 2000)
......
......@@ -414,7 +414,8 @@
(merge
{:key (.getName (.getKey trigger))}
(when (instance? CronTrigger trigger)
{:cron-schedule (.getCronExpression ^CronTrigger trigger)}))))}))))))
{:cron-schedule (.getCronExpression ^CronTrigger trigger)
:data (into {} (.getJobDataMap trigger))}))))}))))))
(defn db-timezone-id
"Return the timezone id from the test database. Must be called with
......