diff --git a/.circleci/config.yml b/.circleci/config.yml
index e675b6097333fc56c920906eab5b6986a91eddf5..b2c90abaace88af852c9a42a3276a0f51df4cc13 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1333,7 +1333,7 @@ workflows:
           <<: *Matrix
 
       - fe-tests-cypress:
-          name: fe-tests-cypres-mysql-8-<< matrix.edition >>
+          name: fe-tests-cypress-mysql-8-<< matrix.edition >>
           requires:
             - build-uberjar-<< matrix.edition >>
           e: fe-mysql-8
diff --git a/frontend/test/__runner__/test_db_fixture.db.mv.db b/frontend/test/__runner__/test_db_fixture.db.mv.db
index 8455bd0ecc0bbbe87bd1671aa67e6192e04203a1..1fcd479290e1bb9d3078fb0e824e4b173d7d5270 100644
Binary files a/frontend/test/__runner__/test_db_fixture.db.mv.db and b/frontend/test/__runner__/test_db_fixture.db.mv.db differ
diff --git a/resources/quartz.properties b/resources/quartz.properties
index a39a1c48e87a148443d34f703f89153c174cda36..853c7b4f0bcce7d31b8e6263e30fee8467e140fc 100644
--- a/resources/quartz.properties
+++ b/resources/quartz.properties
@@ -4,7 +4,7 @@ org.quartz.scheduler.instanceId = AUTO
 org.quartz.threadPool.threadCount = 10
 
 # Don't phone home
-org.quartz.scheduler.skipUpdateCheck: true
+org.quartz.scheduler.skipUpdateCheck = true
 
 # Use the JDBC backend so we can cluster when running multiple instances!
 # See http://www.quartz-scheduler.org/documentation/quartz-2.x/configuration/ConfigJDBCJobStoreClustering
@@ -27,16 +27,6 @@ org.quartz.jobStore.isClustered = true
 # than not at all for such things)
 org.quartz.jobStore.misfireThreshold=900000
 
-# By default, Quartz will fire triggers up to a minute late without considering them to be misfired; if it cannot fire
-# anything within that period for one reason or another (such as all threads in the thread pool being tied up), the
-# trigger is considered misfired. Threshold is in milliseconds.
-#
-# Default threshould is one minute (60,000)
-# We'll bump it up to 15 minutes (900,000) because the sorts of things we're scheduling aren't extremely time-sensitive,
-# for example Pulses and Sync can be sent out more than a minute late without issue. (In fact, 15 minutes late is better
-# than not at all for such things)
-org.quartz.jobStore.misfireThreshold=900000
-
 # Useful for debugging when Quartz jobs run and when they misfire
 #org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingTriggerHistoryPlugin
 #org.quartz.plugin.triggHistory.triggerFiredMessage = Trigger \{1\}.\{0\} fired job \{6\}.\{5\} at: \{4, date, HH:mm:ss MM/dd/yyyy}
diff --git a/src/metabase/models/database.clj b/src/metabase/models/database.clj
index b6bb0c31cc7b14a4a8a0a2451dfdc7e00d200751..994c79f7d74938529b081fedc27f4bf2cace0fa9 100644
--- a/src/metabase/models/database.clj
+++ b/src/metabase/models/database.clj
@@ -25,7 +25,7 @@
   (try
     ;; this is done this way to avoid circular dependencies
     (classloader/require 'metabase.task.sync-databases)
-    ((resolve 'metabase.task.sync-databases/schedule-tasks-for-db!) database)
+    ((resolve 'metabase.task.sync-databases/check-and-schedule-tasks-for-db!) database)
     (catch Throwable e
       (log/error e (trs "Error scheduling tasks for DB")))))
 
diff --git a/src/metabase/models/task_history.clj b/src/metabase/models/task_history.clj
index f146b1f15cc526c3111db14a649c3353c84e670a..4af8f2fb68af801596826ee7f408c7565d84c9a2 100644
--- a/src/metabase/models/task_history.clj
+++ b/src/metabase/models/task_history.clj
@@ -1,5 +1,6 @@
 (ns metabase.models.task-history
-  (:require [clojure.tools.logging :as log]
+  (:require [cheshire.generate :refer [add-encoder encode-map]]
+            [clojure.tools.logging :as log]
             [java-time :as t]
             [metabase.models.interface :as i]
             [metabase.util :as u]
@@ -94,3 +95,10 @@
   {:style/indent 1}
   [info & body]
   `(do-with-task-history ~info (fn [] ~@body)))
+
+;; TaskHistory can contain an exception for logging purposes, so use the built-in
+;; serialization of a `Throwable->map` to make this something that can be JSON encoded.
+(add-encoder
+ Throwable
+ (fn [throwable json-generator]
+   (encode-map (Throwable->map throwable) json-generator)))
diff --git a/src/metabase/sync/sync_metadata/sync_timezone.clj b/src/metabase/sync/sync_metadata/sync_timezone.clj
index d01613bd175a040fe9e4d9d8f36134139d006020..098aa7cbf316ff90b3d147fad62b9c7ce55d11ee 100644
--- a/src/metabase/sync/sync_metadata/sync_timezone.clj
+++ b/src/metabase/sync/sync_metadata/sync_timezone.clj
@@ -1,6 +1,5 @@
 (ns metabase.sync.sync-metadata.sync-timezone
-  (:require [clojure.tools.logging :as log]
-            [metabase.driver :as driver]
+  (:require [metabase.driver :as driver]
             [metabase.driver.util :as driver.u]
             [metabase.models.database :refer [Database]]
             [metabase.sync.interface :as i]
@@ -13,18 +12,12 @@
   (-> dt .getChronology .getZone .getID))
 
 (s/defn sync-timezone!
-  "Query `database` for it' current time to determine its timezone. The results of this function are used by the sync
-  process to update the timezone if it's different.
-
-  Catches and logs Exceptions if querying for current timezone fails. Returns timezone as `{:timezone-id <timezone>}`
-  upon success, `nil` if query failed."
+  "Query `database` for its current time to determine its timezone. The results of this function are used by the sync
+  process to update the timezone if it's different."
   [database :- i/DatabaseInstance]
-  (try
-    (let [driver  (driver.u/database->driver database)
-          zone-id (or (driver/db-default-timezone driver database)
-                      (some-> (driver/current-db-time driver database) extract-time-zone))]
-      (when-not (= zone-id (:timezone database))
-        (db/update! Database (:id database) {:timezone zone-id}))
-      {:timezone-id zone-id})
-    (catch Exception e
-      (log/warn e "Error syncing database timezone"))))
+  (let [driver  (driver.u/database->driver database)
+        zone-id (or (driver/db-default-timezone driver database)
+                    (some-> (driver/current-db-time driver database) extract-time-zone))]
+    (when-not (= zone-id (:timezone database))
+      (db/update! Database (:id database) {:timezone zone-id}))
+    {:timezone-id zone-id}))
diff --git a/src/metabase/sync/util.clj b/src/metabase/sync/util.clj
index f1766bf59d7dd080abeedf03c985c5b752374f0c..283675f394f042b28d2460df25816b591b02b22d 100644
--- a/src/metabase/sync/util.clj
+++ b/src/metabase/sync/util.clj
@@ -135,6 +135,11 @@
     (driver/sync-in-context (driver.u/database->driver database) database
       f)))
 
+(def ^:private exception-classes-not-to-retry
+  ;;TODO: future, expand this to `driver` level, where the drivers themselves can add to the
+  ;; list of exception classes (like, driver-specific exceptions)
+  [java.net.ConnectException java.net.NoRouteToHostException java.net.UnknownHostException
+   com.mchange.v2.resourcepool.CannotAcquireResourceException])
 
 (defn do-with-error-handling
   "Internal implementation of `with-error-handling`; use that instead of calling this directly."
@@ -144,13 +149,16 @@
   ([message f]
    (try
      (f)
-     (catch Throwable e
-       (log/error e message)
-       e))))
+     (catch Throwable t
+       (log/warn t message)
+       t))))
 
 (defmacro with-error-handling
   "Execute `body` in a way that catches and logs any Exceptions thrown, and returns `nil` if they do so. Pass a
-  `message` to help provide information about what failed for the log message."
+  `message` to help provide information about what failed for the log message.
+
+  The exception classes in `exception-classes-not-to-retry` are a list of classes tested against exceptions thrown.
+  If there is a match found, the sync is aborted as that error is not considered recoverable for this sync run."
   {:style/indent 1}
   [message & body]
   `(do-with-error-handling ~message (fn [] ~@body)))
@@ -339,7 +347,11 @@
         results    (with-start-and-finish-debug-logging (trs "step ''{0}'' for {1}"
                                                              step-name
                                                              (name-for-logging database))
-                     #(sync-fn database))
+                     (fn [& args]
+                       (try
+                         (apply sync-fn database args)
+                         (catch Throwable t
+                           {:throwable t}))))
         end-time   (t/zoned-date-time)]
     [step-name (assoc results
                  :start-time start-time
@@ -424,7 +436,21 @@
    database :- i/DatabaseInstance
    sync-steps :- [StepDefinition]]
   (let [start-time    (t/zoned-date-time)
-        step-metadata (mapv #(run-step-with-metadata database %) sync-steps)
+        step-metadata (loop [[step-defn & rest-defns] sync-steps
+                             result                   []]
+                        (let [[step-name r] (run-step-with-metadata database step-defn)
+                              new-result    (conj result [step-name r])]
+                          (if (contains? r :throwable)
+                            (let [caught-exception  (:throwable r)
+                                  exception-classes (u/full-exception-chain caught-exception)
+                                  abandon?          (some true? (for [ex      exception-classes
+                                                                      test-ex exception-classes-not-to-retry]
+                                                        (= (.. ^Object ex getClass getName) (.. ^Class test-ex getName))))]
+                              (cond abandon? new-result
+                                    (not (seq rest-defns)) new-result
+                                    :else (recur rest-defns new-result)))
+                            (cond (not (seq rest-defns)) new-result
+                                  :else (recur rest-defns new-result)))))
         end-time      (t/zoned-date-time)
         sync-metadata {:start-time start-time
                        :end-time   end-time
diff --git a/src/metabase/task.clj b/src/metabase/task.clj
index d63bd5e2f35d0608ec01b74c9e99ecc43976f889..3dd6d2cb5946d04ccd9a9ec5105b9b674a546ae1 100644
--- a/src/metabase/task.clj
+++ b/src/metabase/task.clj
@@ -271,14 +271,15 @@
 
     (task/job-info \"metabase.task.sync-and-analyze.job\")"
   [job-key]
-  (let [job-key (->job-key job-key)]
-    (try
-      (assoc (job-detail->info (qs/get-job (scheduler) job-key))
-             :triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName)
-                                              (qs/get-triggers-of-job (scheduler) job-key))]
-                         (trigger->info trigger)))
-      (catch Throwable e
-        (log/warn e (trs "Error fetching details for Job: {0}" (.getName job-key)))))))
+  (when-let [scheduler (scheduler)]
+    (let [job-key (->job-key job-key)]
+      (try
+        (assoc (job-detail->info (qs/get-job scheduler job-key))
+               :triggers (for [trigger (sort-by #(-> ^Trigger % .getKey .getName)
+                                                (qs/get-triggers-of-job scheduler job-key))]
+                           (trigger->info trigger)))
+        (catch Throwable e
+          (log/warn e (trs "Error fetching details for Job: {0}" (.getName job-key))))))))
 
 (defn- jobs-info []
   (->> (some-> (scheduler) (.getJobKeys nil))
diff --git a/src/metabase/task/sync_databases.clj b/src/metabase/task/sync_databases.clj
index 9cf1698bfa6ddf814316d707264a6df2d46874d0..c8f5a41c9fd05980d60022d5a5bb38fb801f6d3c 100644
--- a/src/metabase/task/sync_databases.clj
+++ b/src/metabase/task/sync_databases.clj
@@ -199,20 +199,41 @@
       ;; See https://www.nurkiewicz.com/2012/04/quartz-scheduler-misfire-instructions.html for more info
       (cron/with-misfire-handling-instruction-do-nothing)))))
 
-(s/defn ^:private schedule-tasks-for-db!
-  "Schedule a new Quartz job for `database` and `task-info`."
+(s/defn ^:private check-and-schedule-tasks-for-db!
+  "Schedule a new Quartz job for `database` and `task-info` if it doesn't already exist or is incorrect."
   [database :- DatabaseInstance]
-  (let [sync-trigger (trigger database sync-analyze-task-info)
-        fv-trigger   (trigger database field-values-task-info)]
-    ;; unschedule any tasks that might already be scheduled
-    (unschedule-tasks-for-db! database)
-    (log/debug
-     (u/format-color 'green "Scheduling sync/analyze and field-values task for database %d: trigger: %s and trigger: %s"
-                     (u/get-id database) (.getName (.getKey sync-trigger))
-                     (u/get-id database) (.getName (.getKey fv-trigger))))
-    ;; now (re)schedule all the tasks
-    (task/add-trigger! sync-trigger)
-    (task/add-trigger! fv-trigger)))
+  (let [sync-job (task/job-info (job-key sync-analyze-task-info))
+        fv-job   (task/job-info (job-key field-values-task-info))
+
+        sync-trigger (trigger database sync-analyze-task-info)
+        fv-trigger   (trigger database field-values-task-info)
+
+        existing-sync-trigger (some (fn [trigger] (when (= (:key trigger) (.. sync-trigger getKey getName))
+                                                    trigger))
+                                    (:triggers sync-job))
+        existing-fv-trigger   (some (fn [trigger] (when (= (:key trigger) (.. fv-trigger getKey getName))
+                                                    trigger))
+                                    (:triggers fv-job))]
+
+    (doseq [{:keys [existing-trigger existing-schedule ti trigger description]}
+            [{:existing-trigger  existing-sync-trigger
+              :existing-schedule (:metadata_sync_schedule database)
+              :ti                sync-analyze-task-info
+              :trigger           sync-trigger
+              :description       "sync/analyze"}
+             {:existing-trigger  existing-fv-trigger
+              :existing-schedule (:cache_field_values_schedule database)
+              :ti                field-values-task-info
+              :trigger           fv-trigger
+              :description       "field-values"}]]
+      (when (or (not existing-trigger)
+                (not= (:schedule existing-trigger) existing-schedule))
+        (delete-task! database ti)
+        (log/info
+         (u/format-color 'green "Scheduling %s for database %d: trigger: %s"
+                         description (u/get-id database) (.. ^org.quartz.Trigger trigger getKey getName)))
+        ;; now (re)schedule the task
+        (task/add-trigger! trigger)))))
 
 
 ;;; +----------------------------------------------------------------------------------------------------------------+
@@ -257,7 +278,6 @@
   (job-init)
   (doseq [database (db/select Database)]
     (try
-      ;; TODO -- shouldn't all the triggers be scheduled already?
-      (schedule-tasks-for-db! (maybe-update-db-schedules database))
+      (check-and-schedule-tasks-for-db! (maybe-update-db-schedules database))
       (catch Throwable e
         (log/error e (trs "Failed to schedule tasks for Database {0}" (:id database)))))))
diff --git a/src/metabase/util.clj b/src/metabase/util.clj
index c1ad71a755e9f5e4961a3018b23fb9d9ad55158a..a0f3b77921bad1fb430dca779fa2cd93f452bb37 100644
--- a/src/metabase/util.clj
+++ b/src/metabase/util.clj
@@ -406,6 +406,11 @@
   (^String [s max-length]
    (str/join (take max-length (slugify s)))))
 
+(defn full-exception-chain
+  "Gather the full exception chain into a single vector."
+  [e]
+  (take-while some? (iterate #(.getCause ^Throwable %) e)))
+
 (defn all-ex-data
   "Like `ex-data`, but merges `ex-data` from causes. If duplicate keys exist, the keys from the highest level are
   preferred.
@@ -422,7 +427,7 @@
    (fn [data e]
      (merge (ex-data e) data))
    nil
-   (take-while some? (iterate #(.getCause ^Throwable %) e))))
+   (full-exception-chain e)))
 
 (defn do-with-auto-retries
   "Execute `f`, a function that takes no arguments, and return the results.
diff --git a/test/metabase/sync/sync_metadata/sync_timezone_test.clj b/test/metabase/sync/sync_metadata/sync_timezone_test.clj
index b00b9cb5b3d7529d6bcfb4d9a29943f7f48ddf4f..5f91b2277e7fe4acea92dffff25f00e16b8ddaaf 100644
--- a/test/metabase/sync/sync_metadata/sync_timezone_test.clj
+++ b/test/metabase/sync/sync_metadata/sync_timezone_test.clj
@@ -37,27 +37,3 @@
             (is (nil? tz-after-update)))
           (testing "Check that the value was set again after sync"
             (is (time/time-zone-for-id (db-timezone db)))))))))
-
-(deftest bad-change-test
-  (mt/test-drivers #{:postgres}
-    (testing "Test that if timezone is changed to something that fails, timezone is unaffected."
-      ;; Setting timezone to "Austrailia/Sydney" fails on some computers, especially the CI ones. In that case it fails as
-      ;; the dates on PostgreSQL return 'AEST' for the time zone name. The Exception is logged, but the timezone column
-      ;; should be left alone and processing should continue.
-      ;;
-      ;; TODO - Recently this call has started *succeeding* for me on Java 10/11 and Postgres 9.6. I've seen it sync as both
-      ;; "Australia/Hobart" and "Australia/Sydney". Since setting the timezone no longer always fails it's no longer a good
-      ;; test. We need to think of something else here. In the meantime, I'll go ahead and consider any of the three options
-      ;; valid answers.
-      (mt/dataset test-data
-        ;; use `with-temp-vals-in-db` to make sure the test data DB timezone gets reset to whatever it was before the test
-        ;; ran if we accidentally end up setting it in the `:after` part
-        (mt/with-temp-vals-in-db Database (mt/db) {:timezone (db-timezone (mt/db))}
-          (sync-tz/sync-timezone! (mt/db))
-          (testing "before"
-            (is (= "UTC"
-                   (db-timezone (mt/db)))))
-          (testing "after"
-            (mt/with-temporary-setting-values [report-timezone "Australia/Sydney"]
-              (sync-tz/sync-timezone! (mt/db))
-              (is (contains? #{"Australia/Hobart" "Australia/Sydney" "UTC"} (db-timezone (mt/db)))))))))))
diff --git a/test/metabase/sync/util_test.clj b/test/metabase/sync/util_test.clj
index 9b34dcc7052a8001b9f7ff63e0ec895f0933fcf3..d3ecf4fd8addd56e74538606d5d43cc91e7b6fbd 100644
--- a/test/metabase/sync/util_test.clj
+++ b/test/metabase/sync/util_test.clj
@@ -8,6 +8,7 @@
             [metabase.models.task-history :refer [TaskHistory]]
             [metabase.sync :as sync]
             [metabase.sync.util :as sync-util :refer :all]
+            [metabase.test :as mt]
             [metabase.test.util :as tu]
             [toucan.db :as db]
             [toucan.util.test :as tt]))
@@ -188,3 +189,59 @@
         (testing "has-step-duration?"
           (is (= true
                  (str/includes? results "4.0 s"))))))))
+
+(deftest error-handling-test
+  (testing "A ConnectException will cause sync to stop"
+    (mt/dataset sample-dataset
+      (let [expected           (java.io.IOException.
+                                "outer"
+                                (java.net.ConnectException.
+                                 "inner, this one triggers the failure"))
+            actual             (sync-util/sync-operation :sync-error-handling (mt/db) "sync error handling test"
+                                 (sync-util/run-sync-operation
+                                  "sync"
+                                  (mt/db)
+                                  [(sync-util/create-sync-step "failure-step"
+                                                               (fn [_]
+                                                                 (throw expected)))
+                                   (sync-util/create-sync-step "should-not-run"
+                                                               (fn [_]
+                                                                 {}))]))
+            [step-name result] (first (:steps actual))]
+        (is (= 1 (count (:steps actual))))
+        (is (= "failure-step" step-name))
+        (is (= {:throwable expected :log-summary-fn nil}
+               (dissoc result :start-time :end-time))))))
+
+  (doseq [ex [(java.io.IOException.
+               "outer, does not trigger"
+               (java.net.SocketException. "inner, this one does not trigger"))
+              (java.lang.IllegalArgumentException. "standalone, does not trigger")
+              (java.sql.SQLException.
+               "outer, does not trigger"
+               (java.sql.SQLException.
+                "inner, does not trigger"
+                (java.lang.IllegalArgumentException.
+                 "third level, does not trigger")))]]
+    (testing "Other errors will not cause sync to stop"
+      (let [actual             (sync-util/sync-operation :sync-error-handling (mt/db) "sync error handling test"
+                                 (sync-util/run-sync-operation
+                                  "sync"
+                                  (mt/db)
+                                  [(sync-util/create-sync-step "failure-step"
+                                                               (fn [_]
+                                                                 (throw ex)))
+                                   (sync-util/create-sync-step "should-continue"
+                                                               (fn [_]
+                                                                 {}))]))]
+
+        ;; make sure we've ran two steps. the first one will have thrown an exception,
+        ;; but it wasn't an exception that can cause an abort.
+        (is (= 2 (count (:steps actual))))
+        (let [[step-name result] (first (:steps actual))]
+          (is (= "failure-step" step-name))
+          (is (= {:throwable ex :log-summary-fn nil}
+                 (dissoc result :start-time :end-time))))
+        (let [[step-name result] (second (:steps actual))]
+          (is (= "should-continue" step-name))
+          (is (= {:log-summary-fn nil} (dissoc result :start-time :end-time))))))))
diff --git a/test/metabase/task/sync_databases_test.clj b/test/metabase/task/sync_databases_test.clj
index 2813da9f0a3a126b0b936e232cbd9a261b00076d..ecdd7a50c472d9ab04582ae9f83dd6e9cebf25b7 100644
--- a/test/metabase/task/sync_databases_test.clj
+++ b/test/metabase/task/sync_databases_test.clj
@@ -46,7 +46,7 @@
          (update :triggers (partial filter #(str/ends-with? (:key %) (str \. (u/get-id db-or-id)))))
          (dissoc :class)))))
 
-(defmacro ^:private with-scheduler-setup [& body]
+(defmacro with-scheduler-setup [& body]
   `(tu/with-temp-scheduler
      (#'sync-db/job-init)
      ~@body))
@@ -108,18 +108,18 @@
 (deftest schedule-changes-only-expected-test
   (is (= [sync-job
           (assoc-in fv-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")]
-        (with-scheduler-setup
-          (mt/with-temp Database [database {:engine :postgres}]
-            (db/update! Database (u/get-id database)
-              :cache_field_values_schedule "0 15 10 ? * MON-FRI")
-            (current-tasks-for-db database)))))
+         (with-scheduler-setup
+           (mt/with-temp Database [database {:engine :postgres}]
+             (db/update! Database (u/get-id database)
+                         :cache_field_values_schedule "0 15 10 ? * MON-FRI")
+             (current-tasks-for-db database)))))
 
   (is (= [(assoc-in sync-job [:triggers 0 :cron-schedule] "0 15 10 ? * MON-FRI")
           fv-job]
          (with-scheduler-setup
            (mt/with-temp Database [database {:engine :postgres}]
              (db/update! Database (u/get-id database)
-               :metadata_sync_schedule "0 15 10 ? * MON-FRI")
+                         :metadata_sync_schedule "0 15 10 ? * MON-FRI")
              (current-tasks-for-db database))))))
 
 (deftest validate-schedules-test