Skip to content
Snippets Groups Projects
Unverified Commit 927b5f8c authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

More pervasive and atomic Liquibase locking (#38810)

parent bbe6b0b8
No related branches found
No related tags found
No related merge requests found
...@@ -200,13 +200,16 @@ ...@@ -200,13 +200,16 @@
(catch Exception e (catch Exception e
(log/error e (trs "Unable to release the Liquibase lock after a migration failure")))))) (log/error e (trs "Unable to release the Liquibase lock after a migration failure"))))))
(defn- wait-for-migration-lock-to-be-cleared (defn- lock-service ^LockService [^Liquibase liquibase]
(.getLockService (LockServiceFactory/getInstance) (.getDatabase liquibase)))
(defn- wait-for-migration-lock
"Check and make sure the database isn't locked. If it is, sleep for 2 seconds and then retry several times. There's a "Check and make sure the database isn't locked. If it is, sleep for 2 seconds and then retry several times. There's a
chance the lock will end up clearing up so we can run migrations normally." chance the lock will end up clearing up so we can run migrations normally."
[^Liquibase liquibase] [^Liquibase liquibase]
(let [retry-counter (volatile! 0)] (let [retry-counter (volatile! 0)]
(u/auto-retry 5 (u/auto-retry 5
(when (migration-lock-exists? liquibase) (when-not (.acquireLock (lock-service liquibase))
(Thread/sleep 2000) (Thread/sleep 2000)
(vswap! retry-counter inc) (vswap! retry-counter inc)
(throw (throw
...@@ -216,66 +219,100 @@ ...@@ -216,66 +219,100 @@
" " " "
(trs "You can force-release these locks by running `java -jar metabase.jar migrate release-locks`.")))))) (trs "You can force-release these locks by running `java -jar metabase.jar migrate release-locks`."))))))
(if (pos? @retry-counter) (if (pos? @retry-counter)
(log/warnf "Migration lock was cleared after %d retries." @retry-counter) (log/warnf "Migration lock was acquired after %d retries." @retry-counter)
(log/info "No migration lock found.")))) (do (log/info "No migration lock found.")
(log/info "Migration lock acquired.")))))
(defn migrate-up-if-needed!
"Run any unrun `liquibase` migrations, if needed." (defn holding-lock?
[^Liquibase liquibase ^DataSource data-source] "Check whether the given Liquibase instance is already holding the database migration lock."
(log/info (trs "Checking if Database has unrun migrations...")) [liquibase]
(if (seq (unrun-migrations data-source)) (.hasChangeLogLock (lock-service liquibase)))
(do
(log/info (trs "Database has unrun migrations. Checking if migraton lock is taken...")) (def ^:private ^:dynamic *lock-depth* 0)
(wait-for-migration-lock-to-be-cleared liquibase)
;; while we were waiting for the lock, it was possible that another instance finished the migration(s), so make (defn- assert-locked [liquibase]
;; sure something still needs to be done... (when-not (holding-lock? liquibase)
(let [to-run-migrations (unrun-migrations data-source) (throw (ex-info "This operation requires a hold on the liquibase migration lock."
unrun-migrations-count (count to-run-migrations)] {:lock-exists? (migration-lock-exists? liquibase)
(if (pos? unrun-migrations-count) ;; It's possible that the lock was accidentally released by an operation, or force released by
(let [^Contexts contexts nil ;; another process, so its useful for debugging to know whether we were still within a locked
start-time (System/currentTimeMillis)] ;; scope.
(log/info (trs "Running {0} migrations ..." unrun-migrations-count)) :lock-depth *lock-depth*}))))
(doseq [^ChangeSet change to-run-migrations]
(log/tracef "To run migration %s" (.getId change)))
(.update liquibase contexts)
(log/info (trs "Migration complete in {0}" (u/format-milliseconds (- (System/currentTimeMillis) start-time)))))
(log/info
(trs "Migration lock cleared, but nothing to do here! Migrations were finished by another instance.")))))
(log/info (trs "No unrun migrations found."))))
(defn run-in-scope-locked (defn run-in-scope-locked
"Run function `f` in a scope on the Liquibase instance `liquibase`. "Run function `f` in a scope on the Liquibase instance `liquibase`.
Liquibase scopes are used to hold configuration and parameters (akin to binding dynamic variables in Liquibase scopes are used to hold configuration and parameters (akin to binding dynamic variables in
Clojure). This function initializes the database and the resource accessor which are often required." Clojure). This function initializes the database and the resource accessor which are often required.
In order to ensure that mutual exclusion of these scopes across all running Metabase instances, we take a lock
in the app database. It's the responsibility of inner functions which require the lock to call [[assert-locked]]."
[^Liquibase liquibase f] [^Liquibase liquibase f]
(let [database (.getDatabase liquibase) ;; Disallow nested locking in dev and CI, in order to force a clear lexical boundary where locking begins.
^LockService lock-service (.getLockService (LockServiceFactory/getInstance) database) ;; Inner functions that require the lock to be held should
scope-objects {(.name Scope$Attr/database) database (when (holding-lock? liquibase)
;; In somehow we encounter this situation in production, rather take a nested lock - it is re-entrant.
(when-not config/is-prod?
(throw (LockException. "Attempted to take a Liquibase lock, but we already are holding it."))))
(let [database (.getDatabase liquibase)
scope-objects {(.name Scope$Attr/database) database
(.name Scope$Attr/resourceAccessor) (.getResourceAccessor liquibase)}] (.name Scope$Attr/resourceAccessor) (.getResourceAccessor liquibase)}]
(Scope/child ^Map scope-objects (Scope/child ^Map scope-objects
(reify Scope$ScopedRunner (reify Scope$ScopedRunner
(run [_] (run [_]
(.waitForLock lock-service) (wait-for-migration-lock liquibase)
(try (try
(f) (binding [*lock-depth* (inc *lock-depth*)]
(f))
(finally (finally
(.releaseLock lock-service)))))))) (when (zero? *lock-depth*)
(.releaseLock (lock-service liquibase))))))))))
(defmacro with-scope-locked
"Run `body` in a scope on the Liquibase instance `liquibase`.
Liquibase scopes are used to hold configuration and parameters (akin to binding dynamic variables in
Clojure). This function initializes the database and the resource accessor which are often required.
The underlying locks are re-entrant, so it is safe to nest these blocks."
{:style/indent 1}
[liquibase & body]
`(run-in-scope-locked ~liquibase (fn [] ~@body)))
(defn migrate-up-if-needed!
"Run any unrun `liquibase` migrations, if needed."
[^Liquibase liquibase ^DataSource data-source]
(log/info (trs "Checking if Database has unrun migrations..."))
(if (seq (unrun-migrations data-source))
(do
(log/info (trs "Database has unrun migrations. Checking if migration lock is taken..."))
(with-scope-locked liquibase
;; while we were waiting for the lock, it was possible that another instance finished the migration(s), so make
;; sure something still needs to be done...
(let [to-run-migrations (unrun-migrations data-source)
unrun-migrations-count (count to-run-migrations)]
(if (pos? unrun-migrations-count)
(let [^Contexts contexts nil
start-time (System/currentTimeMillis)]
(log/info (trs "Running {0} migrations ..." unrun-migrations-count))
(doseq [^ChangeSet change to-run-migrations]
(log/tracef "To run migration %s" (.getId change)))
(.update liquibase contexts)
(log/info (trs "Migration complete in {0}" (u/format-milliseconds (- (System/currentTimeMillis) start-time)))))
(log/info
(trs "Migration lock cleared, but nothing to do here! Migrations were finished by another instance."))))))
(log/info (trs "No unrun migrations found."))))
(defn update-with-change-log (defn update-with-change-log
"Run update with the change log instances in `liquibase`." "Run update with the change log instances in `liquibase`. Must be called within a scope holding the liquibase lock."
([liquibase] ([liquibase]
(update-with-change-log liquibase {})) (update-with-change-log liquibase {}))
([^Liquibase liquibase ([^Liquibase liquibase
{:keys [^List change-set-filters exec-listener] {:keys [^List change-set-filters exec-listener]
:or {change-set-filters []}}] :or {change-set-filters []}}]
(assert-locked liquibase)
(let [change-log (.getDatabaseChangeLog liquibase) (let [change-log (.getDatabaseChangeLog liquibase)
database (.getDatabase liquibase) database (.getDatabase liquibase)
log-iterator (ChangeLogIterator. change-log ^"[Lliquibase.changelog.filter.ChangeSetFilter;" (into-array ChangeSetFilter change-set-filters)) log-iterator (ChangeLogIterator. change-log ^"[Lliquibase.changelog.filter.ChangeSetFilter;" (into-array ChangeSetFilter change-set-filters))
update-visitor (UpdateVisitor. database ^ChangeExecListener exec-listener) update-visitor (UpdateVisitor. database ^ChangeExecListener exec-listener)
runtime-env (RuntimeEnvironment. database (Contexts.) nil)] runtime-env (RuntimeEnvironment. database (Contexts.) nil)]
(run-in-scope-locked (.run ^ChangeLogIterator log-iterator update-visitor runtime-env))))
liquibase
#(.run ^ChangeLogIterator log-iterator update-visitor runtime-env)))))
(mu/defn force-migrate-up-if-needed! (mu/defn force-migrate-up-if-needed!
"Force migrating up. This does three things differently from [[migrate-up-if-needed!]]: "Force migrating up. This does three things differently from [[migrate-up-if-needed!]]:
...@@ -287,39 +324,47 @@ ...@@ -287,39 +324,47 @@
#3295." #3295."
[^Liquibase liquibase :- (ms/InstanceOfClass Liquibase) [^Liquibase liquibase :- (ms/InstanceOfClass Liquibase)
^DataSource data-source :- (ms/InstanceOfClass DataSource)] ^DataSource data-source :- (ms/InstanceOfClass DataSource)]
;; have to do this before clear the checksums else it will wait for locks to be released ;; We should have already released the lock before consolidating the changelog, but include this statement again
;; here to avoid depending on that non-local implementation detail. It is possible that the lock has been taken
;; again by another process before we reach this, and it's even possible that we lose yet *another* race again
;; between the next two lines, but we accept the risk of blocking in that latter case rather than complicating things
;; further.
(release-lock-if-needed! liquibase) (release-lock-if-needed! liquibase)
;; This implicitly clears the lock, so it needs to execute first.
(.clearCheckSums liquibase) (.clearCheckSums liquibase)
(when (seq (unrun-migrations data-source)) (with-scope-locked liquibase
(let [change-log (.getDatabaseChangeLog liquibase) (when (seq (unrun-migrations data-source))
fail-on-errors (mapv (fn [^ChangeSet change-set] [change-set (.getFailOnError change-set)]) (let [change-log (.getDatabaseChangeLog liquibase)
(.getChangeSets change-log)) fail-on-errors (mapv (fn [^ChangeSet change-set] [change-set (.getFailOnError change-set)])
exec-listener (proxy [AbstractChangeExecListener] [] (.getChangeSets change-log))
(willRun [^ChangeSet change-set _database-change-log _database _run-status] exec-listener (proxy [AbstractChangeExecListener] []
(when (instance? ChangeSet change-set) (willRun [^ChangeSet change-set _database-change-log _database _run-status]
(log/info (format "Start executing migration with id %s" (.getId change-set))))) (when (instance? ChangeSet change-set)
(log/info (format "Start executing migration with id %s" (.getId change-set)))))
(runFailed [^ChangeSet change-set _database-change-log _database ^Exception e]
(log/error (u/format-color 'red "[ERROR] %s" (.getMessage e)))) (runFailed [^ChangeSet _change-set _database-change-log _database ^Exception e]
(log/error (u/format-color 'red "[ERROR] %s" (.getMessage e))))
(ran [change-set _database-change-log _database ^ChangeSet$ExecType exec-type]
(when (instance? ChangeSet change-set) (ran [change-set _database-change-log _database ^ChangeSet$ExecType exec-type]
(condp = exec-type (when (instance? ChangeSet change-set)
ChangeSet$ExecType/EXECUTED (condp = exec-type
(log/info (u/format-color 'green "[SUCCESS]")) ChangeSet$ExecType/EXECUTED
(log/info (u/format-color 'green "[SUCCESS]"))
ChangeSet$ExecType/FAILED
(log/error (u/format-color 'red "[ERROR]")) ChangeSet$ExecType/FAILED
(log/error (u/format-color 'red "[ERROR]"))
(log/info (format "[%s]" (.name exec-type)))))))]
(try (log/info (format "[%s]" (.name exec-type)))))))]
(doseq [^ChangeSet change-set (.getChangeSets change-log)] (try
(.setFailOnError change-set false)) (doseq [^ChangeSet change-set (.getChangeSets change-log)]
(update-with-change-log liquibase {:exec-listener exec-listener}) (.setFailOnError change-set false))
(finally (update-with-change-log liquibase {:exec-listener exec-listener})
(doseq [[^ChangeSet change-set fail-on-error?] fail-on-errors] (finally
(.setFailOnError change-set fail-on-error?))))))) (doseq [[^ChangeSet change-set fail-on-error?] fail-on-errors]
(.setFailOnError change-set fail-on-error?))))))))
(def ^:private legacy-migrations-file "migrations/000_legacy_migrations.yaml")
(def ^:private update-migrations-file "migrations/001_update_migrations.yaml")
(mu/defn consolidate-liquibase-changesets! (mu/defn consolidate-liquibase-changesets!
"Consolidate all previous DB migrations so they come from single file. "Consolidate all previous DB migrations so they come from single file.
...@@ -330,16 +375,25 @@ ...@@ -330,16 +375,25 @@
See https://github.com/metabase/metabase/issues/3715 See https://github.com/metabase/metabase/issues/3715
Also see https://github.com/metabase/metabase/pull/34400" Also see https://github.com/metabase/metabase/pull/34400"
[conn :- (ms/InstanceOfClass java.sql.Connection)] [conn :- (ms/InstanceOfClass java.sql.Connection)
liquibase :- (ms/InstanceOfClass Liquibase)]
(let [liquibase-table-name (changelog-table-name conn) (let [liquibase-table-name (changelog-table-name conn)
statement (format "UPDATE %s SET FILENAME = CASE WHEN ID = ? THEN ? WHEN ID < ? THEN ? ELSE ? END" liquibase-table-name)] conn-spec {:connection conn}]
(when-not (fresh-install? conn) (when-not (fresh-install? conn)
(jdbc/execute! ;; Skip mutating the table if the filenames are already correct. It assumes we have never moved the boundary
{:connection conn} ;; between the two files, i.e. that update-migrations still start from v45.
[statement (when-not (= #{legacy-migrations-file update-migrations-file}
"v00.00-000" "migrations/001_update_migrations.yaml" (->> (str "SELECT DISTINCT(FILENAME) AS filename FROM " liquibase-table-name)
"v45.00-001" "migrations/000_legacy_migrations.yaml" (jdbc/query conn-spec)
"migrations/001_update_migrations.yaml"])))) (into #{} (map :filename))))
(log/info "Updating liquibase table to reflect consolidated changeset filenames")
(with-scope-locked liquibase
(jdbc/execute!
conn-spec
[(format "UPDATE %s SET FILENAME = CASE WHEN ID = ? THEN ? WHEN ID < ? THEN ? ELSE ? END" liquibase-table-name)
"v00.00-000" update-migrations-file
"v45.00-001" legacy-migrations-file
update-migrations-file]))))))
(defn- extract-numbers (defn- extract-numbers
"Returns contiguous integers parsed from string s" "Returns contiguous integers parsed from string s"
...@@ -359,13 +413,14 @@ ...@@ -359,13 +413,14 @@
(throw (IllegalArgumentException. (throw (IllegalArgumentException.
(format "target version must be a number between 44 and the previous major version (%d), inclusive" (format "target version must be a number between 44 and the previous major version (%d), inclusive"
(config/current-major-version))))) (config/current-major-version)))))
;; count and rollback only the applied change set ids which come after the target version (only the "v..." IDs need to be considered) (with-scope-locked liquibase
(let [changeset-query (format "SELECT id FROM %s WHERE id LIKE 'v%%' ORDER BY ORDEREXECUTED ASC" (changelog-table-name conn)) ;; count and rollback only the applied change set ids which come after the target version (only the "v..." IDs need to be considered)
changeset-ids (map :id (jdbc/query {:connection conn} [changeset-query])) (let [changeset-query (format "SELECT id FROM %s WHERE id LIKE 'v%%' ORDER BY ORDEREXECUTED ASC" (changelog-table-name conn))
;; IDs in changesets do not include the leading 0/1 digit, so the major version is the first number changeset-ids (map :id (jdbc/query {:connection conn} [changeset-query]))
ids-to-drop (drop-while #(not= (inc target-version) (first (extract-numbers %))) changeset-ids)] ;; IDs in changesets do not include the leading 0/1 digit, so the major version is the first number
(log/infof "Rolling back app database schema to version %d" target-version) ids-to-drop (drop-while #(not= (inc target-version) (first (extract-numbers %))) changeset-ids)]
(.rollback liquibase (count ids-to-drop) "")))) (log/infof "Rolling back app database schema to version %d" target-version)
(.rollback liquibase (count ids-to-drop) "")))))
(defn latest-applied-major-version (defn latest-applied-major-version
"Gets the latest version that was applied to the database." "Gets the latest version that was applied to the database."
......
...@@ -74,7 +74,13 @@ ...@@ -74,7 +74,13 @@
(log/info (trs "Setting up Liquibase...")) (log/info (trs "Setting up Liquibase..."))
(liquibase/with-liquibase [liquibase conn] (liquibase/with-liquibase [liquibase conn]
(try (try
(liquibase/consolidate-liquibase-changesets! conn) ;; Consolidating the changeset requires the lock, so we may need to release it first.
(when (= :force direction)
(liquibase/release-lock-if-needed! liquibase))
;; Releasing the locks does not depend on the changesets, so we skip this step as it might require locking.
(when-not (= :release-locks direction)
(liquibase/consolidate-liquibase-changesets! conn liquibase))
(log/info (trs "Liquibase is ready.")) (log/info (trs "Liquibase is ready."))
(case direction (case direction
:up (liquibase/migrate-up-if-needed! liquibase data-source) :up (liquibase/migrate-up-if-needed! liquibase data-source)
......
...@@ -67,9 +67,9 @@ ...@@ -67,9 +67,9 @@
;; fake a db where we ran all the migrations, including the legacy ones ;; fake a db where we ran all the migrations, including the legacy ones
(with-redefs [liquibase/decide-liquibase-file (fn [& _args] @#'liquibase/changelog-legacy-file)] (with-redefs [liquibase/decide-liquibase-file (fn [& _args] @#'liquibase/changelog-legacy-file)]
(liquibase/with-liquibase [liquibase conn] (liquibase/with-liquibase [liquibase conn]
(.update liquibase "")) (.update liquibase "")
(t2/update! (liquibase/changelog-table-name conn) {:filename "migrations/000_migrations.yaml"}) (t2/update! (liquibase/changelog-table-name conn) {:filename "migrations/000_migrations.yaml"})
(liquibase/consolidate-liquibase-changesets! conn) (liquibase/consolidate-liquibase-changesets! conn liquibase))
(testing "makes sure the change log filename are correctly set" (testing "makes sure the change log filename are correctly set"
(is (= (set (liquibase-file->included-ids "migrations/000_legacy_migrations.yaml" driver/*driver*)) (is (= (set (liquibase-file->included-ids "migrations/000_legacy_migrations.yaml" driver/*driver*))
(t2/select-fn-set :id (liquibase/changelog-table-name conn) :filename "migrations/000_legacy_migrations.yaml"))) (t2/select-fn-set :id (liquibase/changelog-table-name conn) :filename "migrations/000_legacy_migrations.yaml")))
......
...@@ -189,14 +189,12 @@ ...@@ -189,14 +189,12 @@
accept?) accept?)
(ChangeSetFilterResult. accept? "decision according to range" (class this)))))] (ChangeSetFilterResult. accept? "decision according to range" (class this)))))]
change-log-service (.getChangeLogService (ChangeLogHistoryServiceFactory/getInstance) database)] change-log-service (.getChangeLogService (ChangeLogHistoryServiceFactory/getInstance) database)]
(liquibase/run-in-scope-locked (liquibase/with-scope-locked liquibase
liquibase ;; Calling .listUnrunChangeSets has the side effect of creating the Liquibase tables
(fn [] ;; and initializing checksums so that they match the ones generated in production.
;; Calling .listUnrunChangeSets has the side effect of creating the Liquibase tables (.listUnrunChangeSets liquibase nil (LabelExpression.))
;; and initializing checksums so that they match the ones generated in production. (.generateDeploymentId change-log-service)
(.listUnrunChangeSets liquibase nil (LabelExpression.)) (liquibase/update-with-change-log liquibase {:change-set-filters change-set-filters})))))
(.generateDeploymentId change-log-service)
(liquibase/update-with-change-log liquibase {:change-set-filters change-set-filters}))))))
(defn- test-migrations-for-driver [driver [start-id end-id] f] (defn- test-migrations-for-driver [driver [start-id end-id] f]
(log/debug (u/format-color 'yellow "Testing migrations for driver %s..." driver)) (log/debug (u/format-color 'yellow "Testing migrations for driver %s..." driver))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment