From 97594871e9ca0e384ddd5e6ff44e7d49433d1d34 Mon Sep 17 00:00:00 2001 From: Chris Truter <crisptrutski@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:11:14 +0200 Subject: [PATCH] Use message's query for async updates (#46508) --- src/metabase/models/card.clj | 6 +++--- src/metabase/query_analysis.clj | 14 ++++++++------ src/metabase/task/analyze_queries.clj | 11 ++++++----- src/metabase/task/sweep_query_analysis.clj | 1 + test/metabase/models/query_analysis_test.clj | 4 +--- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/metabase/models/card.clj b/src/metabase/models/card.clj index b558bebc684..c098d2930ed 100644 --- a/src/metabase/models/card.clj +++ b/src/metabase/models/card.clj @@ -507,10 +507,10 @@ (params/assert-valid-parameters changes) (params/assert-valid-parameter-mappings changes) (update-parameters-using-card-as-values-source changes) - ;; TODO: should be done in after-update - ;; has to place it here because changes is not available in after-update hook see toucan2#129 + ;; TODO: this would ideally be done only once the query changes have been commited to the database, to avoid + ;; race conditions leading to stale analysis triggering the "last one wins" analysis update. (when (contains? changes :dataset_query) - (query-analysis/analyze-async! id)) + (query-analysis/analyze-async! changes)) (when (:parameters changes) (parameter-card/upsert-or-delete-from-parameters! "card" id (:parameters changes))) ;; additional checks (Enterprise Edition only) diff --git a/src/metabase/query_analysis.clj b/src/metabase/query_analysis.clj index a5eae494064..1225d9e71c9 100644 --- a/src/metabase/query_analysis.clj +++ b/src/metabase/query_analysis.clj @@ -202,8 +202,10 @@ (defn ->analyzable "Given a partial card or its id, ensure that we have all the fields required for analysis." [card-or-id] - (if (and (map? card-or-id) (every? (partial contains? card-or-id) [:id :archived :dataset_query])) + ;; If we don't know whether a card has been archived, give it the benefit of the doubt. + (if (every? #(some? (% card-or-id)) [:id :dataset_query]) card-or-id + ;; If we need to query the database though, find out for sure. (t2/select-one [:model/Card :id :archived :dataset_query] (u/the-id card-or-id)))) (defn analyze-card! @@ -218,13 +220,13 @@ (when (and card (not (:archived card))) (update-query-analysis-for-card! card)))) -(defn next-card-id! +(defn next-card-or-id! "Get the id of the next card id to be analyzed. May block indefinitely, relies on producer. Should only be called from [[metabase.task.analyze-queries]]." ([] - (next-card-id! worker-queue)) + (next-card-or-id! worker-queue)) ([queue] - (next-card-id! queue Long/MAX_VALUE)) + (next-card-or-id! queue Long/MAX_VALUE)) ([queue timeout] (queue/blocking-take! queue timeout))) @@ -232,8 +234,8 @@ "Indirection used to modify the execution strategy for analysis in dev and tests." [offer-fn! card-or-id] (case (execution) - ::immediate (analyze-card! (u/the-id card-or-id)) - ::queued (offer-fn! (u/the-id card-or-id)) + ::immediate (analyze-card! card-or-id) + ::queued (offer-fn! card-or-id) ::disabled nil)) (defn analyze-async! diff --git a/src/metabase/task/analyze_queries.clj b/src/metabase/task/analyze_queries.clj index cb846a9cfc2..91aca311305 100644 --- a/src/metabase/task/analyze_queries.clj +++ b/src/metabase/task/analyze_queries.clj @@ -40,9 +40,10 @@ (defn- analyzer-loop* [stop-after next-card-id-fn] (loop [remaining stop-after] (when (public-settings/query-analysis-enabled) - (let [card-id (next-card-id-fn) - timer (u/start-timer) - card (query-analysis/->analyzable card-id)] + (let [card-or-id (next-card-id-fn) + card-id (u/the-id card-or-id) + timer (u/start-timer) + card (query-analysis/->analyzable card-or-id)] (if (failure-map/non-retryable? card) (log/warnf "Skipping analysis of Card %s as its query has caused failures in the past." card-id) (try @@ -63,11 +64,11 @@ ([] (analyzer-loop! nil)) ([stop-after] - (analyzer-loop* stop-after query-analysis/next-card-id!)) + (analyzer-loop* stop-after query-analysis/next-card-or-id!)) ([stop-after queue] (analyzer-loop! stop-after queue Long/MAX_VALUE)) ([stop-after queue timeout] - (analyzer-loop* stop-after (partial query-analysis/next-card-id! queue timeout)))) + (analyzer-loop* stop-after (partial query-analysis/next-card-or-id! queue timeout)))) (jobs/defjob ^{DisallowConcurrentExecution true :doc "Analyze "} diff --git a/src/metabase/task/sweep_query_analysis.clj b/src/metabase/task/sweep_query_analysis.clj index bca9d644c5e..804d290ceb6 100644 --- a/src/metabase/task/sweep_query_analysis.clj +++ b/src/metabase/task/sweep_query_analysis.clj @@ -37,6 +37,7 @@ ([] (analyze-cards-without-query-fields! query-analysis/analyze-sync!)) ([analyze-fn] + ;; TODO once we are storing the hash of the query used for analysis, we'll be able to filter this properly. (let [cards (t2/reducible-select [:model/Card :id])] (run! analyze-fn cards)))) diff --git a/test/metabase/models/query_analysis_test.clj b/test/metabase/models/query_analysis_test.clj index e984006e3e2..2ac66955746 100644 --- a/test/metabase/models/query_analysis_test.clj +++ b/test/metabase/models/query_analysis_test.clj @@ -59,9 +59,7 @@ [card-id query] (if (string? query) (t2/update! :model/Card card-id {:dataset_query (mt/native-query {:query query})}) - (t2/update! :model/Card card-id {:dataset_query query})) - ;; TODO remove this hack, adjusting the queue design to handle unsaved cards #45460 - (query-analysis/analyze-card! card-id)) + (t2/update! :model/Card card-id {:dataset_query query}))) ;;;; ;;;; Actual tests -- GitLab