Skip to content
Snippets Groups Projects
Unverified Commit 0e9b99f9 authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

Create X-or-insert! methods and use them for FieldValues (#38760)

parent 65c881fe
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@
[compojure.core :refer [DELETE GET POST PUT]]
[metabase.api.common :as api]
[metabase.db.metadata-queries :as metadata-queries]
[metabase.db.util :as mdb.u]
[metabase.lib.schema.metadata :as lib.schema.metadata]
[metabase.models.dimension :refer [Dimension]]
[metabase.models.field :as field :refer [Field]]
......@@ -12,7 +13,7 @@
[metabase.models.params.chain-filter :as chain-filter]
[metabase.models.params.field-values :as params.field-values]
[metabase.models.permissions :as perms]
[metabase.models.table :as table :refer [Table]]
[metabase.models.table :refer [Table]]
[metabase.query-processor :as qp]
[metabase.related :as related]
[metabase.server.middleware.offset-paging :as mw.offset-paging]
......@@ -297,12 +298,6 @@
[400 "If remapped values are specified, they must be specified for all field values"])
has-human-readable-values?))
(defn- update-field-values! [field-value-id update-map]
(api/check-500 (pos? (t2/update! FieldValues field-value-id update-map))))
(defn- create-field-values! [field-or-id update-map]
(t2/insert! FieldValues (assoc update-map :type :full :field_id (u/the-id field-or-id))))
(api/defendpoint POST "/:id/values"
"Update the fields values and human-readable values for a `Field` whose semantic type is
`category`/`city`/`state`/`country` or whose base type is `type/Boolean`. The human-readable values are optional."
......@@ -314,13 +309,12 @@
[400 (str "You can only update the human readable values of a mapped values of a Field whose value of "
"`has_field_values` is `list` or whose 'base_type' is 'type/Boolean'.")])
(let [human-readable-values? (validate-human-readable-pairs value-pairs)
update-map {:values (map first value-pairs)
:human_readable_values (when human-readable-values?
(map second value-pairs))}]
(t2/with-transaction [_conn]
(if-let [field-value-id (:id (field-values/get-latest-full-field-values id))]
(update-field-values! field-value-id update-map)
(create-field-values! field update-map)))))
update-map {:values (map first value-pairs)
:human_readable_values (when human-readable-values?
(map second value-pairs))}
updated-pk (mdb.u/update-or-insert! FieldValues {:field_id (u/the-id field), :type :full}
(constantly update-map))]
(api/check-500 (pos? updated-pk))))
{:status :success})
(api/defendpoint POST "/:id/rescan_values"
......
......@@ -5,7 +5,9 @@
[metabase.util.malli :as mu]
[metabase.util.malli.schema :as ms]
[toucan2.core :as t2]
[toucan2.model :as t2.model]))
[toucan2.model :as t2.model])
(:import
(clojure.lang ExceptionInfo)))
(defn toucan-model?
"Check if `model` is a toucan model."
......@@ -66,3 +68,86 @@
;; [:in :semantic_type #{"type/URL" "type/ImageURL"}]
([expr type-keyword]
[:in expr (type-keyword->descendants type-keyword)]))
(defmacro with-conflict-retry
"Retry a database mutation a single time if it fails due to concurrent insertions.
May retry for other reasons."
[& body]
`(try
~@body
(catch ExceptionInfo e#
;; The underlying exception thrown by the driver is database specific and opaque, so we treat any exception as a
;; possible database conflict due to a concurrent insert. If we want to be more conservative, we would need
;; a per-driver or driver agnostic way to test the exception.
~@body)))
(defn select-or-insert!
"Return a database record if it exists, otherwise create it.
The `select-map` is used to query the `model`, and if a result is found it is immediately returned.
If no value is found, `insert-fn` is called to generate the entity to be inserted.
Note that this generated entity must be consistent with `select-map`, if it disagrees on any keys then an exception
will be thrown. It is OK for the entity to omit fields from `select-map`, they will implicitly be added on.
This is more general than using `UPSERT`, `MERGE` or `INSERT .. ON CONFLICT`, and it also allows one to avoid
calculating initial values that may be expensive, or require side effects.
In the case where there is an underlying db constraint to prevent duplicates, this method takes care of handling
rejection from the database due to a concurrent insert, and will retry a single time to pick up the existing row.
This may result in `insert-fn` being called a second time.
In the case where there is no underlying db constraint, concurrent calls may still result in duplicates.
To prevent this in a database agnostic way, during an existing non-serializable transaction, would be non-trivial."
[model select-map insert-fn]
(let [select-kvs (mapcat identity select-map)
insert-fn #(let [instance (insert-fn)]
;; the inserted values must be consistent with the select query
(assert (not (u/conflicting-keys? select-map instance))
"this should not be used to change any of the identifying values")
;; for convenience, we allow insert-fn's result to omit fields in the search-map
(merge instance select-map))]
(with-conflict-retry
(or (apply t2/select-one model select-kvs)
(t2/insert-returning-instance! model (insert-fn))))))
(defn update-or-insert!
"Update a database record, if it exists, otherwise create it.
The `select-map` is used to query the `model`, and if a result is found then we will update that entity, otherwise
a new entity will be created. We use `update-fn` to calculate both updates and initial values - in the first case
it will be called with the existing value, and in the second case it will be called with nil, analogous to the way
that [[clojure.core/update]] calls its function.
Note that the generated entity must be consistent with `select-map`, if it disagrees on any keys then an exception
will be thrown. It is OK for the entity to omit fields from `select-map`, they will implicitly be added on.
This is more general than using `UPSERT`, `MERGE` or `INSERT .. ON CONFLICT`, and it also allows one to avoid
calculating initial values that may be expensive, or require side effects.
In the case where there is an underlying db constraint to prevent duplicates, this method takes care of handling
rejection from the database due to a concurrent insert, and will retry a single time to pick up the existing row.
This may result in `update-fn` being called a second time.
In the case where there is no underlying db constraint, concurrent calls may still result in duplicates.
To prevent this in a database agnostic way, during an existing non-serializable transaction, would be non-trivial."
[model select-map update-fn]
(let [select-kvs (mapcat identity select-map)
pks (t2/primary-keys model)
_ (assert (= 1 (count pks)) "This helper does not currently support compound keys")
pk-key (keyword (first pks))
update-fn (fn [existing]
(let [updated (update-fn existing)]
;; the inserted / updated values must be consistent with the select query
(assert (not (u/conflicting-keys? select-map updated))
"This should not be used to change any of the identifying values")
;; For convenience, we allow the update-fn to omit fields in the search-map
(merge updated select-map)))]
(with-conflict-retry
(if-let [existing (apply t2/select-one model select-kvs)]
(let [pk (pk-key existing)
updated (update-fn existing)]
(t2/update! model pk updated)
;; the private key may have been changed by the update, and this is OK.
(pk-key updated pk))
(t2/insert-returning-pk! model (update-fn nil))))))
......@@ -27,6 +27,7 @@
[java-time.api :as t]
[malli.core :as mc]
[medley.core :as m]
[metabase.db.util :as mdb.u]
[metabase.models.interface :as mi]
[metabase.models.serialization :as serdes]
[metabase.plugins.classloader :as classloader]
......@@ -443,12 +444,10 @@
unwrapped-values
(do
(log/debug (trs "Storing FieldValues for Field {0}..." field-name))
(t2/insert! FieldValues
:type :full
:field_id (u/the-id field)
:has_more_values has_more_values
:values values
:human_readable_values human-readable-values)
(mdb.u/select-or-insert! FieldValues {:field_id (u/the-id field), :type :full}
(constantly {:has_more_values has_more_values
:values values
:human_readable_values human-readable-values}))
::fv-created)
;; otherwise this Field isn't eligible, so delete any FieldValues that might exist
......
......@@ -3,6 +3,7 @@
values (`GET /api/field/:id/values`) endpoint; used by the chain filter endpoints under certain circumstances."
(:require
[medley.core :as m]
[metabase.db.util :as mdb.u]
[metabase.models.field :as field]
[metabase.models.field-values :as field-values :refer [FieldValues]]
[metabase.models.interface :as mi]
......@@ -100,26 +101,24 @@
:impersonation
(field-values/hash-key-for-impersonation field-id)))
(defn create-advanced-field-values!
"Fetch and create a FieldValues for `field` with type `fv-type`.
The human_readable_values of Advanced FieldValues will be automatically fixed up based on the
list of values and human_readable_values of the full FieldValues of the same field."
(defn prepare-advanced-field-values
"Fetch and construct the FieldValues for `field` with type `fv-type`. This does not do any insertion.
The human_readable_values of Advanced FieldValues will be automatically fixed up based on the
list of values and human_readable_values of the full FieldValues of the same field."
[fv-type field hash-key constraints]
(when-let [{wrapped-values :values
:keys [has_more_values]} (fetch-advanced-field-values fv-type field constraints)]
(when-let [{wrapped-values :values :keys [has_more_values]}
(fetch-advanced-field-values fv-type field constraints)]
(let [;; each value in `wrapped-values` is a 1-tuple, so unwrap the raw values for storage
values (map first wrapped-values)
;; If the full FieldValues of this field has a human-readable-values, fix it with the new values
human-readable-values (field-values/fixup-human-readable-values
(field-values/get-latest-full-field-values (:id field))
values)]
(first (t2/insert-returning-instances! FieldValues
:field_id (:id field)
:type fv-type
:hash_key hash-key
:has_more_values has_more_values
:human_readable_values human-readable-values
:values values)))))
;; If the full FieldValues of this field have human-readable-values, ensure that we reuse them
full-field-values (field-values/get-latest-full-field-values (:id field))
human-readable-values (field-values/fixup-human-readable-values full-field-values values)]
{:field_id (:id field)
:type fv-type
:hash_key hash-key
:has_more_values has_more_values
:human_readable_values human-readable-values
:values values})))
(defn get-or-create-advanced-field-values!
"Fetch an Advanced FieldValues with type `fv-type` for a `field`, creating them if needed.
......@@ -128,16 +127,20 @@
(get-or-create-advanced-field-values! fv-type field nil))
([fv-type field constraints]
(let [hash-key (hash-key-for-advanced-field-values fv-type (:id field) constraints)
fv (or (field-values/get-latest-field-values (:id field) fv-type hash-key)
(create-advanced-field-values! fv-type field hash-key constraints))]
(let [hash-key (hash-key-for-advanced-field-values fv-type (:id field) constraints)
select-kvs {:field_id (:id field) :type fv-type :hash_key hash-key}
fv (mdb.u/select-or-insert! :model/FieldValues select-kvs
#(prepare-advanced-field-values fv-type field hash-key constraints))]
(cond
(nil? fv) nil
;; If it's expired, delete then try to re-create it
(field-values/advanced-field-values-expired? fv) (do
(t2/delete! FieldValues :id (:id fv))
(recur fv-type field constraints))
(field-values/advanced-field-values-expired? fv)
(do
;; It's possible another process has already recalculated this, but spurious recalculations are OK.
(t2/delete! FieldValues :id (:id fv))
(recur fv-type field constraints))
:else fv))))
;;; +----------------------------------------------------------------------------------------------------------------+
......
......@@ -879,3 +879,13 @@
"A reversed java.util.Comparator, useful for sorting elements in descending in order"
[x y]
(compare y x))
(defn conflicting-keys
"Given two maps, return a seq of the keys on which they disagree. We only consider keys that are present in both."
[m1 m2]
(keep (fn [[k v]] (when (not= v (get m1 k v)) k)) m2))
(defn conflicting-keys?
"Given two maps, are any keys on which they disagree? We only consider keys that are present in both."
[m1 m2]
(boolean (some identity (conflicting-keys m1 m2))))
(ns metabase.db.util-test
(:require
[clojure.set :as set]
[clojure.test :refer [deftest testing is]]
[metabase.db.util :as mdb.u]
[metabase.models.setting :refer [Setting]]
[metabase.util :as u]
[toucan2.core :as t2])
(:import
(java.util.concurrent CountDownLatch)))
(set! *warn-on-reflection* true)
(defn- repeat-concurrently [n f]
;; Use a latch to ensure that the functions start as close to simultaneously as possible.
(let [latch (CountDownLatch. n)
futures (atom [])]
(dotimes [_ n]
(swap! futures conj (future (.countDown latch)
(.await latch)
(f))))
(into #{} (map deref) @futures)))
(deftest select-or-insert!-test
;; We test both a case where the database protects against duplicates, and where it does not.
;; Using Setting is perfect because it has only two required fields - (the primary) key & value (with no constraint).
;;
;; In the `:key` case using the `idempotent-insert!` rather than an `or` prevents the from application throwing an
;; exception when there are race conditions. For `:value` it prevents us silently inserting duplicates.
;;
;; It's important to test both, as only the latter has a phantom read issue and thus requires serializable isolation.
(let [columns [:key :value]]
(doseq [search-col columns]
(testing (format "When the search column %s a uniqueness constraint in the db"
(if (= :key search-col) "has" "does not have"))
(let [search-value (str (random-uuid))
other-col (first (remove #{search-col} columns))]
(try
;; ensure there is no database detritus to trip us up
(t2/delete! Setting search-col search-value)
(let [threads 5
latch (CountDownLatch. threads)
thunk (fn []
(mdb.u/select-or-insert! Setting {search-col search-value}
(fn []
;; Make sure all the threads are in the mutating path
(.countDown latch)
(.await latch)
{other-col (str (random-uuid))})))
results (repeat-concurrently threads thunk)
n (count results)
latest (t2/select-one Setting search-col search-value)]
(case search-col
:key
(do (testing "every call returns the same row"
(is (= #{latest} results)))
(testing "we never insert any duplicates"
(is (= 1 (t2/count Setting search-col search-value))))
(testing "later calls just return the existing row as well"
(is (= latest (thunk)))
(is (= 1 (t2/count Setting search-col search-value)))))
:value
(do
(testing "there may be race conditions, but we insert at least once"
(is (pos? n)))
(testing "we returned the same values that were inserted into the database"
(is (= results (set (t2/select Setting search-col search-value)))))
(testing "later calls just return an existing row as well"
(is (contains? results (thunk)))
(is (= results (set (t2/select Setting search-col search-value))))))))
;; Since we couldn't use with-temp, we need to clean up manually.
(finally
(t2/delete! Setting search-col search-value))))))))
(deftest updated-or-insert!-test
;; We test both a case where the database protects against duplicates, and where it does not.
;; Using Setting is perfect because it has only two required fields - (the primary) key & value (with no constraint).
(let [columns [:key :value]]
(doseq [search-col columns]
(testing (format "When the search column %s a uniqueness constraint in the db"
(if (= :key search-col) "has" "does not have"))
(doseq [already-exists? [true false]]
(let [search-value (str (random-uuid))
other-col (first (remove #{search-col} columns))
other-value (str (random-uuid))]
(try
;; ensure there is no database detritus to trip us up
(t2/delete! Setting search-col search-value)
(when already-exists?
(t2/insert! Setting search-col search-value other-col other-value))
(let [threads 5
latch (CountDownLatch. threads)
thunk (fn []
(u/prog1 (str (random-uuid))
(mdb.u/update-or-insert! Setting {search-col search-value}
(fn [_]
;; Make sure all the threads are in the mutating path
(.countDown latch)
(.await latch)
{other-col <>}))))
values-set (repeat-concurrently threads thunk)
latest (get (t2/select-one Setting search-col search-value) other-col)]
(testing "each update tried to set a different value"
(is (= threads (count values-set))))
;; Unfortunately updates are not serialized, but we cannot show that without using a model with more
;; than 2 fields.
(testing "the row is updated to match the last update call that resolved"
(is (not= other-value latest))
(is (contains? values-set latest)))
(when (or (= :key search-col) already-exists?)
(is (= 1 (count (t2/select Setting search-col search-value)))))
(testing "After the database is created, it does not create further duplicates"
(let [count (t2/count Setting search-col search-value)]
(is (pos? count))
(is (empty? (set/intersection values-set (repeat-concurrently threads thunk))))
(is (= count (t2/count Setting search-col search-value))))))
;; Since we couldn't use with-temp, we need to clean up manually.
(finally
(t2/delete! Setting search-col search-value)))))))))
......@@ -481,7 +481,7 @@
(testing "should created a full FieldValues when constraints is `nil`"
;; warm up the cache
(chain-filter categories.name nil)
(with-redefs [params.field-values/create-advanced-field-values! (fn [& _args]
(with-redefs [params.field-values/prepare-advanced-field-values (fn [& _args]
(assert false "Should not be called"))]
(is (= {:values [["African"] ["American"] ["Artisan"]]
:has_more_values false}
......@@ -493,7 +493,7 @@
(field-values/clear-advanced-field-values-for-field! field-id)
;; warm up the cache
(chain-filter categories.name {venues.price 4})
(with-redefs [params.field-values/create-advanced-field-values! (fn [& _args]
(with-redefs [params.field-values/prepare-advanced-field-values (fn [& _args]
(assert false "Should not be called"))]
(is (= {:values [["Japanese"] ["Steakhouse"]]
:has_more_values false}
......
......@@ -457,3 +457,9 @@
2 1250.04
1 1250.0
0 1250.0))
(deftest conflicting-keys-test
(is (= [] (u/conflicting-keys {:a 1 :b 2}
{:b 2 :c 3})))
(is (= [:c :e] (u/conflicting-keys {:a 1 :b 2 :c 3 :e nil}
{:b 2 :c 4 :d 5 :e 6}))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment