Skip to content
Snippets Groups Projects
Unverified Commit c60dfc5f authored by dpsutton's avatar dpsutton Committed by GitHub
Browse files

Changing type (#16776)

* Move sync executor to bespoke namespace

* Refingerprint fields on type change

* Check if can connect in refingerprint-field!

* docstring update

* Cleanup tests a bit

* Error handling

* Table field values endpoint on sync.concurrent executor

* ns sort api field

* ns cleanup
parent 3ec2c8a8
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
(mbql.match/match 1) (mbql.match/match 1)
(mt/test-drivers 1) (mt/test-drivers 1)
(mt/query 1) (mt/query 1)
(mt/dataset 1)
(mbql.match/match-one 1) (mbql.match/match-one 1)
(mbql.match/replace 1) (mbql.match/replace 1)
(mbql.match/replace-in 2) (mbql.match/replace-in 2)
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
[metabase.query-processor :as qp] [metabase.query-processor :as qp]
[metabase.related :as related] [metabase.related :as related]
[metabase.server.middleware.offset-paging :as offset-paging] [metabase.server.middleware.offset-paging :as offset-paging]
[metabase.sync :as sync]
[metabase.sync.concurrent :as sync.concurrent]
[metabase.types :as types] [metabase.types :as types]
[metabase.util :as u] [metabase.util :as u]
[metabase.util.i18n :refer [trs]] [metabase.util.i18n :refer [trs]]
...@@ -138,8 +140,11 @@ ...@@ -138,8 +140,11 @@
:present #{:caveats :description :fk_target_field_id :points_of_interest :semantic_type :visibility_type :coercion_strategy :effective_type :present #{:caveats :description :fk_target_field_id :points_of_interest :semantic_type :visibility_type :coercion_strategy :effective_type
:has_field_values} :has_field_values}
:non-nil #{:display_name :settings}))))) :non-nil #{:display_name :settings})))))
;; return updated field ;; return updated field. note the fingerprint on this might be out of date if the task below would replace them
(hydrate (Field id) :dimensions))) ;; but that shouldn't matter for the datamodel page
(u/prog1 (hydrate (Field id) :dimensions)
(when (not= effective-type (:effective_type field))
(sync.concurrent/submit-task (fn [] (sync/refingerprint-field! <>)))))))
;;; ------------------------------------------------- Field Metadata ------------------------------------------------- ;;; ------------------------------------------------- Field Metadata -------------------------------------------------
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
[metabase.models.table :as table :refer [Table]] [metabase.models.table :as table :refer [Table]]
[metabase.related :as related] [metabase.related :as related]
[metabase.sync :as sync] [metabase.sync :as sync]
[metabase.sync.concurrent :as sync.concurrent]
[metabase.sync.field-values :as sync-field-values] [metabase.sync.field-values :as sync-field-values]
[metabase.types :as types] [metabase.types :as types]
[metabase.util :as u] [metabase.util :as u]
...@@ -20,8 +21,7 @@ ...@@ -20,8 +21,7 @@
[metabase.util.schema :as su] [metabase.util.schema :as su]
[schema.core :as s] [schema.core :as s]
[toucan.db :as db] [toucan.db :as db]
[toucan.hydrate :refer [hydrate]]) [toucan.hydrate :refer [hydrate]]))
(:import [java.util.concurrent Callable Executors ExecutorService Future ThreadFactory]))
(def ^:private TableVisibilityType (def ^:private TableVisibilityType
"Schema for a valid table visibility type." "Schema for a valid table visibility type."
...@@ -62,28 +62,11 @@ ...@@ -62,28 +62,11 @@
(hydrate updated-table [:fields [:target :has_field_values] :dimensions :has_field_values])) (hydrate updated-table [:fields [:target :has_field_values] :dimensions :has_field_values]))
updated-table))) updated-table)))
(defonce ^:private thread-factory
(reify ThreadFactory
(newThread [_ r]
(doto (Thread. r)
(.setName "table sync worker")
(.setDaemon true)))))
(defonce ^:private executor
(delay (Executors/newFixedThreadPool 1 ^ThreadFactory thread-factory)))
(defn- submit-task
"Submit a task to the single thread executor. This will attempt to serialize repeated requests to sync tables. It
obviously cannot work across multiple instances."
^Future [^Callable f]
(let [task (bound-fn [] (f))]
(.submit ^ExecutorService @executor ^Callable task)))
(defn- sync-unhidden-tables (defn- sync-unhidden-tables
"Function to call on newly unhidden tables. Starts a thread to sync all tables." "Function to call on newly unhidden tables. Starts a thread to sync all tables."
[newly-unhidden] [newly-unhidden]
(when (seq newly-unhidden) (when (seq newly-unhidden)
(submit-task (sync.concurrent/submit-task
(fn [] (fn []
(let [database (table/database (first newly-unhidden))] (let [database (table/database (first newly-unhidden))]
(if (driver.u/can-connect-with-details? (:engine database) (:details database)) (if (driver.u/can-connect-with-details? (:engine database) (:details database))
...@@ -401,8 +384,9 @@ ...@@ -401,8 +384,9 @@
[id] [id]
(api/check-superuser) (api/check-superuser)
;; async so as not to block the UI ;; async so as not to block the UI
(future (sync.concurrent/submit-task
(sync-field-values/update-field-values-for-table! (api/check-404 (Table id)))) (fn []
(sync-field-values/update-field-values-for-table! (api/check-404 (Table id)))))
{:status :success}) {:status :success})
(api/defendpoint POST "/:id/discard_values" (api/defendpoint POST "/:id/discard_values"
......
...@@ -8,7 +8,11 @@ ...@@ -8,7 +8,11 @@
In the near future these steps will be scheduled individually, meaning those functions will In the near future these steps will be scheduled individually, meaning those functions will
be called directly instead of calling the `sync-database!` function to do all three at once." be called directly instead of calling the `sync-database!` function to do all three at once."
(:require [metabase.sync.analyze :as analyze] (:require [metabase.driver.util :as driver.u]
[metabase.models.field :as field]
[metabase.models.table :as table]
[metabase.sync.analyze :as analyze]
[metabase.sync.analyze.fingerprint :as fingerprint]
[metabase.sync.field-values :as field-values] [metabase.sync.field-values :as field-values]
[metabase.sync.interface :as i] [metabase.sync.interface :as i]
[metabase.sync.sync-metadata :as sync-metadata] [metabase.sync.sync-metadata :as sync-metadata]
...@@ -52,8 +56,22 @@ ...@@ -52,8 +56,22 @@
[field-values/update-field-values! "field-values"])]))))) [field-values/update-field-values! "field-values"])])))))
(s/defn sync-table! (s/defn sync-table!
"Perform all the different sync operations synchronously for a given `table`." "Perform all the different sync operations synchronously for a given `table`. Since often called on a sequence of
tables, caller should check if can connect."
[table :- i/TableInstance] [table :- i/TableInstance]
(sync-metadata/sync-table-metadata! table) (let [database (table/database table)]
(analyze/analyze-table! table) (sync-metadata/sync-table-metadata! table)
(field-values/update-field-values-for-table! table)) (analyze/analyze-table! table)
(field-values/update-field-values-for-table! table)))
(s/defn refingerprint-field!
"Refingerprint a field, usually after its type changes. Checks if can connect to database, returning
`:sync/no-connection` if not."
[field :- i/FieldInstance]
(let [table (field/table field)
database (table/database table)]
(if (driver.u/can-connect-with-details? (:engine database) (:details database))
(sync-util/with-error-handling (format "Error refingerprinting field %s"
(sync-util/name-for-logging field))
(fingerprint/refingerprint-field field))
:sync/no-connection)))
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
[honeysql.helpers :as h] [honeysql.helpers :as h]
[metabase.db.metadata-queries :as metadata-queries] [metabase.db.metadata-queries :as metadata-queries]
[metabase.db.util :as mdb.u] [metabase.db.util :as mdb.u]
[metabase.models.field :refer [Field]] [metabase.models.field :as field :refer [Field]]
[metabase.query-processor.store :as qp.store] [metabase.query-processor.store :as qp.store]
[metabase.sync.analyze.fingerprint.fingerprinters :as f] [metabase.sync.analyze.fingerprint.fingerprinters :as f]
[metabase.sync.interface :as i] [metabase.sync.interface :as i]
...@@ -235,3 +235,9 @@ ...@@ -235,3 +235,9 @@
log-progress-fn log-progress-fn
(fn [stats-acc] (fn [stats-acc]
(< (:fingerprints-attempted stats-acc) max-refingerprint-field-count))))) (< (:fingerprints-attempted stats-acc) max-refingerprint-field-count)))))
(s/defn refingerprint-field
"Refingerprint a field"
[field :- i/FieldInstance]
(let [table (field/table field)]
(fingerprint-table! table [field])))
(ns metabase.sync.concurrent
"Namespace with helpers for concurrent tasks in sync. Intended for quick, one-off tasks like re-syncing a table,
fingerprinting a field, etc."
(:import [java.util.concurrent Callable Executors ExecutorService Future ThreadFactory]))
(defonce ^:private thread-factory
(reify ThreadFactory
(newThread [_ r]
(doto (Thread. r)
(.setName "table sync worker")
(.setDaemon true)))))
(defonce ^:private executor
(delay (Executors/newFixedThreadPool 1 ^ThreadFactory thread-factory)))
(defn submit-task
"Submit a task to the single thread executor. This will attempt to serialize repeated requests to sync tables. It
obviously cannot work across multiple instances."
^Future [^Callable f]
(let [task (bound-fn [] (f))]
(.submit ^ExecutorService @executor ^Callable task)))
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
[metabase.api.field :as field-api] [metabase.api.field :as field-api]
[metabase.driver.util :as driver.u] [metabase.driver.util :as driver.u]
[metabase.models :refer [Database Field FieldValues Table]] [metabase.models :refer [Database Field FieldValues Table]]
[metabase.sync :as sync]
[metabase.test :as mt] [metabase.test :as mt]
[metabase.test.fixtures :as fixtures] [metabase.test.fixtures :as fixtures]
[metabase.timeseries-query-processor-test.util :as tqp.test] [metabase.timeseries-query-processor-test.util :as tqp.test]
...@@ -76,6 +77,11 @@ ...@@ -76,6 +77,11 @@
(defn simple-field-details [field] (defn simple-field-details [field]
(select-keys field [:name :display_name :description :visibility_type :semantic_type :fk_target_field_id])) (select-keys field [:name :display_name :description :visibility_type :semantic_type :fk_target_field_id]))
(mt/defdataset integer-coerceable
[["t" [{:field-name "f"
:base-type :type/Integer}]
[[100000] [200000] [300000]]]])
(deftest update-field-test (deftest update-field-test
(testing "PUT /api/field/:id" (testing "PUT /api/field/:id"
(testing "test that we can do basic field update work, including unsetting some fields such as semantic-type" (testing "test that we can do basic field update work, including unsetting some fields such as semantic-type"
...@@ -133,7 +139,24 @@ ...@@ -133,7 +139,24 @@
((juxt :effective_type :coercion_strategy) ((juxt :effective_type :coercion_strategy)
(mt/user-http-request :crowberto :put 200 (format "field/%d" field-id) (mt/user-http-request :crowberto :put 200 (format "field/%d" field-id)
;; unix is an integer->Temporal conversion ;; unix is an integer->Temporal conversion
{:coercion_strategy :Coercion/UNIXMicroSeconds->DateTime}))))))))) {:coercion_strategy :Coercion/UNIXMicroSeconds->DateTime}))))))
(testing "Refingerprints field when updated"
(with-redefs [metabase.sync.concurrent/submit-task (fn [task] (task))]
(mt/dataset integer-coerceable
(sync/sync-database! (Database (mt/id)))
(let [field-id (mt/id :t :f)
set-strategy! (fn [strategy]
(mt/user-http-request :crowberto :put 200 (format "field/%d" field-id)
{:coercion_strategy strategy}))]
;; ensure that there is no coercion strategy from previous tests
(set-strategy! nil)
(let [field (Field field-id)]
(is (= :type/Integer (:effective_type field)))
(is (contains? (get-in field [:fingerprint :type]) :type/Number)))
(set-strategy! :Coercion/UNIXSeconds->DateTime)
(let [field (Field field-id)]
(is (= :type/Instant (:effective_type field)))
(is (contains? (get-in field [:fingerprint :type]) :type/DateTime))))))))))
(deftest remove-fk-semantic-type-test (deftest remove-fk-semantic-type-test
(testing "PUT /api/field/:id" (testing "PUT /api/field/:id"
......
(ns metabase.sync.analyze-test (ns metabase.sync.analyze-test
(:require [clojure.test :refer :all] (:require [clojure.test :refer :all]
[metabase.api.table :as table-api]
[metabase.models.database :refer [Database]] [metabase.models.database :refer [Database]]
[metabase.models.field :as field :refer [Field]] [metabase.models.field :as field :refer [Field]]
[metabase.models.table :refer [Table]] [metabase.models.table :refer [Table]]
...@@ -10,6 +9,7 @@ ...@@ -10,6 +9,7 @@
[metabase.sync.analyze.classifiers.no-preview-display :as classifiers.no-preview-display] [metabase.sync.analyze.classifiers.no-preview-display :as classifiers.no-preview-display]
[metabase.sync.analyze.classifiers.text-fingerprint :as classify-text-fingerprint] [metabase.sync.analyze.classifiers.text-fingerprint :as classify-text-fingerprint]
[metabase.sync.analyze.fingerprint.fingerprinters :as fingerprinters] [metabase.sync.analyze.fingerprint.fingerprinters :as fingerprinters]
[metabase.sync.concurrent :as sync.concurrent]
[metabase.sync.interface :as i] [metabase.sync.interface :as i]
[metabase.sync.sync-metadata :as sync-metadata] [metabase.sync.sync-metadata :as sync-metadata]
[metabase.test :as mt] [metabase.test :as mt]
...@@ -231,18 +231,13 @@ ...@@ -231,18 +231,13 @@
(deftest analyze-unhidden-tables-test (deftest analyze-unhidden-tables-test
(testing "un-hiding a table should cause it to be analyzed" (testing "un-hiding a table should cause it to be analyzed"
(let [original-submit (var-get #'table-api/submit-task) (with-redefs [sync.concurrent/submit-task (fn [task] (task))]
finished? (promise)] (mt/with-temp* [Table [table (fake-table)]
(with-redefs [table-api/submit-task (fn [task] Field [field (fake-field table)]]
@(original-submit task) (set-table-visibility-type-via-api! table "hidden")
(deliver finished? true))] (set-table-visibility-type-via-api! table nil)
(mt/with-temp* [Table [table (fake-table)] (is (= true
Field [field (fake-field table)]] (fake-field-was-analyzed? field)))))))
(set-table-visibility-type-via-api! table "hidden")
(set-table-visibility-type-via-api! table nil)
(deref finished? 1000 ::timeout)
(is (= true
(fake-field-was-analyzed? field))))))))
(deftest dont-analyze-rehidden-table-test (deftest dont-analyze-rehidden-table-test
(testing "re-hiding a table should not cause it to be analyzed" (testing "re-hiding a table should not cause it to be analyzed"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment