Skip to content
Snippets Groups Projects
Unverified Commit ce9b57be authored by Chris Truter's avatar Chris Truter Committed by GitHub
Browse files

Docstrings and refinements to the query-analysis module (#46023)

parent 0cbd98df
Branches
Tags
No related merge requests found
(ns metabase.query-analysis
"This module handles the analysis of queries, which determines their data dependencies.
It is also used to audit these dependencies for issues - for example, making use of a column that no longer exists.
Analysis is typically performed on a background worker thread, and the [[analyze-async!]] method is used to add cards
to the corresponding queue."
(:require
[clojure.set :as set]
[medley.core :as m]
......@@ -17,23 +21,28 @@
(set! *warn-on-reflection* true)
(def ^:private realtime-queue-capacity 1000)
(def ^:private realtime-queue-capacity
"The maximum number of cards which can be queued for async analysis. When exceeded, additional cards will be dropped."
1000)
(def ^:private worker-queue (queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? false}))
(def ^:private worker-queue
"The in-memory queue used to throttle analysis and reduce the chance of race conditions."
(queue/bounded-transfer-queue realtime-queue-capacity {:dedupe? false}))
(def ^:dynamic *analyze-execution-in-dev?*
"Managing a background thread in the REPL is likely to confound and infuriate, especially when we're using it to run
tests. For this reason, we run analysis on the main thread by default."
"Managing a background thread in the REPL is likely to confuse and infuriate, especially when running tests.
For this reason, we run analysis on the main thread by default."
::immediate)
(def ^:dynamic *analyze-execution-in-test?*
"A card's query is normally analyzed on every create/update. For most tests, this is an unnecessary expense, hence
we disable analysis by default."
"A card's query is normally analyzed on every create/update.
For most tests, this is an unnecessary expense; hence we disable analysis by default."
::disabled)
(defmacro with-execution*
  "Override the default execution mode while `body` runs, except in prod.

  `execution` is an expression yielding the mode to bind to both [[*analyze-execution-in-dev?*]] and
  [[*analyze-execution-in-test?*]]. It is evaluated exactly once, before either var is rebound.
  Returns the value of the last form in `body`.

  The assertion sits outside the syntax-quote, so it is checked when the macro is expanded rather than when the
  expanded code runs."
  [execution & body]
  (assert (not config/is-prod?))
  ;; Capture the mode in an auto-gensym'd local so the caller's expression is not spliced (and evaluated) twice.
  `(let [execution# ~execution]
     (binding [*analyze-execution-in-dev?*  execution#
               *analyze-execution-in-test?* execution#]
       ~@body)))
......@@ -53,56 +62,60 @@
[& body]
`(with-execution* ::disabled ~@body))
(defn- execution []
(defn- execution
"The execution strategy for analysis, which can be overridden in dev and tests. In production, it is always async."
[]
(case config/run-mode
:prod ::queued
:dev *analyze-execution-in-dev?*
:test *analyze-execution-in-test?*))
(defn enabled-type?
"Is analysis of the given query type enabled?"
[query-type]
(case query-type
:native (public-settings/sql-parsing-enabled)
:query true
:mbql/query true
false))
(and (public-settings/query-analysis-enabled)
(case query-type
:native (public-settings/sql-parsing-enabled)
:query true
:mbql/query true
false)))
(defn- query-field-ids
"Find out ids of all fields used in a query. Conforms to the same protocol as [[query-analyzer/field-ids-for-sql]],
so returns `{:explicit #{...int ids}}` map.
Does not track wildcards for queries rendered as tables afterwards."
[query]
(let [query-type (lib/normalized-query-type query)]
(when (enabled-type? query-type)
(case query-type
:native (try
(nqa/field-ids-for-native query)
(catch Exception e
(log/error e "Error parsing SQL" query)))
:query {:explicit (mbql.u/referenced-field-ids query)}
:mbql/query {:explicit (lib.util/referenced-field-ids query)}))))
Does not track wildcards for queries rendered as tables afterward."
([query]
(query-field-ids query (lib/normalized-query-type query)))
([query query-type]
(case query-type
:native (try
(nqa/field-ids-for-native query)
(catch Exception e
(log/error e "Error parsing SQL" query)))
:query {:explicit (mbql.u/referenced-field-ids query)}
:mbql/query {:explicit (lib.util/referenced-field-ids query)})))
(defn update-query-analysis-for-card!
"Clears QueryFields associated with this card and creates fresh, up-to-date ones.
Returns `nil` (and logs the error) if there was a parse error."
Returns `nil` (and logs the error) if there was a parse error.
Returns `nil` and leaves the database records as-is if analysis is disabled for the given query type."
[{card-id :id, query :dataset_query}]
(let [{:keys [explicit implicit] :as res} (query-field-ids query)
id->row (fn [explicit? field-id]
{:card_id card-id
:field_id field-id
:explicit_reference explicit?})
query-field-rows (concat
(map (partial id->row true) explicit)
(map (partial id->row false) implicit))]
;; when the response is `nil`, it's a disabled parser, not unknown columns
(when (some? res)
(query-field/update-query-fields-for-card! card-id query-field-rows))))
(let [query-type (lib/normalized-query-type query)]
  ;; Analysis may be switched off for this query type; in that case return nil and touch nothing.
  (when (enabled-type? query-type)
    ;; `query-field-ids` yields nil when parsing the query failed (its `catch` branch only logs the error).
    ;; Stop here in that case, so we return nil as documented instead of handing an empty row list to
    ;; `update-query-fields-for-card!` and replacing the card's existing records with nothing.
    ;; The query type computed above is passed through so it is not normalized a second time.
    (when-some [{:keys [explicit implicit]} (query-field-ids query query-type)]
      (let [id->row          (fn [explicit? field-id]
                               {:card_id            card-id
                                :field_id           field-id
                                :explicit_reference explicit?})
            ;; one row per referenced field: explicit references first, then implicit ones
            query-field-rows (concat
                              (map (partial id->row true) explicit)
                              (map (partial id->row false) implicit))]
        (query-field/update-query-fields-for-card! card-id query-field-rows))))))
(defn- replaced-inner-query-for-native-card
"Substitute new references for certain fields and tables, based upon the given mappings."
[query {:keys [fields tables] :as _replacement-ids}]
(let [keyvals-set #(set/union (set (keys %))
(set (vals %)))
......@@ -153,14 +166,14 @@
{:card card :replacements replacements}))))
(defn ->analyzable
"Ensure that we have all the fields required for analysis."
"Given a partial card or its id, ensure that we have all the fields required for analysis."
[card-or-id]
(if (and (map? card-or-id) (every? (partial contains? card-or-id) [:id :archived :dataset_query]))
card-or-id
(t2/select-one [:model/Card :id :archived :dataset_query] (u/the-id card-or-id))))
(defn analyze-card!
"Update the analysis for the given card if it is active."
"Update the analysis for a given card if it is active. Should only be called from [[metabase.task.analyze-queries]]."
[card-or-id]
(let [card (->analyzable card-or-id)
card-id (:id card)]
......@@ -172,7 +185,8 @@
(update-query-analysis-for-card! card))))
(defn next-card-id!
"Get the id of the next card id to be analyzed. May block indefinitely, relies on producer."
"Get the id of the next card to be analyzed. May block indefinitely, as it relies on a producer.
Should only be called from [[metabase.task.analyze-queries]]."
([]
(next-card-id! worker-queue))
([queue]
......@@ -180,14 +194,16 @@
([queue timeout]
(queue/blocking-take! queue timeout)))
(defn- queue-or-analyze! [offer-fn! card-or-id]
(defn- queue-or-analyze!
"Indirection used to modify the execution strategy for analysis in dev and tests."
[offer-fn! card-or-id]
(case (execution)
::immediate (analyze-card! (u/the-id card-or-id))
::queued (offer-fn! (u/the-id card-or-id))
::disabled nil))
(defn analyze-async!
"Asynchronously hand-off the given card for analysis, at a high priority."
"Asynchronously hand-off the given card for analysis, at a high priority. This is typically the method you want."
([card-or-id]
(analyze-async! worker-queue card-or-id))
([queue card-or-id]
......
(ns metabase.task.analyze-queries
"The background worker which performs the analysis of queries, and updates the database accordingly.
Restricts the CPU and database load corresponding to this analysis via a crude rate limiting algorithm that puts the
worker to sleep such that it is active at most [[max-cpu-usage-fraction]] of the time."
(:require
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.schedule.simple :as simple]
......@@ -14,10 +17,10 @@
(set! *warn-on-reflection* true)
(def ^:private max-cpu-usage-ratio 0.2)
(def ^:private max-cpu-usage-fraction 0.2)
(def ^:private wait-ratio
(/ (- 1 max-cpu-usage-ratio) max-cpu-usage-ratio))
(/ (- 1 max-cpu-usage-fraction) max-cpu-usage-fraction))
(def ^:private fail-wait-ms (* 2 1000))
......
(ns metabase.task.sweep-query-analysis
"A background worker which makes sure that the queries for all active cards have been analyzed, and that the analysis is up-to-date."
(:require
[clojurewerkz.quartzite.jobs :as jobs]
[clojurewerkz.quartzite.schedule.cron :as cron]
[clojurewerkz.quartzite.triggers :as triggers]
[metabase.public-settings :as public-settings]
[metabase.query-analysis :as query-analysis]
[metabase.task :as task]
[toucan2.core :as t2])
......@@ -72,7 +74,8 @@
(jobs/defjob ^{DisallowConcurrentExecution true
:doc "Backfill QueryField for cards created earlier. Runs once per instance."}
SweepQueryAnalysis [_ctx]
(sweep-query-analysis-loop!))
(when (public-settings/query-analysis-enabled)
(sweep-query-analysis-loop!)))
(defmethod task/init! ::SweepQueryAnalysis [_]
(let [job (jobs/build
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment