Skip to content
Snippets Groups Projects
Unverified commit 0bc880ad, authored by Cal Herries, committed by GitHub
Browse files

Avoid all tables in memory during sync (#46193)


Co-authored-by: Ngoc Khuat <qn.khuat@gmail.com>
parent a31adea8
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,6 @@
like running MBQL queries and fetching values to do things like determine Table row counts
and infer field semantic types."
(:require
[metabase.models.field :refer [Field]]
[metabase.sync.analyze.classify :as classify]
[metabase.sync.analyze.fingerprint :as sync.fingerprint]
[metabase.sync.interface :as i]
......@@ -52,26 +51,23 @@
;; newly re-fingerprinted Fields, because we'll know to skip the ones from last time since their value of
;; `last_analyzed` is not `nil`.
(mu/defn- update-last-analyzed!
  "Set `last_analyzed` to `:%now` for the Fields of `tables` that are at the latest fingerprint version and whose
  `last_analyzed` is still nil. Returns nil without touching the database when `tables` is empty."
  [tables :- [:sequential i/TableInstance]]
  (let [table-ids (seq (map u/the-id tables))]
    (when table-ids
      ;; Keep these conditions aligned with the WHERE used by `classify/fields-to-classify`.
      (t2/update! Field
                  {:table_id            [:in table-ids]
                   :fingerprint_version i/*latest-fingerprint-version*
                   :last_analyzed       nil}
                  {:last_analyzed :%now}))))
(mu/defn- update-fields-last-analyzed!
"Update the `last_analyzed` date for all the recently re-fingerprinted/re-classified Fields in TABLE."
"Update the `last_analyzed` date for all the recently re-fingerprinted/re-classified Fields in `table`."
[table :- i/TableInstance]
(update-last-analyzed! [table]))
(t2/update! :model/Field
(merge (sync.fingerprint/incomplete-analysis-kvs)
{:table_id (:id table)})
{:last_analyzed :%now}))
(mu/defn- update-fields-last-analyzed-for-db!
"Update the `last_analyzed` date for all the recently re-fingerprinted/re-classified Fields in TABLE."
[_database :- i/DatabaseInstance
tables :- [:sequential i/TableInstance]]
;; The WHERE portion of this query should match up with that of `classify/fields-to-classify`
(update-last-analyzed! tables))
"Update the `last_analyzed` date for all the recently re-fingerprinted/re-classified Fields in `database`."
[database :- i/DatabaseInstance]
(t2/update! :model/Field
(merge (sync.fingerprint/incomplete-analysis-kvs)
{:table_id [:in {:select [:id]
:from [(t2/table-name :model/Table)]
:where [:and sync-util/sync-tables-clause [:= :db_id (:id database)]]}]})
{:last_analyzed :%now}))
(mu/defn analyze-table!
"Perform in-depth analysis for a `table`."
......@@ -99,15 +95,15 @@
(format "Total number of tables classified %d, %d updated"
total-tables tables-classified))
(defn- make-analyze-steps [tables log-fn]
(defn- make-analyze-steps [log-fn]
[(sync-util/create-sync-step "fingerprint-fields"
#(sync.fingerprint/fingerprint-fields-for-db! % tables log-fn)
#(sync.fingerprint/fingerprint-fields-for-db! % log-fn)
fingerprint-fields-summary)
(sync-util/create-sync-step "classify-fields"
#(classify/classify-fields-for-db! % tables log-fn)
#(classify/classify-fields-for-db! % log-fn)
classify-fields-summary)
(sync-util/create-sync-step "classify-tables"
#(classify/classify-tables-for-db! % tables log-fn)
#(classify/classify-tables-for-db! % log-fn)
classify-tables-summary)])
(mu/defn analyze-db!
......@@ -116,21 +112,19 @@
`:last_analyzed` value for each affected Field."
[database :- i/DatabaseInstance]
(sync-util/sync-operation :analyze database (format "Analyze data for %s" (sync-util/name-for-logging database))
(let [tables (sync-util/db->sync-tables database)]
(sync-util/with-emoji-progress-bar [emoji-progress-bar (inc (* 3 (count tables)))]
(u/prog1 (sync-util/run-sync-operation "analyze" database (make-analyze-steps tables (maybe-log-progress emoji-progress-bar)))
(update-fields-last-analyzed-for-db! database tables))))))
(sync-util/with-emoji-progress-bar [emoji-progress-bar (inc (* 3 (sync-util/sync-tables-count database)))]
(u/prog1 (sync-util/run-sync-operation "analyze" database (make-analyze-steps (maybe-log-progress emoji-progress-bar)))
(update-fields-last-analyzed-for-db! database)))))
(mu/defn refingerprint-db!
"Refingerprint a subset of tables in a given `database`. This will re-fingerprint tables up to a threshold amount of
[[fingerprint/max-refingerprint-field-count]]."
[database :- i/DatabaseInstance]
(sync-util/sync-operation :refingerprint database (format "Refingerprinting tables for %s" (sync-util/name-for-logging database))
(let [tables (sync-util/db->sync-tables database)
log-fn (fn [step table]
(let [log-fn (fn [step table]
(log/info (u/format-color 'blue "%s Analyzed %s" step (sync-util/name-for-logging table))))]
(sync-util/run-sync-operation "refingerprint database"
database
[(sync-util/create-sync-step "refingerprinting fields"
#(sync.fingerprint/refingerprint-fields-for-db! % tables log-fn)
#(sync.fingerprint/refingerprint-fields-for-db! % log-fn)
fingerprint-fields-summary)]))))
......@@ -22,6 +22,7 @@
[metabase.lib.metadata :as lib.metadata]
[metabase.models.interface :as mi]
[metabase.query-processor.store :as qp.store]
[metabase.sync.analyze.fingerprint :as sync.fingerprint]
[metabase.sync.interface :as i]
[metabase.sync.util :as sync-util]
[metabase.util :as u]
......@@ -89,10 +90,9 @@
"Return a sequence of Fields belonging to `table` for which we should attempt to determine semantic type. This
should include Fields that have the latest fingerprint, but have not yet *completed* analysis."
[table :- i/TableInstance]
(seq (t2/select :model/Field
:table_id (u/the-id table)
:fingerprint_version i/*latest-fingerprint-version*
:last_analyzed nil)))
(seq (apply t2/select :model/Field
:table_id (u/the-id table)
(reduce concat [] (sync.fingerprint/incomplete-analysis-kvs)))))
(mu/defn classify-fields!
"Run various classifiers on the appropriate `fields` in a `table` that have not been previously analyzed. These do
......@@ -117,27 +117,29 @@
(mu/defn classify-tables-for-db!
"Classify all tables found in a given database"
[_database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
[database :- i/DatabaseInstance
log-progress-fn]
{:total-tables (count tables)
:tables-classified (sync-util/sum-numbers (fn [table]
(let [result (classify-table! table)]
(log-progress-fn "classify-tables" table)
(if result
1
0)))
tables)})
(let [tables (sync-util/reducible-sync-tables database)]
(transduce (map (fn [table]
(let [result (classify-table! table)]
(log-progress-fn "classify-tables" table)
{:tables-classified (if result
1
0)
:total-tables 1})))
(partial merge-with +)
{:tables-classified 0, :total-tables 0}
tables)))
(mu/defn classify-fields-for-db!
"Classify all fields found in a given database"
[_database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
[database :- i/DatabaseInstance
log-progress-fn]
(apply merge-with +
{:fields-classified 0, :fields-failed 0}
(map (fn [table]
(let [result (classify-fields! table)]
(log-progress-fn "classify-fields" table)
result))
tables)))
(let [tables (sync-util/reducible-sync-tables database)]
(transduce (map (fn [table]
(let [result (classify-fields! table)]
(log-progress-fn "classify-fields" table)
result)))
(partial merge-with +)
{:fields-classified 0, :fields-failed 0}
tables)))
......@@ -25,17 +25,21 @@
(comment
metadata-queries/keep-me-for-default-table-row-sample)
(defn incomplete-analysis-kvs
  "Returns the key-value pairs describing a Field that carries the latest fingerprint but has not yet *completed*
  analysis: `:fingerprint_version` equal to `i/*latest-fingerprint-version*` and a nil `:last_analyzed`.

  Every Field that receives a new fingerprint is marked with the latest fingerprint version and has its
  `last_analyzed` cleared, which is how we know it has not 'completed' analysis for that fingerprint.

  This is a function rather than a constant because `*latest-fingerprint-version*` may be rebound in tests."
  []
  {:fingerprint_version i/*latest-fingerprint-version*
   :last_analyzed       nil})
(mu/defn- save-fingerprint!
[field :- i/FieldInstance
fingerprint :- [:maybe analyze/Fingerprint]]
(log/debugf "Saving fingerprint for %s" (sync-util/name-for-logging field))
;; All Fields who get new fingerprints should get marked as having the latest fingerprint version, but we'll
;; clear their values for `last_analyzed`. This way we know these fields haven't "completed" analysis for the
;; latest fingerprints.
(t2/update! Field (u/the-id field)
{:fingerprint fingerprint
:fingerprint_version i/*latest-fingerprint-version*
:last_analyzed nil}))
(t2/update! Field (u/the-id field) (merge (incomplete-analysis-kvs) {:fingerprint fingerprint})))
(mr/def ::FingerprintStats
[:map
......@@ -211,31 +215,31 @@
(mu/defn- fingerprint-fields-for-db!*
"Invokes `fingerprint-fields!` on every table in `database`"
([database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
log-progress-fn :- LogProgressFn]
(fingerprint-fields-for-db!* database tables log-progress-fn (constantly true)))
(fingerprint-fields-for-db!* database log-progress-fn (constantly true)))
([database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
log-progress-fn :- LogProgressFn
continue? :- [:=> [:cat ::FingerprintStats] :any]]
(qp.store/with-metadata-provider (u/the-id database)
(reduce (fn [acc table]
(log-progress-fn (if *refingerprint?* "refingerprint-fields" "fingerprint-fields") table)
(let [new-acc (merge-with + acc (fingerprint-fields! table))]
(if (continue? new-acc)
new-acc
(reduced new-acc))))
(empty-stats-map 0)
tables))))
(let [tables (if *refingerprint?*
(sync-util/refingerprint-reducible-sync-tables database)
(sync-util/reducible-sync-tables database))]
(reduce (fn [acc table]
(log-progress-fn (if *refingerprint?* "refingerprint-fields" "fingerprint-fields") table)
(let [new-acc (merge-with + acc (fingerprint-fields! table))]
(if (continue? new-acc)
new-acc
(reduced new-acc))))
(empty-stats-map 0)
tables)))))
(mu/defn fingerprint-fields-for-db!
"Invokes [[fingerprint-fields!]] on every table in `database`"
[database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
log-progress-fn :- LogProgressFn]
(if (driver.u/supports? (:engine database) :fingerprint database)
(fingerprint-fields-for-db!* database tables log-progress-fn)
(fingerprint-fields-for-db!* database log-progress-fn)
(empty-stats-map 0)))
(def ^:private max-refingerprint-field-count
......@@ -246,13 +250,9 @@
(mu/defn refingerprint-fields-for-db!
"Invokes [[fingerprint-fields!]] on every table in `database` up to some limit."
[database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]
log-progress-fn :- LogProgressFn]
(binding [*refingerprint?* true]
(fingerprint-fields-for-db!* database
;; our rudimentary refingerprint strategy is to shuffle the tables and fingerprint
;; until we are over some threshold of fields
(shuffle tables)
log-progress-fn
(fn [stats-acc]
(< (:fingerprints-attempted stats-acc) max-refingerprint-field-count)))))
......
......@@ -11,8 +11,7 @@
[metabase.sync.util :as sync-util]
[metabase.util.log :as log]
[metabase.util.malli :as mu]
[metabase.util.malli.fn :as mu.fn]
[toucan2.realize :as t2.realize]))
[metabase.util.malli.fn :as mu.fn]))
(defmacro log-if-error
"Logs an error message if an exception is thrown while executing the body."
......@@ -55,10 +54,10 @@
"Replaces [[metabase.driver/describe-fields]] for drivers that haven't implemented it. Uses [[driver/describe-table]]
instead. Also includes nested field column metadata."
[_driver database & {:keys [schema-names table-names]}]
(let [tables (sync-util/db->reducible-sync-tables database :schema-names schema-names :table-names table-names)]
(let [tables (sync-util/reducible-sync-tables database :schema-names schema-names :table-names table-names)]
(eduction
(mapcat (fn [table]
(for [x (table-fields-metadata database (t2.realize/realize table))]
(for [x (table-fields-metadata database table)]
(assoc x :table-schema (:schema table) :table-name (:name table)))))
tables)))
......@@ -83,7 +82,7 @@
"Replaces [[metabase.driver/describe-fks]] for drivers that haven't implemented it. Uses [[driver/describe-table-fks]]
which is deprecated."
[driver database & {:keys [schema-names table-names]}]
(let [tables (sync-util/db->reducible-sync-tables database :schema-names schema-names :table-names table-names)]
(let [tables (sync-util/reducible-sync-tables database :schema-names schema-names :table-names table-names)]
(eduction
(mapcat (fn [table]
#_{:clj-kondo/ignore [:deprecated-var]}
......
......@@ -61,9 +61,9 @@
(table->fields-to-scan table)))
(mu/defn- update-field-values-for-database!
[_database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]]
(apply merge-with + (map update-field-values-for-table! tables)))
[database :- i/DatabaseInstance]
(let [tables (sync-util/reducible-sync-tables database)]
(transduce (map update-field-values-for-table!) (partial merge-with +) tables)))
(defn- update-field-values-summary [{:keys [created updated deleted errors]}]
(format "Updated %d field value sets, created %d, deleted %d with %d errors"
......@@ -95,24 +95,23 @@
(reduce +)))
(mu/defn- delete-expired-advanced-field-values-for-database!
[_database :- i/DatabaseInstance
tables :- [:maybe [:sequential i/TableInstance]]]
{:deleted (transduce (comp (map delete-expired-advanced-field-values-for-table!)
(map (fn [result]
(if (instance? Throwable result)
(throw result)
result))))
+
0
tables)})
[database :- i/DatabaseInstance]
(let [tables (sync-util/reducible-sync-tables database)]
{:deleted (transduce (comp (map delete-expired-advanced-field-values-for-table!)
(map (fn [result]
(if (instance? Throwable result)
(throw result)
result))))
+
0
tables)}))
(defn- make-sync-field-values-steps
[tables]
(def ^:private sync-field-values-steps
[(sync-util/create-sync-step "delete-expired-advanced-field-values"
#(delete-expired-advanced-field-values-for-database! % tables)
delete-expired-advanced-field-values-for-database!
delete-expired-advanced-field-values-summary)
(sync-util/create-sync-step "update-field-values"
#(update-field-values-for-database! % tables)
update-field-values-for-database!
update-field-values-summary)])
(mu/defn update-field-values!
......@@ -121,5 +120,4 @@
[database :- i/DatabaseInstance]
(sync-util/sync-operation :cache-field-values database (format "Cache field values in %s"
(sync-util/name-for-logging database))
(let [tables (sync-util/db->sync-tables database)]
(sync-util/run-sync-operation "field values scanning" database (make-sync-field-values-steps tables)))))
(sync-util/run-sync-operation "field values scanning" database sync-field-values-steps)))
......@@ -77,7 +77,7 @@
(let [driver (driver.u/database->driver database)
schemas? (driver.u/supports? driver :schemas database)
fields-metadata (if schemas?
(fetch-metadata/fields-metadata database :schema-names (sync-util/db->sync-schemas database))
(fetch-metadata/fields-metadata database :schema-names (sync-util/sync-schemas database))
(fetch-metadata/fields-metadata database))]
(transduce (comp
(partition-by (juxt :table-name :table-schema))
......
......@@ -116,7 +116,7 @@
(u/prog1 (sync-util/with-error-handling (format "Error syncing FKs for %s" (sync-util/name-for-logging database))
(let [driver (driver.u/database->driver database)
schema-names (when (driver.u/supports? driver :schemas database)
(sync-util/db->sync-schemas database))
(sync-util/sync-schemas database))
fk-metadata (fetch-metadata/fk-metadata database :schema-names schema-names)]
(transduce (map (fn [x]
(let [[updated failed] (try [(mark-fk! database x) 0]
......
......@@ -51,6 +51,8 @@
"Sync the indexes for all tables in `database` if the driver supports storing index info."
[database]
(if (driver.u/supports? (driver.u/database->driver database) :index-info database)
(apply merge-with + empty-stats
(map #(maybe-sync-indexes-for-table! database %) (sync-util/db->sync-tables database)))
(transduce (map #(maybe-sync-indexes-for-table! database %))
(partial merge-with +)
empty-stats
(sync-util/reducible-sync-tables database))
empty-stats))
......@@ -19,7 +19,8 @@
[metabase.util.malli :as mu]
[metabase.util.malli.registry :as mr]
[metabase.util.malli.schema :as ms]
[toucan2.core :as t2])
[toucan2.core :as t2]
[toucan2.realize :as t2.realize])
(:import
(java.time.temporal Temporal)))
......@@ -332,21 +333,38 @@
(into [:and] (for [[k v] sync-tables-kv-args]
[:= k v])))
(defn db->sync-tables
  "Selects every `:model/Table` row belonging to `database-or-id` that also satisfies `sync-tables-clause`, i.e.
  the Tables whose metadata gets sync'd."
  [database-or-id]
  (let [db-id (u/the-id database-or-id)]
    (t2/select :model/Table
               :db_id db-id
               {:where sync-tables-clause})))
(defn db->reducible-sync-tables
(defn reducible-sync-tables
"Returns a reducible of all the Tables that should go through the sync processes for `database-or-id`."
[database-or-id & {:keys [schema-names table-names]}]
(t2/reducible-select :model/Table
:db_id (u/the-id database-or-id)
{:where [:and sync-tables-clause
(when (seq schema-names) [:in :schema schema-names])
(when (seq table-names) [:in :name table-names])]}))
(eduction (map t2.realize/realize)
(t2/reducible-select :model/Table
:db_id (u/the-id database-or-id)
{:where [:and sync-tables-clause
(when (seq schema-names) [:in :schema schema-names])
(when (seq table-names) [:in :name table-names])]})))
(defn sync-tables-count
  "Counts the `:model/Table` rows of `database-or-id` that satisfy `sync-tables-clause`, i.e. how many tables
  should be synced."
  [database-or-id]
  (let [db-id (u/the-id database-or-id)]
    (t2/count :model/Table
              :db_id db-id
              {:where sync-tables-clause})))
(defn db->sync-schemas
(defn refingerprint-reducible-sync-tables
  "Returns a reducible collection of the Tables of `database-or-id` that satisfy `sync-tables-clause`, in the order
  they should be refingerprinted: ascending by the earliest (minimum) `last_analyzed` among each Table's Fields.
  Each row is passed through `t2.realize/realize` as it is reduced."
  [database-or-id]
  (let [;; one row per table: the minimum `last_analyzed` over its Fields
        earliest-per-table {:select   [:table_id
                                       [[:min :last_analyzed] :earliest_last_analyzed]]
                            :from     [(t2/table-name :model/Field)]
                            :group-by [:table_id]}
        tables-query       {:select    [:t.*]
                            :from      [[(t2/table-name :model/Table) :t]]
                            :left-join [[earliest-per-table :sub]
                                        [:= :t.id :sub.table_id]]
                            :where     [:and sync-tables-clause [:= :t.db_id (u/the-id database-or-id)]]
                            :order-by  [[:sub.earliest_last_analyzed :asc]]}]
    (->> (t2/reducible-select :model/Table tables-query)
         (eduction (map t2.realize/realize)))))
(defn sync-schemas
"Returns all the Schemas that have their metadata sync'd for `database-or-id`.
Assumes the database supports schemas."
[database-or-id]
......
......@@ -76,7 +76,7 @@
:global {:distinct-count 3}}
:last_analyzed nil}]
(is (nil? (:semantic_type (t2/select-one Field :id (u/the-id field)))))
(classify/classify-fields-for-db! db [table] (constantly nil))
(classify/classify-fields-for-db! db (constantly nil))
(is (= :type/Income (:semantic_type (t2/select-one Field :id (u/the-id field))))))))
(deftest classify-decimal-fields-test
......@@ -94,5 +94,5 @@
:global {:distinct-count 3}}
:last_analyzed nil}]
(is (nil? (:semantic_type (t2/select-one Field :id (u/the-id field)))))
(classify/classify-fields-for-db! db [table] (constantly nil))
(classify/classify-fields-for-db! db (constantly nil))
(is (= :type/Income (:semantic_type (t2/select-one Field :id (u/the-id field))))))))
......@@ -268,17 +268,19 @@
:fingerprint nil
:fingerprint_version 1
:last_analyzed #t "2017-08-09T00:00:00"}]
(binding [i/*latest-fingerprint-version* 3]
(with-redefs [qp/process-query (fn [_query rff]
(transduce identity (rff :metadata) [[1] [2] [3] [4] [5]]))
fingerprinters/fingerprinter (constantly (fingerprinters/constant-fingerprinter {:experimental {:fake-fingerprint? true}}))]
(is (= {:no-data-fingerprints 0, :failed-fingerprints 0,
:updated-fingerprints 1, :fingerprints-attempted 1}
(#'sync.fingerprint/fingerprint-table! (t2/select-one Table :id (data/id :venues)) [field])))
(is (= {:fingerprint {:experimental {:fake-fingerprint? true}}
:fingerprint_version 3
:last_analyzed nil}
(into {} (t2/select-one [Field :fingerprint :fingerprint_version :last_analyzed] :id (u/the-id field))))))))))
(binding [i/*latest-fingerprint-version* 3]
(with-redefs [qp/process-query (fn [_query rff]
(transduce identity (rff :metadata) [[1] [2] [3] [4] [5]]))
fingerprinters/fingerprinter (constantly (fingerprinters/constant-fingerprinter {:experimental {:fake-fingerprint? true}}))]
(is (= {:no-data-fingerprints 0
:failed-fingerprints 0
:updated-fingerprints 1
:fingerprints-attempted 1}
(#'sync.fingerprint/fingerprint-table! (t2/select-one Table :id (data/id :venues)) [field])))
(is (= {:fingerprint {:experimental {:fake-fingerprint? true}}
:fingerprint_version 3
:last_analyzed nil}
(into {} (t2/select-one [Field :fingerprint :fingerprint_version :last_analyzed] :id (u/the-id field))))))))))
(deftest test-fingerprint-failure
(testing "if fingerprinting fails, the exception should not propagate"
......@@ -318,10 +320,9 @@
(testing "refingerprints up to a limit"
(with-redefs [sync.fingerprint/save-fingerprint! (constantly nil)
sync.fingerprint/max-refingerprint-field-count 31] ;; prime number so we don't have exact matches
(let [table (t2/select-one Table :id (mt/id :checkins))
results (sync.fingerprint/refingerprint-fields-for-db! (mt/db)
(repeat (* @#'sync.fingerprint/max-refingerprint-field-count 2) table)
(constantly nil))
(let [results (sync.fingerprint/refingerprint-fields-for-db!
(mt/db)
(constantly nil))
attempted (:fingerprints-attempted results)]
;; it can exceed the max field count because the limit is only checked after each table has been fingerprinted.
(is (<= @#'sync.fingerprint/max-refingerprint-field-count attempted))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment