Skip to content
Snippets Groups Projects
Unverified Commit d392a498 authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

Implement SQL Analysis worker (#45549)

parent 386797b2
No related branches found
No related tags found
No related merge requests found
Showing
with 372 additions and 149 deletions
......@@ -3,45 +3,48 @@
[clojure.string :as str]
[clojure.test :refer :all]
[mb.hawk.assert-exprs.approximately-equal :as hawk.approx]
[metabase.query-analysis :as query-analysis]
[metabase.test :as mt]
[ring.util.codec :as codec]
[toucan2.tools.with-temp :as t2.with-temp]))
(defn- do-with-test-setup [f]
(t2.with-temp/with-temp [:model/Table {table :id} {:name "T"}
:model/Collection {coll-1 :id} {:name "ZZX"}
:model/Collection {coll-2 :id} {:name "ZZY"}
:model/Collection {coll-3 :id} {:name "ZZZ"}
:model/Card {card-1 :id} {:name "A" :collection_id coll-1}
:model/Card {card-2 :id} {:name "B" :collection_id coll-2}
:model/Card {card-3 :id} {:name "C" :collection_id coll-3}
:model/Card {card-4 :id} {:name "D"}
:model/Field {field-1 :id} {:active false
:name "FA"
:table_id table}
:model/Field {field-2 :id} {:active false
:name "FB"
:table_id table}
:model/Field {field-3 :id} {:active false
:name "FC"
:table_id table}
;; QFs not to include:
;; - Field is still active
:model/QueryField {} {:card_id card-1
:field_id (mt/id :orders :tax)}
;; - Implicit reference
:model/QueryField {} {:card_id card-2
:field_id field-1
:explicit_reference false}
;; QFs to include:
:model/QueryField {qf-1 :id} {:card_id card-1
:field_id field-1}
:model/QueryField {qf-2 :id} {:card_id card-2
:field_id field-2}
:model/QueryField {qf-3 :id} {:card_id card-3
:field_id field-3}]
(mt/with-premium-features #{:query-field-validation}
(mt/call-with-map-params f [card-1 card-2 card-3 card-4 qf-1 qf-2 qf-3]))))
;; Make sure that no additional analysis is created by hooks
(query-analysis/without-analysis
(t2.with-temp/with-temp [:model/Table {table :id} {:name "T"}
:model/Collection {coll-1 :id} {:name "ZZX"}
:model/Collection {coll-2 :id} {:name "ZZY"}
:model/Collection {coll-3 :id} {:name "ZZZ"}
:model/Card {card-1 :id} {:name "A" :collection_id coll-1}
:model/Card {card-2 :id} {:name "B" :collection_id coll-2}
:model/Card {card-3 :id} {:name "C" :collection_id coll-3}
:model/Card {card-4 :id} {:name "D"}
:model/Field {field-1 :id} {:active false
:name "FA"
:table_id table}
:model/Field {field-2 :id} {:active false
:name "FB"
:table_id table}
:model/Field {field-3 :id} {:active false
:name "FC"
:table_id table}
;; QFs not to include:
;; - Field is still active
:model/QueryField {} {:card_id card-1
:field_id (mt/id :orders :tax)}
;; - Implicit reference
:model/QueryField {} {:card_id card-2
:field_id field-1
:explicit_reference false}
;; QFs to include:
:model/QueryField {qf-1 :id} {:card_id card-1
:field_id field-1}
:model/QueryField {qf-2 :id} {:card_id card-2
:field_id field-2}
:model/QueryField {qf-3 :id} {:card_id card-3
:field_id field-3}]
(mt/with-premium-features #{:query-field-validation}
(mt/call-with-map-params f [card-1 card-2 card-3 card-4 qf-1 qf-2 qf-3])))))
(defmacro ^:private with-test-setup
"Creates some non-stale QueryFields and anaphorical provides stale QueryField IDs called `qf-{1-3}` and their
......@@ -114,7 +117,7 @@
(deftest pagination-test
(testing "Lets you page results"
(with-test-setup
(resp= {:total 4
(resp= {:total 3
:limit 2
:offset 0
:data
......@@ -131,7 +134,7 @@
:name "C"}
{:id card-4
:name "D"}])
(resp= {:total 4
(resp= {:total 3
:limit 1
:offset 2
:data
......
......@@ -84,9 +84,13 @@
(defn config-bool "Fetch a configuration key and parse it as a boolean." ^Boolean [k] (some-> k config-str Boolean/parseBoolean))
(defn config-kw "Fetch a configuration key and parse it as a keyword." ^Keyword [k] (some-> k config-str keyword))
(def ^Boolean is-dev? "Are we running in `dev` mode (i.e. in a REPL or via `clojure -M:run`)?" (= :dev (config-kw :mb-run-mode)))
(def ^Boolean is-prod? "Are we running in `prod` mode (i.e. from a JAR)?" (= :prod (config-kw :mb-run-mode)))
(def ^Boolean is-test? "Are we running in `test` mode (i.e. via `clojure -X:test`)?" (= :test (config-kw :mb-run-mode)))
(def run-mode
"The mode in which Metabase is being run"
(config-kw :mb-run-mode))
(def ^Boolean is-dev? "Are we running in `dev` mode (i.e. in a REPL or via `clojure -M:run`)?" (= :dev run-mode))
(def ^Boolean is-prod? "Are we running in `prod` mode (i.e. from a JAR)?" (= :prod run-mode))
(def ^Boolean is-test? "Are we running in `test` mode (i.e. via `clojure -X:test`)?" (= :test run-mode))
;;; Version stuff
......
......@@ -533,7 +533,7 @@
(log/info "Card references Fields in params:" field-ids)
(field-values/update-field-values-for-on-demand-dbs! field-ids))
(parameter-card/upsert-or-delete-from-parameters! "card" (:id card) (:parameters card))
(query-analysis/update-query-analysis-for-card! card)))
(query-analysis/analyze-async! card)))
(t2/define-before-update :model/Card
[{:keys [verified-result-metadata?] :as card}]
......@@ -561,7 +561,7 @@
[card]
(u/prog1 card
(when (contains? (t2/changes card) :dataset_query)
(query-analysis/update-query-analysis-for-card! card))))
(query-analysis/analyze-async! card))))
;; Cards don't normally get deleted (they get archived instead) so this mostly affects tests
(t2/define-before-delete :model/Card
......
......@@ -14,8 +14,6 @@
(defn update-query-fields-for-card!
"Clears QueryFields associated with this card and creates fresh, up-to-date-ones.
If you're invoking this from a test, be sure to turn on [[*parse-queries-in-test?*]].
Returns `nil` (and logs the error) if there was a parse error."
[card-id query-field-rows]
(t2/with-transaction [_conn]
......
......@@ -931,3 +931,10 @@ See [fonts](../configuring-metabase/fonts.md).")
:export? false
:default true
:type :boolean)
(defsetting query-analysis-enabled
(deferred-tru "Whether or not we analyze any queries at all")
:visibility :internal
:export? false
:default true
:type :boolean)
......@@ -12,29 +12,59 @@
[metabase.query-analysis.native-query-analyzer.replacement :as nqa.replacement]
[metabase.util :as u]
[metabase.util.log :as log]
[metabase.util.queue :as queue]
[toucan2.core :as t2]))
(def ^:dynamic *parse-queries-in-test?*
"Normally, a native card's query is parsed on every create/update. For most tests, this is an unnecessary
expense. Therefore, we skip parsing while testing unless this variable is turned on.
(set! *warn-on-reflection* true)
c.f. [[native-analysis-active?]]"
false)
(def ^:private realtime-queue-capacity 1000)
(defn- native-analysis-active?
"Should the query run? Either we're not testing or it's been explicitly turned on.
(defonce ^:private queue (queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? true}))
(def ^:dynamic *analyze-execution-in-dev?*
"Managing a background thread in the REPL is likely to confound and infuriate, especially when we're using it to run
tests. For this reason we run analysis on the main thread by default."
::immediate)
(def ^:dynamic *analyze-execution-in-test?*
"A card's query is normally analyzed on every create/update. For most tests, this is an unnecessary expense, hence
we disable analysis by default."
::disabled)
(defmacro with-execution*
"Override the default execution mode, except in prod."
[execution & body]
`(binding [*analyze-execution-in-dev?* ~execution
*analyze-execution-in-test?* ~execution]
~@body))
(defmacro with-queued-analysis
"Override the default execution mode to always use the queue. Does nothing in prod - only use this in tests."
[& body]
`(with-execution* ::queued ~@body))
(defmacro with-immediate-analysis
"Override the default execution mode to always use the current thread. Does nothing in prod - only use this in tests."
[& body]
`(with-execution* ::immediate ~@body))
(defmacro without-analysis
"Override the default execution mode to always use the current thread. Does nothing in prod - only use this in tests."
[& body]
`(with-execution* ::disabled ~@body))
(defn- execution []
(case config/run-mode
:prod ::queued
:dev *analyze-execution-in-dev?*
:test *analyze-execution-in-test?*))
c.f. [[*parse-queries-in-test?*]], [[public-settings/sql-parsing-enabled]]"
[]
(and (public-settings/sql-parsing-enabled)
(or (not config/is-test?)
*parse-queries-in-test?*)))
(defn enabled?
(defn enabled-type?
"Is analysis of the given query type enabled?"
[query-type]
(case query-type
:native (native-analysis-active?)
:native (public-settings/sql-parsing-enabled)
:query true
:mbql/query true
false))
......@@ -46,7 +76,7 @@
Does not track wildcards for queries rendered as tables afterwards."
[query]
(let [query-type (lib/normalized-query-type query)]
(when (enabled? query-type)
(when (enabled-type? query-type)
(case query-type
:native (try
(nqa/field-ids-for-native query)
......@@ -58,24 +88,19 @@
(defn update-query-analysis-for-card!
"Clears QueryFields associated with this card and creates fresh, up-to-date-ones.
If you're invoking this from a test, be sure to turn on [[*parse-queries-in-test?*]].
Returns `nil` (and logs the error) if there was a parse error."
[{card-id :id, query :dataset_query}]
(try
(let [{:keys [explicit implicit] :as res} (query-field-ids query)
id->row (fn [explicit? field-id]
{:card_id card-id
:field_id field-id
:explicit_reference explicit?})
query-field-rows (concat
(map (partial id->row true) explicit)
(map (partial id->row false) implicit))]
;; when the response is `nil`, it's a disabled parser, not unknown columns
(when (some? res)
(query-field/update-query-fields-for-card! card-id query-field-rows)))
(catch Exception e
(log/error e "Error updating query fields"))))
(let [{:keys [explicit implicit] :as res} (query-field-ids query)
id->row (fn [explicit? field-id]
{:card_id card-id
:field_id field-id
:explicit_reference explicit?})
query-field-rows (concat
(map (partial id->row true) explicit)
(map (partial id->row false) implicit))]
;; when the response is `nil`, it's a disabled parser, not unknown columns
(when (some? res)
(query-field/update-query-fields-for-card! card-id query-field-rows))))
(defn- replaced-inner-query-for-native-card
[query {:keys [fields tables] :as _replacement-ids}]
......@@ -126,3 +151,35 @@
:native (replaced-inner-query-for-native-card q replacements)
(throw (ex-info "We don't (yet) support replacing field and table refs in cards with MBQL queries"
{:card card :replacements replacements}))))
(defn analyze-card!
"Update the analysis for the given card, if it is active."
[card-id]
(let [card (t2/select-one [:model/Card :id :archived :dataset_query] card-id)]
(cond
(not card) (log/warnf "Card not found: %s" card-id)
(:archived card) (log/warnf "Skipping archived card: %s" card-id)
:else (log/infof "Performing query analysis for card %s" card-id))
(when (and card (not (:archived card)))
(update-query-analysis-for-card! card))))
(defn next-card-id!
"Get the id of the next card id to be analyzed. May block indefinitely, relies on producer."
[]
(queue/blocking-take! queue))
(defn- queue-or-analyze! [offer-fn! card-or-id]
(case (execution)
::immediate (analyze-card! (u/the-id card-or-id))
::queued (offer-fn! queue (u/the-id card-or-id))
::disabled nil))
(defn analyze-async!
"Asynchronously hand-off the given card for analysis, at a high priority."
[card-or-id]
(queue-or-analyze! queue/maybe-put! card-or-id))
(defn analyze-sync!
"Synchronously hand-off the given card for analysis, at a low priority. May block indefinitely, relies on consumer."
[card-or-id]
(queue-or-analyze! queue/blocking-put! card-or-id))
(ns metabase.task.analyze-queries
(:require
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.schedule.simple :as simple]
[clojurewerkz.quartzite.triggers :as triggers]
[metabase.public-settings :as public-settings]
[metabase.query-analysis :as query-analysis]
[metabase.task :as task]
[metabase.util :as u]
[metabase.util.log :as log])
(:import
(org.quartz DisallowConcurrentExecution)))
(set! *warn-on-reflection* true)
(def ^:private max-cpu-usage-ratio 0.2)
(def ^:private wait-ratio
(/ (- 1 max-cpu-usage-ratio) max-cpu-usage-ratio))
(def ^:private fail-wait-ms (* 2 1000))
(def ^:private min-wait-ms 5)
(def ^:private max-wait-ms (* 10 1000))
(defn- wait-proportional ^long [time-taken-ms]
(->> time-taken-ms
(* wait-ratio)
(max min-wait-ms)
(min max-wait-ms)))
(defn- wait-fail ^long [time-taken-ms]
(max fail-wait-ms (wait-proportional time-taken-ms)))
(defn- analyzer-loop! []
(while (public-settings/query-analysis-enabled)
(let [card-id (query-analysis/next-card-id!)
timer (u/start-timer)]
(try
(query-analysis/analyze-card! card-id)
(Thread/sleep (wait-proportional (u/since-ms timer)))
(catch Exception e
(log/errorf e "Error analysing and updating query for Card %" card-id)
(Thread/sleep (wait-fail (u/since-ms timer))))))))
(jobs/defjob ^{DisallowConcurrentExecution true
:doc "Analyze "}
QueryAnalyzer [_ctx]
(analyzer-loop!))
(defmethod task/init! ::BackfillQueryField [_]
(let [job (jobs/build
(jobs/of-type QueryAnalyzer)
(jobs/with-identity (jobs/key "metabase.task.analyze-queries.job")))
trigger (triggers/build
(triggers/with-identity (triggers/key "metabase.task.analyze-queries.trigger"))
(triggers/with-schedule
(simple/schedule (simple/with-interval-in-minutes 1)))
(triggers/start-now))]
(task/schedule-task! job trigger)))
......@@ -10,19 +10,21 @@
(set! *warn-on-reflection* true)
(defn- backfill-query-fields! []
(let [cards (t2/reducible-select :model/Card :id [:in {:from [[:report_card :c]]
:left-join [[:query_field :f] [:= :f.card_id :c.id]]
:select [:c.id]
:where [:and
[:not :c.archived]
[:= :f.id nil]]}])]
(run! query-analysis/update-query-analysis-for-card! cards)))
(defn- backfill-missing-query-fields!
([]
(backfill-missing-query-fields! query-analysis/analyze-sync!))
([analyze-fn]
(let [cards (t2/reducible-select [:model/Card :id]
{:left-join [[:query_field :f] [:= :f.card_id :report_card.id]]
:where [:and
[:not :report_card.archived]
[:= :f.id nil]]})]
(run! analyze-fn cards))))
(jobs/defjob ^{DisallowConcurrentExecution true
:doc "Backfill QueryField for cards created earlier. Runs once per instance."}
BackfillQueryField [_ctx]
(backfill-query-fields!))
(backfill-missing-query-fields!))
(defmethod task/init! ::BackfillQueryField [_]
(let [job (jobs/build
......
......@@ -392,10 +392,6 @@
[db schema-name]
(nil? (can-create-upload-error db schema-name)))
(defn- start-timer [] (System/nanoTime))
(defn- since-ms [timer] (/ (- (System/nanoTime) timer) 1e6))
;;; +-----------------------------------------
;;; | public interface for creating CSV table
;;; +-----------------------------------------
......@@ -470,7 +466,7 @@
(check-can-create-upload database schema-name)
(collection/check-write-perms-for-collection collection-id)
(try
(let [timer (start-timer)
(let [timer (u/start-timer)
filename-prefix (or (second (re-matches #"(.*)\.(csv|tsv)$" filename))
filename)
humanized-name (humanization/name->human-readable-name filename-prefix)
......@@ -497,7 +493,7 @@
:name card-name
:visualization_settings {}}
@api/*current-user*)
upload-seconds (/ (since-ms timer) 1e3)
upload-seconds (/ (u/since-ms timer) 1e3)
stats (assoc stats :upload-seconds upload-seconds)]
(events/publish-event! :event/upload-create
......@@ -624,7 +620,7 @@
(try
(let [parse (infer-parser file)]
(with-open [reader (bom/bom-reader file)]
(let [timer (start-timer)
(let [timer (u/start-timer)
driver (driver.u/database->driver database)
auto-pk? (auto-pk-column? driver database)
[header & rows] (cond-> (parse reader)
......@@ -661,7 +657,7 @@
:num-columns (count new-types)
:generated-columns (if create-auto-pk? 1 0)
:size-mb (file-size-mb file)
:upload-seconds (since-ms timer)}]
:upload-seconds (u/since-ms timer)}]
(try
(when replace-rows?
(driver/truncate! driver (:id database) (table-identifier table)))
......
......@@ -1042,3 +1042,15 @@
(let [buf (js/Uint8Array. max-length-bytes)
result (.encodeInto (js/TextEncoder.) s buf)] ;; JS obj {read: chars_converted, write: bytes_written}
(subs s 0 (.-read result)))))
#?(:clj
(defn start-timer
"Start and return a timer. Treat the \"timer\" as an opaque object, the implementation may change."
[]
(System/nanoTime)))
#?(:clj
(defn since-ms
"Return how many milliseconds have elapsed since the given timer was started."
[timer]
(/ (- (System/nanoTime) timer) 1e6)))
......@@ -6,13 +6,15 @@
(set! *warn-on-reflection* true)
(defprotocol BoundedTransferQueue
(maybePut! [queue msg]
(maybe-put! [queue msg]
"Put a message on the queue if there is space for it, otherwise drop it.
Returns whether the item was enqueued.")
(blockingPut! [queue msg]
(blocking-put! [queue msg]
"Put a message on the queue. If necessary, block until there is space for it.")
(blockingTake! [queue]
"Take a message off the queue, blocking if necessary."))
(blocking-take! [queue]
"Take a message off the queue, blocking if necessary.")
(clear! [queue]
"Discard all messages on the given queue."))
;; Similar to java.util.concurrent.LinkedTransferQueue, but bounded.
(deftype ^:private ArrayTransferQueue
......@@ -21,16 +23,19 @@
^long block-ms
^long sleep-ms]
BoundedTransferQueue
(maybePut! [_ msg]
(maybe-put! [_ msg]
(.offer async-queue msg))
(blockingPut! [_ msg]
(blocking-put! [_ msg]
(.offer sync-queue msg Long/MAX_VALUE TimeUnit/DAYS))
(blockingTake! [_]
(blocking-take! [_]
;; Async messages are given higher priority, as sync messages will never be dropped.
(or (.poll async-queue)
(.poll sync-queue block-ms TimeUnit/MILLISECONDS)
(do (Thread/sleep ^long sleep-ms)
(recur)))))
(recur))))
(clear! [_]
(.clear sync-queue)
(.clear async-queue)))
;; Similar to ArrayTransferQueue, but drops events that are already in the queue.
(deftype ^:private DeduplicatingArrayTransferQueue
......@@ -40,7 +45,7 @@
^long block-ms
^long sleep-ms]
BoundedTransferQueue
(maybePut!
(maybe-put!
[_ msg]
(let [payload (:payload msg msg)]
;; we hold the lock while we push to avoid races
......@@ -51,11 +56,11 @@
(when-not accepted?
(.remove queued-set payload))
accepted?)))))
(blockingPut! [_ msg]
(blocking-put! [_ msg]
;; we cannot hold the lock while we push, so there is some chance of a duplicate
(when (locking queued-set (.add queued-set (:payload msg msg)))
(.offer sync-queue msg Long/MAX_VALUE TimeUnit/DAYS)))
(blockingTake! [_]
(blocking-take! [_]
;; we lock here to avoid leaving a blocking entry behind that can never be cleared
(or (locking queued-set
(when-let [msg (or (.poll ^Queue async-queue)
......@@ -63,7 +68,12 @@
(.remove queued-set (:payload msg msg))
msg))
(do (Thread/sleep ^long sleep-ms)
(recur)))))
(recur))))
(clear! [_]
(locking queued-set
(.clear sync-queue)
(.clear async-queue)
(.clear queued-set))))
(defn bounded-transfer-queue
"Create a bounded transfer queue, specialized based on the high-level options."
......
......@@ -18,7 +18,7 @@
:card_id card-id))
(defn- do-with-test-setup [f]
(binding [query-analysis/*parse-queries-in-test?* true]
(query-analysis/with-immediate-analysis
(let [table-id (mt/id :orders)
tax-id (mt/id :orders :tax)
total-id (mt/id :orders :total)]
......@@ -41,12 +41,10 @@
(defn- trigger-parse!
"Update the card to an arbitrary query; defaults to querying the two columns that do exist: TAX and TOTAL"
([card-id]
(trigger-parse! card-id "SELECT TAX, TOTAL FROM orders"))
([card-id query]
(if (string? query)
(t2/update! :model/Card card-id {:dataset_query (mt/native-query {:query query})})
(t2/update! :model/Card card-id {:dataset_query query}))))
[card-id query]
(if (string? query)
(t2/update! :model/Card card-id {:dataset_query (mt/native-query {:query query})})
(t2/update! :model/Card card-id {:dataset_query query})))
;;;;
;;;; Actual tests
......@@ -54,11 +52,11 @@
(deftest query-fields-created-by-queries-test
(with-test-setup
(let [total-qf {:card_id card-id
:field_id total-id
(let [total-qf {:card_id card-id
:field_id total-id
:explicit_reference true}
tax-qf {:card_id card-id
:field_id tax-id
tax-qf {:card_id card-id
:field_id tax-id
:explicit_reference true}]
(testing "A freshly created card has relevant corresponding QueryFields"
......@@ -66,7 +64,7 @@
(query-fields-for-card card-id))))
(testing "Adding new columns to the query also adds the QueryFields"
(trigger-parse! card-id)
(trigger-parse! card-id "SELECT tax, total FROM orders")
(is (= #{tax-qf total-qf}
(query-fields-for-card card-id))))
......@@ -77,7 +75,7 @@
(testing "Columns referenced via field filters are still found"
(trigger-parse! card-id
(mt/native-query {:query "SELECT tax FROM orders WHERE {{adequate_total}}"
(mt/native-query {:query "SELECT tax FROM orders WHERE {{adequate_total}}"
:template-tags {"adequate_total"
{:type :dimension
:name "adequate_total"
......
......@@ -14,19 +14,17 @@
(mt/discard-setting-changes [sql-parsing-enabled]
(testing "sql parsing enabled"
(public-settings/sql-parsing-enabled! true)
(binding [query-analysis/*parse-queries-in-test?* true]
(is (true? (query-analysis/enabled? :native)))))
(is (true? (query-analysis/enabled-type? :native))))
(testing "sql parsing disabled"
(public-settings/sql-parsing-enabled! false)
(binding [query-analysis/*parse-queries-in-test?* true]
(is (false? (query-analysis/enabled? :native)))))))
(is (false? (query-analysis/enabled-type? :native))))))
(deftest non-native-query-enabled-test
(testing "mbql parsing is always enabled"
(is (query-analysis/enabled? :query))
(is (query-analysis/enabled? :mbql/query)))
(is (query-analysis/enabled-type? :query))
(is (query-analysis/enabled-type? :mbql/query)))
(testing "other types are disabled"
(is (false? (query-analysis/enabled? :unexpected)))))
(is (false? (query-analysis/enabled-type? :unexpected)))))
(deftest parse-mbql-test
(testing "Parsing MBQL query returns correct used fields"
......
(ns metabase.task.analyze-queries-test
(:require
[clojure.test :refer :all]
[metabase.lib.core :as lib]
[metabase.lib.metadata :as lib.metadata]
[metabase.lib.metadata.jvm :as lib.metadata.jvm]
[metabase.models :refer [Card]]
[metabase.query-analysis :as query-analysis]
[metabase.task.analyze-queries :as task.analyze-queries]
[metabase.test :as mt]
[metabase.util :as u]
[metabase.util.queue :as queue]
[toucan2.core :as t2]))
(deftest analyzer-loop-test
(let [metadata-provider (lib.metadata.jvm/application-database-metadata-provider (mt/id))
venues (lib.metadata/table metadata-provider (mt/id :venues))
venues-name (lib.metadata/field metadata-provider (mt/id :venues :name))
mlv2-query (-> (lib/query metadata-provider venues)
(lib/aggregate (lib/distinct venues-name)))]
(mt/with-temp [Card c1 {:query_type "native"
:dataset_query (mt/native-query {:query "SELECT id FROM venues"})}
Card c2 {:query_type "native"
:dataset_query (mt/native-query {:query "SELECT id FROM venues WHERE name = {{ name }}"
:template-tags {"name" {:id "_name_"
:type :text
:display-name "name"
:default "qwe"}}})}
Card c3 {:query_type "query"
:dataset_query (mt/mbql-query venues {:aggregation [[:distinct $name]]})}
Card c4 {:query_type "query"
:dataset_query mlv2-query}
Card arch {:archived true
:query_type "native"
:dataset_query (mt/native-query {:query "SELECT id FROM venues"})}]
(let [card-ids (map :id [c1 c2 c3 c4 arch])]
;; emulate "cards created before QueryField existed"
(t2/delete! :model/QueryField :card_id [:in card-ids])
(queue/clear! @#'query-analysis/queue)
;; `(first (vals %))` is necessary since h2 generates `:count(id)` as a name for the column
(let [get-count #(t2/select-one-fn (comp first vals) [:model/QueryField [[:count :id]]] :card_id %)]
(testing "QueryField is empty - queries weren't analyzed"
(is (every? zero? (map get-count card-ids))))
;; queue the cards
(query-analysis/with-queued-analysis
(run! query-analysis/analyze-async! card-ids))
;; run the analysis for 1s
(try
(u/with-timeout 1000
(#'task.analyze-queries/analyzer-loop!))
(catch Exception _))
(testing "QueryField is filled now"
(testing "for a native query"
(is (pos? (get-count (:id c1)))))
(testing "for a native query with template tags"
(is (pos? (get-count (:id c2)))))
(testing "for an MBQL"
(is (pos? (get-count (:id c3)))))
(testing "for an MLv2"
(is (pos? (get-count (:id c4)))))
(testing "but not for an archived card"
(is (zero? (get-count (:id arch)))))))))))
(ns metabase.task.backfill-query-fields-test
(:require
[clojure.set :as set]
[clojure.test :refer :all]
[metabase.lib.core :as lib]
[metabase.lib.metadata :as lib.metadata]
......@@ -8,14 +9,17 @@
[metabase.query-analysis :as query-analysis]
[metabase.task.backfill-query-fields :as backfill]
[metabase.test :as mt]
[metabase.util.queue :as queue]
[toucan2.core :as t2]))
(deftest backfill-query-field-test
(let [metadata-provider (lib.metadata.jvm/application-database-metadata-provider (mt/id))
venues (lib.metadata/table metadata-provider (mt/id :venues))
venues-name (lib.metadata/field metadata-provider (mt/id :venues :name))
mlv2-query (-> (lib/query metadata-provider venues)
(lib/aggregate (lib/distinct venues-name)))]
(mt/with-temp [Card c1 {:query_type "native"
:dataset_query (mt/native-query {:query "SELECT id FROM venues"})}
Card c2 {:query_type "native"
......@@ -32,27 +36,31 @@
:query_type "native"
:dataset_query (mt/native-query {:query "SELECT id FROM venues"})}]
;; emulate "cards created before QueryField existed"
;; Make sure there is no existing analysis for the relevant cards
(t2/delete! :model/QueryField :card_id [:in (map :id [c1 c2 c3 c4 arch])])
;; `(first (vals %))` is necessary since h2 generates `:count(id)` as a name for the column
(let [get-count #(t2/select-one-fn (comp first vals) [:model/QueryField [[:count :id]]] :card_id %)]
(testing "QueryField is empty - queries weren't analyzed"
(is (zero? (get-count (:id c1))))
(is (zero? (get-count (:id c2))))
(is (zero? (get-count (:id c3))))
(is (zero? (get-count (:id c4))))
(is (zero? (get-count (:id arch)))))
(binding [query-analysis/*parse-queries-in-test?* true]
(#'backfill/backfill-query-fields!))
(testing "QueryField is filled now"
(testing "for a native query"
(is (pos? (get-count (:id c1)))))
(testing "for a native query with template tags"
(is (pos? (get-count (:id c2)))))
(testing "for an MBQL"
(is (pos? (get-count (:id c3)))))
(testing "for an MLv2"
(is (pos? (get-count (:id c4)))))
(testing "but not for an archived card"
(is (zero? (get-count (:id arch))))))))))
;; Make sure some other card has analysis
(testing "There is at least one card with existing analysis"
(query-analysis/analyze-card! (:id c3))
(is (pos? (count (t2/select :model/QueryField :card_id (:id c3))))))
(let [queued-ids (atom #{})
expected-ids (into #{} (map :id) [c1 c2 c4])]
;; Run the backfill with a mocked out publisher
(#'backfill/backfill-missing-query-fields!
#(swap! queued-ids conj (:id %)))
(testing "The expected cards were all sent to the analyzer"
(is (= expected-ids (set/intersection expected-ids @queued-ids))))
(testing "The card with existing analysis was not sent to the analyzer again"
(is (not (@queued-ids (:id c3)))))))))
(comment
(set! *warn-on-reflection* true)
(queue/clear! @#'query-analysis/queue)
(.-queued-set @#'query-analysis/queue)
(.peek (.-async-queue @#'query-analysis/queue))
(.peek (.-sync-queue @#'query-analysis/queue)))
......@@ -19,13 +19,13 @@
realtime-fn (fn []
(let [id (rand-int 1000)]
(doseq [e realtime-events]
(case (queue/maybePut! queue {:thread (str "real-" id) :payload e})
(case (queue/maybe-put! queue {:thread (str "real-" id) :payload e})
true (swap! sent inc)
false (swap! dropped inc)
nil (swap! skipped inc)))))
background-fn (fn []
(doseq [e backfill-events]
(queue/blockingPut! queue {:thread "back", :payload e})))
(queue/blocking-put! queue {:thread "back", :payload e})))
run! (fn [f]
(future (f)))]
......@@ -39,7 +39,7 @@
(while true
;; Stop the consumer once we are sure that there are no more events coming.
(u/with-timeout 100
(vswap! processed conj (:payload (queue/blockingTake! queue)))
(vswap! processed conj (:payload (queue/blocking-take! queue)))
;; Sleep to provide some backpressure
(Thread/sleep 1)))
(assert false "this is never reached")
......@@ -66,7 +66,8 @@
:realtime-events realtime-events)]
(testing "We processed all the events that were enqueued"
(is (= (count processed) (+ (count backfill-events) sent))))
(is (= (+ (count backfill-events) sent)
(count processed))))
(if dedupe?
(testing "Some items are deduplicated"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment