From c60dfc5f266c7d5c00292199d7fddeb5adcc03f8 Mon Sep 17 00:00:00 2001 From: dpsutton <dan@dpsutton.com> Date: Wed, 11 Aug 2021 13:06:15 -0400 Subject: [PATCH] Changing type (#16776) * Move sync executor to bespoke namespace * Refingerprint fields on type change * Check if can connect in refingerprint-field! * docstring update * Cleanup tests a bit * Error handling * Table field values endpoint on sync.concurrent executor * ns sort api field * ns cleanup --- .dir-locals.el | 1 + src/metabase/api/field.clj | 9 ++++++-- src/metabase/api/table.clj | 28 +++++------------------ src/metabase/sync.clj | 28 +++++++++++++++++++---- src/metabase/sync/analyze/fingerprint.clj | 8 ++++++- src/metabase/sync/concurrent.clj | 21 +++++++++++++++++ test/metabase/api/field_test.clj | 25 +++++++++++++++++++- test/metabase/sync/analyze_test.clj | 21 +++++++---------- 8 files changed, 97 insertions(+), 44 deletions(-) create mode 100644 src/metabase/sync/concurrent.clj diff --git a/.dir-locals.el b/.dir-locals.el index be1ae3c9207..68f263f5009 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -25,6 +25,7 @@ (mbql.match/match 1) (mt/test-drivers 1) (mt/query 1) + (mt/dataset 1) (mbql.match/match-one 1) (mbql.match/replace 1) (mbql.match/replace-in 2) diff --git a/src/metabase/api/field.clj b/src/metabase/api/field.clj index 71209a9e3a3..d7498a330d2 100644 --- a/src/metabase/api/field.clj +++ b/src/metabase/api/field.clj @@ -13,6 +13,8 @@ [metabase.query-processor :as qp] [metabase.related :as related] [metabase.server.middleware.offset-paging :as offset-paging] + [metabase.sync :as sync] + [metabase.sync.concurrent :as sync.concurrent] [metabase.types :as types] [metabase.util :as u] [metabase.util.i18n :refer [trs]] @@ -138,8 +140,11 @@ :present #{:caveats :description :fk_target_field_id :points_of_interest :semantic_type :visibility_type :coercion_strategy :effective_type :has_field_values} :non-nil #{:display_name :settings}))))) - ;; return updated field - (hydrate (Field id) :dimensions))) + ;; return updated field. note the fingerprint on this might be out of date if the task below would replace them + ;; but that shouldn't matter for the datamodel page + (u/prog1 (hydrate (Field id) :dimensions) + (when (not= effective-type (:effective_type field)) + (sync.concurrent/submit-task (fn [] (sync/refingerprint-field! <>))))))) ;;; ------------------------------------------------- Field Metadata ------------------------------------------------- diff --git a/src/metabase/api/table.clj b/src/metabase/api/table.clj index 1d9ed7de21a..ada7dfa1470 100644 --- a/src/metabase/api/table.clj +++ b/src/metabase/api/table.clj @@ -13,6 +13,7 @@ [metabase.models.table :as table :refer [Table]] [metabase.related :as related] [metabase.sync :as sync] + [metabase.sync.concurrent :as sync.concurrent] [metabase.sync.field-values :as sync-field-values] [metabase.types :as types] [metabase.util :as u] @@ -20,8 +21,7 @@ [metabase.util.schema :as su] [schema.core :as s] [toucan.db :as db] - [toucan.hydrate :refer [hydrate]]) - (:import [java.util.concurrent Callable Executors ExecutorService Future ThreadFactory])) + [toucan.hydrate :refer [hydrate]])) (def ^:private TableVisibilityType "Schema for a valid table visibility type." @@ -62,28 +62,11 @@ (hydrate updated-table [:fields [:target :has_field_values] :dimensions :has_field_values])) updated-table))) -(defonce ^:private thread-factory - (reify ThreadFactory - (newThread [_ r] - (doto (Thread. r) - (.setName "table sync worker") - (.setDaemon true))))) - -(defonce ^:private executor - (delay (Executors/newFixedThreadPool 1 ^ThreadFactory thread-factory))) - -(defn- submit-task - "Submit a task to the single thread executor. This will attempt to serialize repeated requests to sync tables. It - obviously cannot work across multiple instances." - ^Future [^Callable f] - (let [task (bound-fn [] (f))] - (.submit ^ExecutorService @executor ^Callable task))) - (defn- sync-unhidden-tables "Function to call on newly unhidden tables. Starts a thread to sync all tables." [newly-unhidden] (when (seq newly-unhidden) - (submit-task + (sync.concurrent/submit-task (fn [] (let [database (table/database (first newly-unhidden))] (if (driver.u/can-connect-with-details? (:engine database) (:details database)) @@ -401,8 +384,9 @@ [id] (api/check-superuser) ;; async so as not to block the UI - (future - (sync-field-values/update-field-values-for-table! (api/check-404 (Table id)))) + (sync.concurrent/submit-task + (fn [] + (sync-field-values/update-field-values-for-table! (api/check-404 (Table id))))) {:status :success}) (api/defendpoint POST "/:id/discard_values" diff --git a/src/metabase/sync.clj b/src/metabase/sync.clj index 52ac95f3c10..30ab4899128 100644 --- a/src/metabase/sync.clj +++ b/src/metabase/sync.clj @@ -8,7 +8,11 @@ In the near future these steps will be scheduled individually, meaning those functions will be called directly instead of calling the `sync-database!` function to do all three at once." - (:require [metabase.sync.analyze :as analyze] + (:require [metabase.driver.util :as driver.u] + [metabase.models.field :as field] + [metabase.models.table :as table] + [metabase.sync.analyze :as analyze] + [metabase.sync.analyze.fingerprint :as fingerprint] [metabase.sync.field-values :as field-values] [metabase.sync.interface :as i] [metabase.sync.sync-metadata :as sync-metadata] @@ -52,8 +56,22 @@ [field-values/update-field-values! "field-values"])]))))) (s/defn sync-table! - "Perform all the different sync operations synchronously for a given `table`." + "Perform all the different sync operations synchronously for a given `table`. Since often called on a sequence of + tables, caller should check if can connect." [table :- i/TableInstance] - (sync-metadata/sync-table-metadata! table) - (analyze/analyze-table! table) - (field-values/update-field-values-for-table! table)) + (let [database (table/database table)] + (sync-metadata/sync-table-metadata! table) + (analyze/analyze-table! table) + (field-values/update-field-values-for-table! table))) + +(s/defn refingerprint-field! + "Refingerprint a field, usually after its type changes. Checks if can connect to database, returning + `:sync/no-connection` if not." + [field :- i/FieldInstance] + (let [table (field/table field) + database (table/database table)] + (if (driver.u/can-connect-with-details? (:engine database) (:details database)) + (sync-util/with-error-handling (format "Error refingerprinting field %s" + (sync-util/name-for-logging field)) + (fingerprint/refingerprint-field field)) + :sync/no-connection))) diff --git a/src/metabase/sync/analyze/fingerprint.clj b/src/metabase/sync/analyze/fingerprint.clj index cc92b320622..37fb1183450 100644 --- a/src/metabase/sync/analyze/fingerprint.clj +++ b/src/metabase/sync/analyze/fingerprint.clj @@ -6,7 +6,7 @@ [honeysql.helpers :as h] [metabase.db.metadata-queries :as metadata-queries] [metabase.db.util :as mdb.u] - [metabase.models.field :refer [Field]] + [metabase.models.field :as field :refer [Field]] [metabase.query-processor.store :as qp.store] [metabase.sync.analyze.fingerprint.fingerprinters :as f] [metabase.sync.interface :as i] @@ -235,3 +235,9 @@ log-progress-fn (fn [stats-acc] (< (:fingerprints-attempted stats-acc) max-refingerprint-field-count))))) + +(s/defn refingerprint-field + "Refingerprint a field" + [field :- i/FieldInstance] + (let [table (field/table field)] + (fingerprint-table! table [field]))) diff --git a/src/metabase/sync/concurrent.clj b/src/metabase/sync/concurrent.clj new file mode 100644 index 00000000000..d3d4e7417f9 --- /dev/null +++ b/src/metabase/sync/concurrent.clj @@ -0,0 +1,21 @@ +(ns metabase.sync.concurrent + "Namespace with helpers for concurrent tasks in sync. Intended for quick, one-off tasks like re-syncing a table, + fingerprinting a field, etc." + (:import [java.util.concurrent Callable Executors ExecutorService Future ThreadFactory])) + +(defonce ^:private thread-factory + (reify ThreadFactory + (newThread [_ r] + (doto (Thread. r) + (.setName "table sync worker") + (.setDaemon true))))) + +(defonce ^:private executor + (delay (Executors/newFixedThreadPool 1 ^ThreadFactory thread-factory))) + +(defn submit-task + "Submit a task to the single thread executor. This will attempt to serialize repeated requests to sync tables. It + obviously cannot work across multiple instances." + ^Future [^Callable f] + (let [task (bound-fn [] (f))] + (.submit ^ExecutorService @executor ^Callable task))) diff --git a/test/metabase/api/field_test.clj b/test/metabase/api/field_test.clj index 00235007c8d..0b2669cdb7b 100644 --- a/test/metabase/api/field_test.clj +++ b/test/metabase/api/field_test.clj @@ -5,6 +5,7 @@ [metabase.api.field :as field-api] [metabase.driver.util :as driver.u] [metabase.models :refer [Database Field FieldValues Table]] + [metabase.sync :as sync] [metabase.test :as mt] [metabase.test.fixtures :as fixtures] [metabase.timeseries-query-processor-test.util :as tqp.test] @@ -76,6 +77,11 @@ (defn simple-field-details [field] (select-keys field [:name :display_name :description :visibility_type :semantic_type :fk_target_field_id])) +(mt/defdataset integer-coerceable + [["t" [{:field-name "f" + :base-type :type/Integer}] + [[100000] [200000] [300000]]]]) + (deftest update-field-test (testing "PUT /api/field/:id" (testing "test that we can do basic field update work, including unsetting some fields such as semantic-type" @@ -133,7 +139,24 @@ ((juxt :effective_type :coercion_strategy) (mt/user-http-request :crowberto :put 200 (format "field/%d" field-id) ;; unix is an integer->Temporal conversion - {:coercion_strategy :Coercion/UNIXMicroSeconds->DateTime}))))))))) + {:coercion_strategy :Coercion/UNIXMicroSeconds->DateTime})))))) + (testing "Refingerprints field when updated" + (with-redefs [metabase.sync.concurrent/submit-task (fn [task] (task))] + (mt/dataset integer-coerceable + (sync/sync-database! (Database (mt/id))) + (let [field-id (mt/id :t :f) + set-strategy! (fn [strategy] + (mt/user-http-request :crowberto :put 200 (format "field/%d" field-id) + {:coercion_strategy strategy}))] + ;; ensure that there is no coercion strategy from previous tests + (set-strategy! nil) + (let [field (Field field-id)] + (is (= :type/Integer (:effective_type field))) + (is (contains? (get-in field [:fingerprint :type]) :type/Number))) + (set-strategy! :Coercion/UNIXSeconds->DateTime) + (let [field (Field field-id)] + (is (= :type/Instant (:effective_type field))) + (is (contains? (get-in field [:fingerprint :type]) :type/DateTime)))))))))) (deftest remove-fk-semantic-type-test (testing "PUT /api/field/:id" diff --git a/test/metabase/sync/analyze_test.clj b/test/metabase/sync/analyze_test.clj index 202146868e9..2bf0451da7f 100644 --- a/test/metabase/sync/analyze_test.clj +++ b/test/metabase/sync/analyze_test.clj @@ -1,6 +1,5 @@ (ns metabase.sync.analyze-test (:require [clojure.test :refer :all] - [metabase.api.table :as table-api] [metabase.models.database :refer [Database]] [metabase.models.field :as field :refer [Field]] [metabase.models.table :refer [Table]] @@ -10,6 +9,7 @@ [metabase.sync.analyze.classifiers.no-preview-display :as classifiers.no-preview-display] [metabase.sync.analyze.classifiers.text-fingerprint :as classify-text-fingerprint] [metabase.sync.analyze.fingerprint.fingerprinters :as fingerprinters] + [metabase.sync.concurrent :as sync.concurrent] [metabase.sync.interface :as i] [metabase.sync.sync-metadata :as sync-metadata] [metabase.test :as mt] @@ -231,18 +231,13 @@ (deftest analyze-unhidden-tables-test (testing "un-hiding a table should cause it to be analyzed" - (let [original-submit (var-get #'table-api/submit-task) - finished? (promise)] - (with-redefs [table-api/submit-task (fn [task] - @(original-submit task) - (deliver finished? true))] - (mt/with-temp* [Table [table (fake-table)] - Field [field (fake-field table)]] - (set-table-visibility-type-via-api! table "hidden") - (set-table-visibility-type-via-api! table nil) - (deref finished? 1000 ::timeout) - (is (= true - (fake-field-was-analyzed? field)))))))) + (with-redefs [sync.concurrent/submit-task (fn [task] (task))] + (mt/with-temp* [Table [table (fake-table)] + Field [field (fake-field table)]] + (set-table-visibility-type-via-api! table "hidden") + (set-table-visibility-type-via-api! table nil) + (is (= true + (fake-field-was-analyzed? field))))))) (deftest dont-analyze-rehidden-table-test (testing "re-hiding a table should not cause it to be analyzed" -- GitLab