Skip to content
Snippets Groups Projects
Unverified Commit 6668b4ee authored by Robert Roland's avatar Robert Roland Committed by GitHub
Browse files

Clean up sync scheduling (#15043)

It's possible for the scheduler to get in a weird state if the sync
fails while it executes.

This change makes it *only* recreate a job/task if the schedule has
changed or if it is missing. Previously, clearing the state at every
start had bad effects if the JVM had terminated during the sync.

Adds a vector of Exception classes that signal a "fatal" exception
during sync for a specific database. If these exceptions occur, the sync
for that database stops and will pick up next time.

This will have to be expanded per driver, but I don't see a way around
that, as each driver will have its own, unique way of failing.

metabase/metabase#14817
parent ef824613
No related branches found
No related tags found
No related merge requests found
......@@ -1333,7 +1333,7 @@ workflows:
<<: *Matrix
- fe-tests-cypress:
name: fe-tests-cypres-mysql-8-<< matrix.edition >>
name: fe-tests-cypress-mysql-8-<< matrix.edition >>
requires:
- build-uberjar-<< matrix.edition >>
e: fe-mysql-8
......
No preview for this file type
......@@ -4,7 +4,7 @@ org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.threadCount = 10
# Don't phone home
org.quartz.scheduler.skipUpdateCheck: true
org.quartz.scheduler.skipUpdateCheck = true
# Use the JDBC backend so we can cluster when running multiple instances!
# See http://www.quartz-scheduler.org/documentation/quartz-2.x/configuration/ConfigJDBCJobStoreClustering
......@@ -27,16 +27,6 @@ org.quartz.jobStore.isClustered = true
# than not at all for such things)
org.quartz.jobStore.misfireThreshold=900000
# By default, Quartz will fire triggers up to a minute late without considering them to be misfired; if it cannot fire
# anything within that period for one reason or another (such as all threads in the thread pool being tied up), the
# trigger is considered misfired. Threshold is in milliseconds.
#
# Default threshould is one minute (60,000)
# We'll bump it up to 15 minutes (900,000) because the sorts of things we're scheduling aren't extremely time-sensitive,
# for example Pulses and Sync can be sent out more than a minute late without issue. (In fact, 15 minutes late is better
# than not at all for such things)
org.quartz.jobStore.misfireThreshold=900000
# Useful for debugging when Quartz jobs run and when they misfire
#org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingTriggerHistoryPlugin
#org.quartz.plugin.triggHistory.triggerFiredMessage = Trigger \{1\}.\{0\} fired job \{6\}.\{5\} at: \{4, date, HH:mm:ss MM/dd/yyyy}
......
......@@ -25,7 +25,7 @@
(try
;; this is done this way to avoid circular dependencies
(classloader/require 'metabase.task.sync-databases)
((resolve 'metabase.task.sync-databases/schedule-tasks-for-db!) database)
((resolve 'metabase.task.sync-databases/check-and-schedule-tasks-for-db!) database)
(catch Throwable e
(log/error e (trs "Error scheduling tasks for DB")))))
......
(ns metabase.models.task-history
(:require [clojure.tools.logging :as log]
(:require [cheshire.generate :refer [add-encoder encode-map]]
[clojure.tools.logging :as log]
[java-time :as t]
[metabase.models.interface :as i]
[metabase.util :as u]
......@@ -94,3 +95,10 @@
{:style/indent 1}
[info & body]
`(do-with-task-history ~info (fn [] ~@body)))
;; TaskHistory can contain an exception for logging purposes, so use the built-in
;; serialization of a `Throwable->map` to make this something that can be JSON encoded.
(add-encoder
Throwable
(fn [throwable json-generator]
(encode-map (Throwable->map throwable) json-generator)))
(ns metabase.sync.sync-metadata.sync-timezone
(:require [clojure.tools.logging :as log]
[metabase.driver :as driver]
(:require [metabase.driver :as driver]
[metabase.driver.util :as driver.u]
[metabase.models.database :refer [Database]]
[metabase.sync.interface :as i]
......@@ -13,18 +12,12 @@
(-> dt .getChronology .getZone .getID))
(s/defn sync-timezone!
"Query `database` for it' current time to determine its timezone. The results of this function are used by the sync
process to update the timezone if it's different.
Catches and logs Exceptions if querying for current timezone fails. Returns timezone as `{:timezone-id <timezone>}`
upon success, `nil` if query failed."
"Query `database` for its current time to determine its timezone. The results of this function are used by the sync
process to update the timezone if it's different."
[database :- i/DatabaseInstance]
(try
(let [driver (driver.u/database->driver database)
zone-id (or (driver/db-default-timezone driver database)
(some-> (driver/current-db-time driver database) extract-time-zone))]
(when-not (= zone-id (:timezone database))
(db/update! Database (:id database) {:timezone zone-id}))
{:timezone-id zone-id})
(catch Exception e
(log/warn e "Error syncing database timezone"))))
(let [driver (driver.u/database->driver database)
zone-id (or (driver/db-default-timezone driver database)
(some-> (driver/current-db-time driver database) extract-time-zone))]
(when-not (= zone-id (:timezone database))
(db/update! Database (:id database) {:timezone zone-id}))
{:timezone-id zone-id}))
......@@ -135,6 +135,11 @@
(driver/sync-in-context (driver.u/database->driver database) database
f)))
(def ^:private exception-classes-not-to-retry
;;TODO: future, expand this to `driver` level, where the drivers themselves can add to the
;; list of exception classes (like, driver-specific exceptions)
[java.net.ConnectException java.net.NoRouteToHostException java.net.UnknownHostException
com.mchange.v2.resourcepool.CannotAcquireResourceException])
(defn do-with-error-handling
"Internal implementation of `with-error-handling`; use that instead of calling this directly."
......@@ -144,13 +149,16 @@
([message f]
(try
(f)
(catch Throwable e
(log/error e message)
e))))
(catch Throwable t
(log/warn t message)
t))))
(defmacro with-error-handling
"Execute `body` in a way that catches and logs any Exceptions thrown, and returns `nil` if they do so. Pass a
`message` to help provide information about what failed for the log message."
`message` to help provide information about what failed for the log message.
The exception classes in `exception-classes-not-to-retry` are a list of classes tested against exceptions thrown.
If there is a match found, the sync is aborted as that error is not considered recoverable for this sync run."
{:style/indent 1}
[message & body]
`(do-with-error-handling ~message (fn [] ~@body)))
......@@ -339,7 +347,11 @@
results (with-start-and-finish-debug-logging (trs "step ''{0}'' for {1}"
step-name
(name-for-logging database))
#(sync-fn database))
(fn [& args]
(try
(apply sync-fn database args)
(catch Throwable t
{:throwable t}))))
end-time (t/zoned-date-time)]
[step-name (assoc results
:start-time start-time
......@@ -424,7 +436,21 @@
database :- i/DatabaseInstance
sync-steps :- [StepDefinition]]
(let [start-time (t/zoned-date-time)
step-metadata (mapv #(run-step-with-metadata database %) sync-steps)
step-metadata (loop [[step-defn & rest-defns] sync-steps
result []]
(let [[step-name r] (run-step-with-metadata database step-defn)
new-result (conj result [step-name r])]
(if (contains? r :throwable)
(let [caught-exception (:throwable r)
exception-classes (u/full-exception-chain caught-exception)
abandon? (some true? (for [ex exception-classes
test-ex exception-classes-not-to-retry]
(= (.. ^Object ex getClass getName) (.. ^Class test-ex getName))))]
(cond abandon? new-result
(not (seq rest-defns)) new-result
:else (recur rest-defns new-result)))
(cond (not (seq rest-defns)) new-result
:else (recur rest-defns new-result)))))
end-time (t/zoned-date-time)
sync-metadata {:start-time start-time
:end-time end-time
......
......@@ -271,14 +271,15 @@
(task/job-info \"metabase.task.sync-and-analyze.job\")"
[job-key]
(let [job-key (->job-key job-key)]
(try
(assoc (job-detail->info (qs/get-job (scheduler) job-key))
:triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName)
(qs/get-triggers-of-job (scheduler) job-key))]
(trigger->info trigger)))
(catch Throwable e
(log/warn e (trs "Error fetching details for Job: {0}" (.getName job-key)))))))
(when-let [scheduler (scheduler)]
(let [job-key (->job-key job-key)]
(try
(assoc (job-detail->info (qs/get-job scheduler job-key))
:triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName)
(qs/get-triggers-of-job scheduler job-key))]
(trigger->info trigger)))
(catch Throwable e
(log/warn e (trs "Error fetching details for Job: {0}" (.getName job-key))))))))
(defn- jobs-info []
(->> (some-> (scheduler) (.getJobKeys nil))
......
......@@ -199,20 +199,41 @@
;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info
(cron/with-misfire-handling-instruction-do-nothing)))))
(s/defn ^:private schedule-tasks-for-db!
"Schedule a new Quartz job for `database` and `task-info`."
(s/defn ^:private check-and-schedule-tasks-for-db!
"Schedule a new Quartz job for `database` and `task-info` if it doesn't already exist or is incorrect."
[database :- DatabaseInstance]
(let [sync-trigger (trigger database sync-analyze-task-info)
fv-trigger (trigger database field-values-task-info)]
;; unschedule any tasks that might already be scheduled
(unschedule-tasks-for-db! database)
(log/debug
(u/format-color 'green "Scheduling sync/analyze and field-values task for database %d: trigger: %s and trigger: %s"
(u/get-id database) (.getName (.getKey sync-trigger))
(u/get-id database) (.getName (.getKey fv-trigger))))
;; now (re)schedule all the tasks
(task/add-trigger! sync-trigger)
(task/add-trigger! fv-trigger)))
(let [sync-job (task/job-info (job-key sync-analyze-task-info))
fv-job (task/job-info (job-key field-values-task-info))
sync-trigger (trigger database sync-analyze-task-info)
fv-trigger (trigger database field-values-task-info)
existing-sync-trigger (some (fn [trigger] (when (= (:key trigger) (.. sync-trigger getKey getName))
trigger))
(:triggers sync-job))
existing-fv-trigger (some (fn [trigger] (when (= (:key trigger) (.. fv-trigger getKey getName))
trigger))
(:triggers fv-job))]
(doseq [{:keys [existing-trigger existing-schedule ti trigger description]}
[{:existing-trigger existing-sync-trigger
:existing-schedule (:metadata_sync_schedule database)
:ti sync-analyze-task-info
:trigger sync-trigger
:description "sync/analyze"}
{:existing-trigger existing-fv-trigger
:existing-schedule (:cache_field_values_schedule database)
:ti field-values-task-info
:trigger fv-trigger
:description "field-values"}]]
(when (or (not existing-trigger)
(not= (:schedule existing-trigger) existing-schedule))
(delete-task! database ti)
(log/info
(u/format-color 'green "Scheduling %s for database %d: trigger: %s"
description (u/get-id database) (.. ^org.quartz.Trigger trigger getKey getName)))
;; now (re)schedule the task
(task/add-trigger! trigger)))))
;;; +----------------------------------------------------------------------------------------------------------------+
......@@ -257,7 +278,6 @@
(job-init)
(doseq [database (db/select Database)]
(try
;; TODO -- shouldn't all the triggers be scheduled already?
(schedule-tasks-for-db! (maybe-update-db-schedules database))
(check-and-schedule-tasks-for-db! (maybe-update-db-schedules database))
(catch Throwable e
(log/error e (trs "Failed to schedule tasks for Database {0}" (:id database)))))))
......@@ -406,6 +406,11 @@
(^String [s max-length]
(str/join (take max-length (slugify s)))))
(defn full-exception-chain
"Gather the full exception chain into a single vector."
[e]
(take-while some? (iterate #(.getCause ^Throwable %) e)))
(defn all-ex-data
"Like `ex-data`, but merges `ex-data` from causes. If duplicate keys exist, the keys from the highest level are
preferred.
......@@ -422,7 +427,7 @@
(fn [data e]
(merge (ex-data e) data))
nil
(take-while some? (iterate #(.getCause ^Throwable %) e))))
(full-exception-chain e)))
(defn do-with-auto-retries
"Execute `f`, a function that takes no arguments, and return the results.
......
......@@ -37,27 +37,3 @@
(is (nil? tz-after-update)))
(testing "Check that the value was set again after sync"
(is (time/time-zone-for-id (db-timezone db)))))))))
(deftest bad-change-test
(mt/test-drivers #{:postgres}
(testing "Test that if timezone is changed to something that fails, timezone is unaffected."
;; Setting timezone to "Austrailia/Sydney" fails on some computers, especially the CI ones. In that case it fails as
;; the dates on PostgreSQL return 'AEST' for the time zone name. The Exception is logged, but the timezone column
;; should be left alone and processing should continue.
;;
;; TODO - Recently this call has started *succeeding* for me on Java 10/11 and Postgres 9.6. I've seen it sync as both
;; "Australia/Hobart" and "Australia/Sydney". Since setting the timezone no longer always fails it's no longer a good
;; test. We need to think of something else here. In the meantime, I'll go ahead and consider any of the three options
;; valid answers.
(mt/dataset test-data
;; use `with-temp-vals-in-db` to make sure the test data DB timezone gets reset to whatever it was before the test
;; ran if we accidentally end up setting it in the `:after` part
(mt/with-temp-vals-in-db Database (mt/db) {:timezone (db-timezone (mt/db))}
(sync-tz/sync-timezone! (mt/db))
(testing "before"
(is (= "UTC"
(db-timezone (mt/db)))))
(testing "after"
(mt/with-temporary-setting-values [report-timezone "Australia/Sydney"]
(sync-tz/sync-timezone! (mt/db))
(is (contains? #{"Australia/Hobart" "Australia/Sydney" "UTC"} (db-timezone (mt/db)))))))))))
......@@ -8,6 +8,7 @@
[metabase.models.task-history :refer [TaskHistory]]
[metabase.sync :as sync]
[metabase.sync.util :as sync-util :refer :all]
[metabase.test :as mt]
[metabase.test.util :as tu]
[toucan.db :as db]
[toucan.util.test :as tt]))
......@@ -188,3 +189,59 @@
(testing "has-step-duration?"
(is (= true
(str/includes? results "4.0 s"))))))))
(deftest error-handling-test
(testing "A ConnectException will cause sync to stop"
(mt/dataset sample-dataset
(let [expected (java.io.IOException.
"outer"
(java.net.ConnectException.
"inner, this one triggers the failure"))
actual (sync-util/sync-operation :sync-error-handling (mt/db) "sync error handling test"
(sync-util/run-sync-operation
"sync"
(mt/db)
[(sync-util/create-sync-step "failure-step"
(fn [_]
(throw expected)))
(sync-util/create-sync-step "should-not-run"
(fn [_]
{}))]))
[step-name result] (first (:steps actual))]
(is (= 1 (count (:steps actual))))
(is (= "failure-step" step-name))
(is (= {:throwable expected :log-summary-fn nil}
(dissoc result :start-time :end-time))))))
(doseq [ex [(java.io.IOException.
"outer, does not trigger"
(java.net.SocketException. "inner, this one does not trigger"))
(java.lang.IllegalArgumentException. "standalone, does not trigger")
(java.sql.SQLException.
"outer, does not trigger"
(java.sql.SQLException.
"inner, does not trigger"
(java.lang.IllegalArgumentException.
"third level, does not trigger")))]]
(testing "Other errors will not cause sync to stop"
(let [actual (sync-util/sync-operation :sync-error-handling (mt/db) "sync error handling test"
(sync-util/run-sync-operation
"sync"
(mt/db)
[(sync-util/create-sync-step "failure-step"
(fn [_]
(throw ex)))
(sync-util/create-sync-step "should-continue"
(fn [_]
{}))]))]
;; make sure we've ran two steps. the first one will have thrown an exception,
;; but it wasn't an exception that can cause an abort.
(is (= 2 (count (:steps actual))))
(let [[step-name result] (first (:steps actual))]
(is (= "failure-step" step-name))
(is (= {:throwable ex :log-summary-fn nil}
(dissoc result :start-time :end-time))))
(let [[step-name result] (second (:steps actual))]
(is (= "should-continue" step-name))
(is (= {:log-summary-fn nil} (dissoc result :start-time :end-time))))))))
......@@ -46,7 +46,7 @@
(update :triggers (partial filter #(str/ends-with? (:key %) (str \. (u/get-id db-or-id)))))
(dissoc :class)))))
(defmacro ^:private with-scheduler-setup [& body]
(defmacro with-scheduler-setup [& body]
`(tu/with-temp-scheduler
(#'sync-db/job-init)
~@body))
......@@ -108,18 +108,18 @@
(deftest schedule-changes-only-expected-test
(is (= [sync-job
(assoc-in fv-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")]
(with-scheduler-setup
(mt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:cache_field_values_schedule "0 15 10 ? * MON-FRI")
(current-tasks-for-db database)))))
(with-scheduler-setup
(mt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:cache_field_values_schedule "0 15 10 ? * MON-FRI")
(current-tasks-for-db database)))))
(is (= [(assoc-in sync-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")
fv-job]
(with-scheduler-setup
(mt/with-temp Database [database {:engine :postgres}]
(db/update! Database (u/get-id database)
:metadata_sync_schedule "0 15 10 ? * MON-FRI")
:metadata_sync_schedule "0 15 10 ? * MON-FRI")
(current-tasks-for-db database))))))
(deftest validate-schedules-test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment