From 0e9b99f934fb0a0648502fe56301215c61c20246 Mon Sep 17 00:00:00 2001 From: Chris Truter <crisptrutski@users.noreply.github.com> Date: Fri, 16 Feb 2024 18:38:57 +0200 Subject: [PATCH] Create X-or-insert! methods and use them for FieldValues (#38760) --- src/metabase/api/field.clj | 22 ++- src/metabase/db/util.clj | 87 +++++++++++- src/metabase/models/field_values.clj | 11 +- src/metabase/models/params/field_values.clj | 49 ++++--- src/metabase/util.cljc | 10 ++ test/metabase/db/util_test.clj | 134 ++++++++++++++++++ .../models/params/chain_filter_test.clj | 4 +- test/metabase/util_test.cljc | 6 + 8 files changed, 277 insertions(+), 46 deletions(-) create mode 100644 test/metabase/db/util_test.clj diff --git a/src/metabase/api/field.clj b/src/metabase/api/field.clj index 2d9a8a7f5cb..48023b0069a 100644 --- a/src/metabase/api/field.clj +++ b/src/metabase/api/field.clj @@ -4,6 +4,7 @@ [compojure.core :refer [DELETE GET POST PUT]] [metabase.api.common :as api] [metabase.db.metadata-queries :as metadata-queries] + [metabase.db.util :as mdb.u] [metabase.lib.schema.metadata :as lib.schema.metadata] [metabase.models.dimension :refer [Dimension]] [metabase.models.field :as field :refer [Field]] @@ -12,7 +13,7 @@ [metabase.models.params.chain-filter :as chain-filter] [metabase.models.params.field-values :as params.field-values] [metabase.models.permissions :as perms] - [metabase.models.table :as table :refer [Table]] + [metabase.models.table :refer [Table]] [metabase.query-processor :as qp] [metabase.related :as related] [metabase.server.middleware.offset-paging :as mw.offset-paging] @@ -297,12 +298,6 @@ [400 "If remapped values are specified, they must be specified for all field values"]) has-human-readable-values?)) -(defn- update-field-values! [field-value-id update-map] - (api/check-500 (pos? (t2/update! FieldValues field-value-id update-map)))) - -(defn- create-field-values! [field-or-id update-map] - (t2/insert! FieldValues (assoc update-map :type :full :field_id (u/the-id field-or-id)))) - (api/defendpoint POST "/:id/values" "Update the fields values and human-readable values for a `Field` whose semantic type is `category`/`city`/`state`/`country` or whose base type is `type/Boolean`. The human-readable values are optional." @@ -314,13 +309,12 @@ [400 (str "You can only update the human readable values of a mapped values of a Field whose value of " "`has_field_values` is `list` or whose 'base_type' is 'type/Boolean'.")]) (let [human-readable-values? (validate-human-readable-pairs value-pairs) - update-map {:values (map first value-pairs) - :human_readable_values (when human-readable-values? - (map second value-pairs))}] - (t2/with-transaction [_conn] - (if-let [field-value-id (:id (field-values/get-latest-full-field-values id))] - (update-field-values! field-value-id update-map) - (create-field-values! field update-map))))) + update-map {:values (map first value-pairs) + :human_readable_values (when human-readable-values? + (map second value-pairs))} + updated-pk (mdb.u/update-or-insert! FieldValues {:field_id (u/the-id field), :type :full} + (constantly update-map))] + (api/check-500 (pos? updated-pk)))) {:status :success}) (api/defendpoint POST "/:id/rescan_values" diff --git a/src/metabase/db/util.clj b/src/metabase/db/util.clj index 6a2e8e17a72..211e8d0e46b 100644 --- a/src/metabase/db/util.clj +++ b/src/metabase/db/util.clj @@ -5,7 +5,9 @@ [metabase.util.malli :as mu] [metabase.util.malli.schema :as ms] [toucan2.core :as t2] - [toucan2.model :as t2.model])) + [toucan2.model :as t2.model]) + (:import + (clojure.lang ExceptionInfo))) (defn toucan-model? "Check if `model` is a toucan model." @@ -66,3 +68,86 @@ ;; [:in :semantic_type #{"type/URL" "type/ImageURL"}] ([expr type-keyword] [:in expr (type-keyword->descendants type-keyword)])) + +(defmacro with-conflict-retry + "Retry a database mutation a single time if it fails due to concurrent insertions. + May retry for other reasons." + [& body] + `(try + ~@body + (catch ExceptionInfo e# + ;; The underlying exception thrown by the driver is database specific and opaque, so we treat any exception as a + ;; possible database conflict due to a concurrent insert. If we want to be more conservative, we would need + ;; a per-driver or driver agnostic way to test the exception. + ~@body))) + +(defn select-or-insert! + "Return a database record if it exists, otherwise create it. + + The `select-map` is used to query the `model`, and if a result is found it is immediately returned. + If no value is found, `insert-fn` is called to generate the entity to be inserted. + + Note that this generated entity must be consistent with `select-map`, if it disagrees on any keys then an exception + will be thrown. It is OK for the entity to omit fields from `select-map`, they will implicitly be added on. + + This is more general than using `UPSERT`, `MERGE` or `INSERT .. ON CONFLICT`, and it also allows one to avoid + calculating initial values that may be expensive, or require side effects. + + In the case where there is an underlying db constraint to prevent duplicates, this method takes care of handling + rejection from the database due to a concurrent insert, and will retry a single time to pick up the existing row. + This may result in `insert-fn` being called a second time. + + In the case where there is no underlying db constraint, concurrent calls may still result in duplicates. + To prevent this in a database agnostic way, during an existing non-serializable transaction, would be non-trivial." + [model select-map insert-fn] + (let [select-kvs (mapcat identity select-map) + insert-fn #(let [instance (insert-fn)] + ;; the inserted values must be consistent with the select query + (assert (not (u/conflicting-keys? select-map instance)) + "this should not be used to change any of the identifying values") + ;; for convenience, we allow insert-fn's result to omit fields in the search-map + (merge instance select-map))] + (with-conflict-retry + (or (apply t2/select-one model select-kvs) + (t2/insert-returning-instance! model (insert-fn)))))) + +(defn update-or-insert! + "Update a database record, if it exists, otherwise create it. + + The `select-map` is used to query the `model`, and if a result is found then we will update that entity, otherwise + a new entity will be created. We use `update-fn` to calculate both updates and initial values - in the first case + it will be called with the existing value, and in the second case it will be called with nil, analogous to the way + that [[clojure.core/update]] calls its function. + + Note that the generated entity must be consistent with `select-map`, if it disagrees on any keys then an exception + will be thrown. It is OK for the entity to omit fields from `select-map`, they will implicitly be added on. + + This is more general than using `UPSERT`, `MERGE` or `INSERT .. ON CONFLICT`, and it also allows one to avoid + calculating initial values that may be expensive, or require side effects. + + In the case where there is an underlying db constraint to prevent duplicates, this method takes care of handling + rejection from the database due to a concurrent insert, and will retry a single time to pick up the existing row. + This may result in `update-fn` being called a second time. + + In the case where there is no underlying db constraint, concurrent calls may still result in duplicates. + To prevent this in a database agnostic way, during an existing non-serializable transaction, would be non-trivial." + [model select-map update-fn] + (let [select-kvs (mapcat identity select-map) + pks (t2/primary-keys model) + _ (assert (= 1 (count pks)) "This helper does not currently support compound keys") + pk-key (keyword (first pks)) + update-fn (fn [existing] + (let [updated (update-fn existing)] + ;; the inserted / updated values must be consistent with the select query + (assert (not (u/conflicting-keys? select-map updated)) + "This should not be used to change any of the identifying values") + ;; For convenience, we allow the update-fn to omit fields in the search-map + (merge updated select-map)))] + (with-conflict-retry + (if-let [existing (apply t2/select-one model select-kvs)] + (let [pk (pk-key existing) + updated (update-fn existing)] + (t2/update! model pk updated) + ;; the private key may have been changed by the update, and this is OK. + (pk-key updated pk)) + (t2/insert-returning-pk! model (update-fn nil)))))) diff --git a/src/metabase/models/field_values.clj b/src/metabase/models/field_values.clj index c4b1f29bf88..31e2af5a3a6 100644 --- a/src/metabase/models/field_values.clj +++ b/src/metabase/models/field_values.clj @@ -27,6 +27,7 @@ [java-time.api :as t] [malli.core :as mc] [medley.core :as m] + [metabase.db.util :as mdb.u] [metabase.models.interface :as mi] [metabase.models.serialization :as serdes] [metabase.plugins.classloader :as classloader] @@ -443,12 +444,10 @@ unwrapped-values (do (log/debug (trs "Storing FieldValues for Field {0}..." field-name)) - (t2/insert! FieldValues - :type :full - :field_id (u/the-id field) - :has_more_values has_more_values - :values values - :human_readable_values human-readable-values) + (mdb.u/select-or-insert! FieldValues {:field_id (u/the-id field), :type :full} + (constantly {:has_more_values has_more_values + :values values + :human_readable_values human-readable-values})) ::fv-created) ;; otherwise this Field isn't eligible, so delete any FieldValues that might exist diff --git a/src/metabase/models/params/field_values.clj b/src/metabase/models/params/field_values.clj index 96db237cfd2..f493d07b00b 100644 --- a/src/metabase/models/params/field_values.clj +++ b/src/metabase/models/params/field_values.clj @@ -3,6 +3,7 @@ values (`GET /api/field/:id/values`) endpoint; used by the chain filter endpoints under certain circumstances." (:require [medley.core :as m] + [metabase.db.util :as mdb.u] [metabase.models.field :as field] [metabase.models.field-values :as field-values :refer [FieldValues]] [metabase.models.interface :as mi] @@ -100,26 +101,24 @@ :impersonation (field-values/hash-key-for-impersonation field-id))) -(defn create-advanced-field-values! - "Fetch and create a FieldValues for `field` with type `fv-type`. - The human_readable_values of Advanced FieldValues will be automatically fixed up based on the - list of values and human_readable_values of the full FieldValues of the same field." +(defn prepare-advanced-field-values + "Fetch and construct the FieldValues for `field` with type `fv-type`. This does not do any insertion. + The human_readable_values of Advanced FieldValues will be automatically fixed up based on the + list of values and human_readable_values of the full FieldValues of the same field." [fv-type field hash-key constraints] - (when-let [{wrapped-values :values - :keys [has_more_values]} (fetch-advanced-field-values fv-type field constraints)] + (when-let [{wrapped-values :values :keys [has_more_values]} + (fetch-advanced-field-values fv-type field constraints)] (let [;; each value in `wrapped-values` is a 1-tuple, so unwrap the raw values for storage values (map first wrapped-values) - ;; If the full FieldValues of this field has a human-readable-values, fix it with the new values - human-readable-values (field-values/fixup-human-readable-values - (field-values/get-latest-full-field-values (:id field)) - values)] - (first (t2/insert-returning-instances! FieldValues - :field_id (:id field) - :type fv-type - :hash_key hash-key - :has_more_values has_more_values - :human_readable_values human-readable-values - :values values))))) + ;; If the full FieldValues of this field have human-readable-values, ensure that we reuse them + full-field-values (field-values/get-latest-full-field-values (:id field)) + human-readable-values (field-values/fixup-human-readable-values full-field-values values)] + {:field_id (:id field) + :type fv-type + :hash_key hash-key + :has_more_values has_more_values + :human_readable_values human-readable-values + :values values}))) (defn get-or-create-advanced-field-values! "Fetch an Advanced FieldValues with type `fv-type` for a `field`, creating them if needed. @@ -128,16 +127,20 @@ (get-or-create-advanced-field-values! fv-type field nil)) ([fv-type field constraints] - (let [hash-key (hash-key-for-advanced-field-values fv-type (:id field) constraints) - fv (or (field-values/get-latest-field-values (:id field) fv-type hash-key) - (create-advanced-field-values! fv-type field hash-key constraints))] + (let [hash-key (hash-key-for-advanced-field-values fv-type (:id field) constraints) + select-kvs {:field_id (:id field) :type fv-type :hash_key hash-key} + fv (mdb.u/select-or-insert! :model/FieldValues select-kvs + #(prepare-advanced-field-values fv-type field hash-key constraints))] (cond (nil? fv) nil ;; If it's expired, delete then try to re-create it - (field-values/advanced-field-values-expired? fv) (do - (t2/delete! FieldValues :id (:id fv)) - (recur fv-type field constraints)) + (field-values/advanced-field-values-expired? fv) + (do + ;; It's possible another process has already recalculated this, but spurious recalculations are OK. + (t2/delete! FieldValues :id (:id fv)) + (recur fv-type field constraints)) + :else fv)))) ;;; +----------------------------------------------------------------------------------------------------------------+ diff --git a/src/metabase/util.cljc b/src/metabase/util.cljc index 8eaaaa52d9a..a470f3d58a0 100644 --- a/src/metabase/util.cljc +++ b/src/metabase/util.cljc @@ -879,3 +879,13 @@ "A reversed java.util.Comparator, useful for sorting elements in descending in order" [x y] (compare y x)) + +(defn conflicting-keys + "Given two maps, return a seq of the keys on which they disagree. We only consider keys that are present in both." + [m1 m2] + (keep (fn [[k v]] (when (not= v (get m1 k v)) k)) m2)) + +(defn conflicting-keys? + "Given two maps, are any keys on which they disagree? We only consider keys that are present in both." + [m1 m2] + (boolean (some identity (conflicting-keys m1 m2)))) diff --git a/test/metabase/db/util_test.clj b/test/metabase/db/util_test.clj new file mode 100644 index 00000000000..51a92092be0 --- /dev/null +++ b/test/metabase/db/util_test.clj @@ -0,0 +1,134 @@ +(ns metabase.db.util-test + (:require + [clojure.set :as set] + [clojure.test :refer [deftest testing is]] + [metabase.db.util :as mdb.u] + [metabase.models.setting :refer [Setting]] + [metabase.util :as u] + [toucan2.core :as t2]) + (:import + (java.util.concurrent CountDownLatch))) + +(set! *warn-on-reflection* true) + +(defn- repeat-concurrently [n f] + ;; Use a latch to ensure that the functions start as close to simultaneously as possible. + (let [latch (CountDownLatch. n) + futures (atom [])] + (dotimes [_ n] + (swap! futures conj (future (.countDown latch) + (.await latch) + (f)))) + (into #{} (map deref) @futures))) + +(deftest select-or-insert!-test + ;; We test both a case where the database protects against duplicates, and where it does not. + ;; Using Setting is perfect because it has only two required fields - (the primary) key & value (with no constraint). + ;; + ;; In the `:key` case using the `idempotent-insert!` rather than an `or` prevents the from application throwing an + ;; exception when there are race conditions. For `:value` it prevents us silently inserting duplicates. + ;; + ;; It's important to test both, as only the latter has a phantom read issue and thus requires serializable isolation. + (let [columns [:key :value]] + (doseq [search-col columns] + (testing (format "When the search column %s a uniqueness constraint in the db" + (if (= :key search-col) "has" "does not have")) + (let [search-value (str (random-uuid)) + other-col (first (remove #{search-col} columns))] + (try + ;; ensure there is no database detritus to trip us up + (t2/delete! Setting search-col search-value) + + (let [threads 5 + latch (CountDownLatch. threads) + thunk (fn [] + (mdb.u/select-or-insert! Setting {search-col search-value} + (fn [] + ;; Make sure all the threads are in the mutating path + (.countDown latch) + (.await latch) + {other-col (str (random-uuid))}))) + results (repeat-concurrently threads thunk) + n (count results) + latest (t2/select-one Setting search-col search-value)] + + (case search-col + :key + (do (testing "every call returns the same row" + (is (= #{latest} results))) + + (testing "we never insert any duplicates" + (is (= 1 (t2/count Setting search-col search-value)))) + + (testing "later calls just return the existing row as well" + (is (= latest (thunk))) + (is (= 1 (t2/count Setting search-col search-value))))) + + :value + (do + (testing "there may be race conditions, but we insert at least once" + (is (pos? n))) + + (testing "we returned the same values that were inserted into the database" + (is (= results (set (t2/select Setting search-col search-value))))) + + (testing "later calls just return an existing row as well" + (is (contains? results (thunk))) + (is (= results (set (t2/select Setting search-col search-value)))))))) + + ;; Since we couldn't use with-temp, we need to clean up manually. + (finally + (t2/delete! Setting search-col search-value)))))))) + +(deftest updated-or-insert!-test + ;; We test both a case where the database protects against duplicates, and where it does not. + ;; Using Setting is perfect because it has only two required fields - (the primary) key & value (with no constraint). + (let [columns [:key :value]] + (doseq [search-col columns] + (testing (format "When the search column %s a uniqueness constraint in the db" + (if (= :key search-col) "has" "does not have")) + (doseq [already-exists? [true false]] + (let [search-value (str (random-uuid)) + other-col (first (remove #{search-col} columns)) + other-value (str (random-uuid))] + (try + ;; ensure there is no database detritus to trip us up + (t2/delete! Setting search-col search-value) + + (when already-exists? + (t2/insert! Setting search-col search-value other-col other-value)) + + (let [threads 5 + latch (CountDownLatch. threads) + thunk (fn [] + (u/prog1 (str (random-uuid)) + (mdb.u/update-or-insert! Setting {search-col search-value} + (fn [_] + ;; Make sure all the threads are in the mutating path + (.countDown latch) + (.await latch) + {other-col <>})))) + values-set (repeat-concurrently threads thunk) + latest (get (t2/select-one Setting search-col search-value) other-col)] + + (testing "each update tried to set a different value" + (is (= threads (count values-set)))) + + ;; Unfortunately updates are not serialized, but we cannot show that without using a model with more + ;; than 2 fields. + (testing "the row is updated to match the last update call that resolved" + (is (not= other-value latest)) + (is (contains? values-set latest))) + + (when (or (= :key search-col) already-exists?) + (is (= 1 (count (t2/select Setting search-col search-value))))) + + (testing "After the database is created, it does not create further duplicates" + (let [count (t2/count Setting search-col search-value)] + (is (pos? count)) + (is (empty? (set/intersection values-set (repeat-concurrently threads thunk)))) + (is (= count (t2/count Setting search-col search-value)))))) + + ;; Since we couldn't use with-temp, we need to clean up manually. + (finally + (t2/delete! Setting search-col search-value))))))))) diff --git a/test/metabase/models/params/chain_filter_test.clj b/test/metabase/models/params/chain_filter_test.clj index 361f9b22601..be9c456e8a6 100644 --- a/test/metabase/models/params/chain_filter_test.clj +++ b/test/metabase/models/params/chain_filter_test.clj @@ -481,7 +481,7 @@ (testing "should created a full FieldValues when constraints is `nil`" ;; warm up the cache (chain-filter categories.name nil) - (with-redefs [params.field-values/create-advanced-field-values! (fn [& _args] + (with-redefs [params.field-values/prepare-advanced-field-values (fn [& _args] (assert false "Should not be called"))] (is (= {:values [["African"] ["American"] ["Artisan"]] :has_more_values false} @@ -493,7 +493,7 @@ (field-values/clear-advanced-field-values-for-field! field-id) ;; warm up the cache (chain-filter categories.name {venues.price 4}) - (with-redefs [params.field-values/create-advanced-field-values! (fn [& _args] + (with-redefs [params.field-values/prepare-advanced-field-values (fn [& _args] (assert false "Should not be called"))] (is (= {:values [["Japanese"] ["Steakhouse"]] :has_more_values false} diff --git a/test/metabase/util_test.cljc b/test/metabase/util_test.cljc index a0cda428094..5cc198fbd95 100644 --- a/test/metabase/util_test.cljc +++ b/test/metabase/util_test.cljc @@ -457,3 +457,9 @@ 2 1250.04 1 1250.0 0 1250.0)) + +(deftest conflicting-keys-test + (is (= [] (u/conflicting-keys {:a 1 :b 2} + {:b 2 :c 3}))) + (is (= [:c :e] (u/conflicting-keys {:a 1 :b 2 :c 3 :e nil} + {:b 2 :c 4 :d 5 :e 6})))) -- GitLab