Skip to content
Snippets Groups Projects
Unverified Commit a16ed561 authored by Cam Saul's avatar Cam Saul
Browse files

Move 'userland' QP function logic into new middleware :twisted_rightwards_arrows:

[ci bigquery]
parent 17c1c5c1
No related merge requests found
Showing
with 376 additions and 315 deletions
......@@ -227,7 +227,6 @@
:query {:source-table (data/id :venues)
:limit 1}
:info {:executed-by 1000
:query-type "MBQL"
:query-hash (byte-array [1 2 3 4])}})
@native-query)))
......@@ -248,5 +247,4 @@
:query {:source-table (data/id :venues)
:limit 1}
:info {:executed-by 1000
:query-type "MBQL"
:query-hash (byte-array [1 2 3 4])}}))
......@@ -194,7 +194,8 @@
:exclusions [expectations]]]
:injections
[(require 'metabase.test-setup ; for test setup stuff
[(require 'expectation-options ; expectations customizations
'metabase.test-setup ; for test setup stuff
'metabase.test.util)] ; for the toucan.util.test default values for temp models
:resource-paths
......
......@@ -8,7 +8,6 @@
[metabase
[events :as events]
[public-settings :as public-settings]
[query-processor :as qp]
[related :as related]
[util :as u]]
[metabase.api
......@@ -32,6 +31,7 @@
[util :as qputil]]
[metabase.query-processor.middleware
[cache :as cache]
[constraints :as constraints]
[results-metadata :as results-metadata]]
[metabase.sync.analyze.query-results :as qr]
[metabase.util
......@@ -610,7 +610,7 @@
Exception if preconditions (such as read perms) are not met before returning a channel."
{:style/indent 1}
[card-id & {:keys [parameters constraints context dashboard-id middleware]
:or {constraints qp/default-query-constraints
:or {constraints constraints/default-query-constraints
context :question}}]
{:pre [(u/maybe? sequential? parameters)]}
(let [card (api/read-check (Card card-id))
......
......@@ -4,7 +4,6 @@
[compojure.core :refer [DELETE GET POST PUT]]
[metabase
[events :as events]
[query-processor :as qp]
[related :as related]
[util :as u]]
[metabase.api.common :as api]
......@@ -18,6 +17,7 @@
[interface :as mi]
[query :as query :refer [Query]]
[revision :as revision]]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.query-processor.util :as qp-util]
[metabase.util.schema :as su]
[schema.core :as s]
......@@ -59,7 +59,7 @@
(api/defendpoint POST "/"
"Create a new `Dashboard`."
"Create a new Dashboard."
[:as {{:keys [name description parameters collection_id collection_position], :as dashboard} :body}]
{name su/NonBlankString
parameters [su/Map]
......@@ -137,10 +137,10 @@
[{:keys [dataset_query]}]
(u/ignore-exceptions
[(qp-util/query-hash dataset_query)
(qp-util/query-hash (assoc dataset_query :constraints qp/default-query-constraints))]))
(qp-util/query-hash (assoc dataset_query :constraints constraints/default-query-constraints))]))
(defn- dashcard->query-hashes
"Return a sequence of all the query hashes for this DASHCARD, including the top-level Card and any Series."
"Return a sequence of all the query hashes for this `dashcard`, including the top-level Card and any Series."
[{:keys [card series]}]
(reduce concat
(card->query-hashes card)
......@@ -148,13 +148,13 @@
(card->query-hashes card))))
(defn- dashcards->query-hashes
"Return a sequence of all the query hashes used in a DASHCARDS."
"Return a sequence of all the query hashes used in a `dashcards`."
[dashcards]
(apply concat (for [dashcard dashcards]
(dashcard->query-hashes dashcard))))
(defn- hashes->hash-vec->avg-time
"Given some query HASHES, return a map of hashes (as normal Clojure vectors) to the average query durations.
"Given some query `hashes`, return a map of hashes (as normal Clojure vectors) to the average query durations.
(The hashes are represented as normal Clojure vectors because identical byte arrays aren't considered equal to one
another, and thus do not work as one would expect when used as map keys.)"
[hashes]
......@@ -163,7 +163,7 @@
{(vec k) v}))))
(defn- add-query-average-duration-to-card
"Add `:query_average_duration` info to a CARD (i.e., the `:card` property of a DashCard or an entry in its `:series`
"Add `:query_average_duration` info to a `card` (i.e., the `:card` property of a DashCard or an entry in its `:series`
array)."
[card hash-vec->avg-time]
(assoc card :query_average_duration (some (fn [query-hash]
......@@ -171,7 +171,7 @@
(card->query-hashes card))))
(defn- add-query-average-duration-to-dashcards
"Add `:query_average_duration` to the top-level Card and any Series in a sequence of DASHCARDS."
"Add `:query_average_duration` to the top-level Card and any Series in a sequence of `dashcards`."
([dashcards]
(add-query-average-duration-to-dashcards dashcards (hashes->hash-vec->avg-time (dashcards->query-hashes dashcards))))
([dashcards hash-vec->avg-time]
......@@ -183,13 +183,13 @@
(add-query-average-duration-to-card card hash-vec->avg-time))))))))
(defn add-query-average-durations
"Add a `average_execution_time` field to each card (and series) belonging to DASHBOARD."
"Add a `average_execution_time` field to each card (and series) belonging to `dashboard`."
[dashboard]
(update dashboard :ordered_cards add-query-average-duration-to-dashcards))
(defn- get-dashboard
"Get `Dashboard` with ID."
"Get Dashboard with ID."
[id]
(-> (Dashboard id)
api/check-404
......@@ -201,7 +201,7 @@
(api/defendpoint POST "/:from-dashboard-id/copy"
"Copy a `Dashboard`."
"Copy a Dashboard."
[from-dashboard-id :as {{:keys [name description collection_id collection_position], :as dashboard} :body}]
{name (s/maybe su/NonBlankString)
description (s/maybe s/Str)
......@@ -231,7 +231,7 @@
;;; --------------------------------------------- Fetching/Updating/Etc. ---------------------------------------------
(api/defendpoint GET "/:id"
"Get `Dashboard` with ID."
"Get Dashboard with ID."
[id]
(u/prog1 (get-dashboard id)
(events/publish-event! :dashboard-read (assoc <> :actor_id api/*current-user-id*))))
......@@ -247,7 +247,7 @@
(api/check-superuser)))
(api/defendpoint PUT "/:id"
"Update a `Dashboard`.
"Update a Dashboard.
Usually, you just need write permissions for this Dashboard to do this (which means you have appropriate
permissions for the Cards belonging to this Dashboard), but to change the value of `enable_embedding` you must be a
......@@ -293,7 +293,7 @@
;; TODO - We can probably remove this in the near future since it should no longer be needed now that we're going to
;; be setting `:archived` to `true` via the `PUT` endpoint instead
(api/defendpoint DELETE "/:id"
"Delete a `Dashboard`."
"Delete a Dashboard."
[id]
(log/warn (str "DELETE /api/dashboard/:id is deprecated. Instead of deleting a Dashboard, you should change its "
"`archived` value via PUT /api/dashboard/:id."))
......@@ -305,7 +305,7 @@
;; TODO - param should be `card_id`, not `cardId` (fix here + on frontend at the same time)
(api/defendpoint POST "/:id/cards"
"Add a `Card` to a `Dashboard`."
"Add a `Card` to a Dashboard."
[id :as {{:keys [cardId parameter_mappings series], :as dashboard-card} :body}]
{cardId (s/maybe su/IntGreaterThanZero)
parameter_mappings [su/Map]}
......@@ -320,7 +320,7 @@
;; TODO - we should use schema to validate the format of the Cards :D
(api/defendpoint PUT "/:id/cards"
"Update `Cards` on a `Dashboard`. Request body should have the form:
"Update `Cards` on a Dashboard. Request body should have the form:
{:cards [{:id ...
:sizeX ...
......@@ -337,7 +337,7 @@
(api/defendpoint DELETE "/:id/cards"
"Remove a `DashboardCard` from a `Dashboard`."
"Remove a `DashboardCard` from a Dashboard."
[id dashcardId]
{dashcardId su/IntStringGreaterThanZero}
(api/check-not-archived (api/write-check Dashboard id))
......@@ -347,14 +347,14 @@
(api/defendpoint GET "/:id/revisions"
"Fetch `Revisions` for `Dashboard` with ID."
"Fetch `Revisions` for Dashboard with ID."
[id]
(api/read-check Dashboard id)
(revision/revisions+details Dashboard id))
(api/defendpoint POST "/:id/revert"
"Revert a `Dashboard` to a prior `Revision`."
"Revert a Dashboard to a prior `Revision`."
[id :as {{:keys [revision_id]} :body}]
{revision_id su/IntGreaterThanZero}
(api/write-check Dashboard id)
......
......@@ -10,10 +10,10 @@
[card :refer [Card]]
[database :as database :refer [Database]]
[query :as query]]
[metabase.query-processor :as qp]
[metabase.query-processor
[async :as qp.async]
[util :as qputil]]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.util
[date :as du]
[export :as ex]
......@@ -22,7 +22,6 @@
[schema.core :as s])
(:import clojure.core.async.impl.channels.ManyToManyChannel))
;;; -------------------------------------------- Running a Query Normally --------------------------------------------
(defn- query->source-card-id
......@@ -164,7 +163,7 @@
{:average (or
(some (comp query/average-execution-time-ms qputil/query-hash)
[query
(assoc query :constraints qp/default-query-constraints)])
(assoc query :constraints constraints/default-query-constraints)])
0)})
......
......@@ -6,7 +6,6 @@
[medley.core :as m]
[metabase
[db :as mdb]
[query-processor :as qp]
[util :as u]]
[metabase.api
[card :as card-api]
......@@ -80,9 +79,11 @@
{:style/indent 2}
[card-id parameters & options]
;; run this query with full superuser perms
(let [in-chan (binding [api/*current-user-permissions-set* (atom #{"/"})
qp/*allow-queries-with-no-executor-id* true]
(apply card-api/run-query-for-card-async card-id, :parameters parameters, :context :public-question, options))
(let [in-chan (binding [api/*current-user-permissions-set* (atom #{"/"})]
(apply card-api/run-query-for-card-async card-id
:parameters parameters
:context :public-question
options))
out-chan (a/chan 1 (map transform-results))]
(async.u/single-value-pipe in-chan out-chan)
out-chan))
......
......@@ -2,13 +2,13 @@
(:require [clojure.java
[io :as io]
[shell :as shell]]
[clojure.string :as s]
[clojure.string :as str]
[environ.core :as environ])
(:import clojure.lang.Keyword))
(def ^Boolean is-windows?
"Are we running on a Windows machine?"
(s/includes? (s/lower-case (System/getProperty "os.name")) "win"))
(str/includes? (str/lower-case (System/getProperty "os.name")) "win"))
(def ^:private app-defaults
"Global application defaults"
......@@ -60,12 +60,13 @@
(defn- version-info-from-shell-script []
(try
(let [[tag hash branch date] (-> (shell/sh "./bin/version") :out s/trim (s/split #" "))]
(let [[tag hash branch date] (-> (shell/sh "./bin/version") :out str/trim (str/split #" "))]
{:tag (or tag "?")
:hash (or hash "?")
:branch (or branch "?")
:date (or date "?")})
;; if ./bin/version fails (e.g., if we are developing on Windows) just return something so the whole thing doesn't barf
;; if ./bin/version fails (e.g., if we are developing on Windows) just return something so the whole thing doesn't
;; barf
(catch Throwable _
{:tag "?", :hash "?", :branch "?", :date "?"})))
......
......@@ -621,17 +621,36 @@
"Additional options that can be used to toggle middleware on or off."
{;; should we skip adding results_metadata to query results after running the query? Used by
;; `metabase.query-processor.middleware.results-metadata`; default `false`
(s/optional-key :skip-results-metadata?) s/Bool
(s/optional-key :skip-results-metadata?)
s/Bool
;; should we skip converting datetime types to ISO-8601 strings with appropriate timezone when post-processing
;; results? Used by `metabase.query-processor.middleware.format-rows`; default `false`
(s/optional-key :format-rows?) s/Bool
(s/optional-key :format-rows?)
s/Bool
;; disable the MBQL->native middleware. If you do this, the query will not work at all, so there are no cases where
;; you should set this yourself. This is only used by the `qp/query->preprocessed` function to get the fully
;; pre-processed query without attempting to convert it to native.
(s/optional-key :disable-mbql->native?) s/Bool
(s/optional-key :disable-mbql->native?)
s/Bool
;; Userland queries are ones ran as a result of an API call, Pulse, MetaBot query, or the like. Special handling is
;; done in the `process-userland-query` middleware for such queries -- results are returned in a slightly different
;; format, and QueryExecution entries are normally saved, unless you pass `:no-save` as the option.
(s/optional-key :userland-query?)
(s/maybe s/Bool)
;; Whether to add some default `max-results` and `max-results-bare-rows` constraints. By default, none are added,
;; although the functions that ultimately power most API endpoints tend to set this to `true`. See
;; `add-constraints` middleware for more details.
(s/optional-key :add-default-userland-constraints?)
(s/maybe s/Bool)
;; other middleware options might be used somewhere, but I don't know about them. Add them if you come across them
;; for documentation purposes
s/Keyword s/Any})
s/Keyword
s/Any})
;;; ------------------------------------------------------ Info ------------------------------------------------------
......@@ -656,8 +675,8 @@
:question
:xlsx-download))
;; TODO - this schema is somewhat misleading because if you use a function like `qp/process-query-and-save-with-max-results-constraints!`
;; some of these keys (e.g. `:context`) are in fact required
;; TODO - this schema is somewhat misleading because if you use a function like
;; `qp/process-query-and-save-with-max-results-constraints!` some of these keys (e.g. `:context`) are in fact required
(def Info
"Schema for query `:info` dictionary, which is used for informational purposes to record information about how a query
was executed in QueryExecution and other places. It is considered bad form for middleware to change its behavior
......@@ -670,13 +689,10 @@
(s/optional-key :dashboard-id) (s/maybe su/IntGreaterThanZero)
(s/optional-key :pulse-id) (s/maybe su/IntGreaterThanZero)
(s/optional-key :nested?) (s/maybe s/Bool)
;; `:hash` and `:query-type` get added automatically by `process-query-and-save-execution!`, so don't try passing
;; `:hash` gets added automatically by `process-query-and-save-execution!`, so don't try passing
;; these in yourself. In fact, I would like this a lot better if we could take these keys out of `:info` entirely
;; and have the code that saves QueryExceutions figure out their values when it goes to save them
(s/optional-key :query-hash) (s/maybe (Class/forName "[B"))
;; TODO - this key is pointless since we can just look at `:type`; let's normalize it out and remove it entirely
;; when we get a chance
(s/optional-key :query-type) (s/enum "MBQL" "native")})
(s/optional-key :query-hash) (s/maybe (Class/forName "[B"))})
(def SourceQueryMetadata
"Schema for the expected keys in metadata about source query columns if it is passed in to the query."
......
......@@ -34,8 +34,8 @@
(let [{:keys [creator_id dataset_query]} card]
{:card card
:result (qp/process-query-and-save-with-max-results-constraints! dataset_query
(merge {:executed-by creator_id,
:context :pulse,
(merge {:executed-by creator_id
:context :pulse
:card-id card-id}
options))}))
(catch Throwable t
......
(ns metabase.query-processor
"Preprocessor that does simple transformations to all incoming queries, simplifying the driver-specific
implementations."
(:require [clojure.tools.logging :as log]
[medley.core :as m]
[metabase
[driver :as driver]
[util :as u]]
(:require [medley.core :as m]
[metabase.driver :as driver]
[metabase.driver.util :as driver.u]
[metabase.mbql.schema :as mbql.s]
[metabase.models
[query :as query]
[query-execution :as query-execution :refer [QueryExecution]]]
[metabase.query-processor.middleware
[add-dimension-projections :as add-dim]
[add-implicit-clauses :as implicit-clauses]
......@@ -23,6 +17,7 @@
[cache :as cache]
[catch-exceptions :as catch-exceptions]
[check-features :as check-features]
[constraints :as constraints]
[cumulative-aggregations :as cumulative-ags]
[desugar :as desugar]
[dev :as dev]
......@@ -36,6 +31,7 @@
[normalize-query :as normalize]
[parameters :as parameters]
[permissions :as perms]
[process-userland-query :as process-userland-query]
[reconcile-breakout-and-order-by-bucketing :as reconcile-bucketing]
[resolve-database :as resolve-database]
[resolve-driver :as resolve-driver]
......@@ -47,12 +43,8 @@
[store :as store]
[validate :as validate]
[wrap-value-literals :as wrap-value-literals]]
[metabase.query-processor.util :as qputil]
[metabase.util
[date :as du]
[i18n :refer [trs tru]]]
[schema.core :as s]
[toucan.db :as db]))
[metabase.util.i18n :refer [tru]]
[schema.core :as s]))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | QUERY PROCESSOR |
......@@ -103,7 +95,6 @@
;; ▲▲▲ NATIVE-ONLY POINT ▲▲▲ Query converted from MBQL to native here; f will see a native query instead of MBQL
mbql-to-native/mbql->native
annotate/result-rows-maps->vectors
;; TODO - should we log the fully preprocessed query here?
check-features/check-features
wrap-value-literals/wrap-value-literals
annotate/add-column-info
......@@ -147,7 +138,9 @@
log-query/log-results-metadata
validate/validate-query
normalize/normalize
catch-exceptions/catch-exceptions))
catch-exceptions/catch-exceptions
process-userland-query/process-userland-query
constraints/add-default-userland-constraints))
;; ▲▲▲ PRE-PROCESSING ▲▲▲ happens from BOTTOM-TO-TOP, e.g. the results of `expand-macros` are passed to
;; `substitute-parameters`
......@@ -233,176 +226,31 @@
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | DATASET-QUERY PUBLIC API |
;;; | Userland Queries (Public Interface) |
;;; +----------------------------------------------------------------------------------------------------------------+
;; The only difference between `process-query` and `process-query-and-save-execution!` (below) is that the
;; latter records a `QueryExecution` (inserts a new row) recording some stats about this Query run including
;; execution time and type of query ran
;; The difference between `process-query` and the versions below is that the ones below are meant to power various
;; things like API endpoints and pulses, while `process-query` is more of a low-level internal function.
;;
;; `process-query-and-save-execution!` is the function used by various things like API endpoints and pulses;
;; `process-query` is more of an internal function
;; Many moons ago the two sets of functions had different QP pipelines; these days the functions below are simply
;; convenience wrappers for `process-query` that include a few options to activate appropriate middleware for userland
;; queries. This middleware does things like saving QueryExecutions and adding max results constraints.
(defn- save-query-execution!
  "Save a `QueryExecution` and update the average execution time for the corresponding `Query`."
  [{query :json_query, :as query-execution}]
  ;; `u/prog1` returns `query-execution` itself, so callers can keep using/threading the map
  (u/prog1 query-execution
    ;; update the rolling average execution time for this query, keyed by its hash
    (query/save-query-and-update-average-execution-time! query (:hash query-execution) (:running_time query-execution))
    ;; the persisted QueryExecution row does not include the full `:json_query` map
    (db/insert! QueryExecution (dissoc query-execution :json_query))))
(defn- save-and-return-failed-query!
  "Save a QueryExecution for a failed query run and construct the failed-query response returned to the client."
  [query-execution, ^Throwable e]
  ;; record our query execution and format response
  (-> query-execution
      (dissoc :start_time_millis)
      (merge {:error (.getMessage e)
              :running_time (- (System/currentTimeMillis) (:start_time_millis query-execution))})
      ;; `save-query-execution!` returns the query-execution map itself (via `u/prog1`), so threading continues
      save-query-execution!
      ;; strip internal bookkeeping keys that should not be sent back to the client
      (dissoc :result_rows :hash :executor_id :native :card_id :dashboard_id :pulse_id)
      ;; this is just for the response for client
      (assoc :status :failed
             :error (.getMessage e)
             :row_count 0
             :data {:rows []
                    :cols []
                    :columns []})
      ;; include stacktrace and preprocessed/native stages of the query if available in the response which should make
      ;; debugging queries a bit easier
      (merge (some-> (ex-data e)
                     (select-keys [:stacktrace :preprocessed :native])
                     (m/dissoc-in [:preprocessed :info])))))
(defn- save-and-return-successful-query!
  "Save QueryExecution state for a completed (successful) query run and construct the response for the client."
  [query-execution query-result]
  (let [query-execution (-> (assoc query-execution
                              :running_time (- (System/currentTimeMillis)
                                               (:start_time_millis query-execution))
                              :result_rows (get query-result :row_count 0))
                            (dissoc :start_time_millis))]
    ;; only insert a new record into QueryExecution if the results *were not* cached (i.e., only if a Query was
    ;; actually run)
    (when-not (:cached query-result)
      (save-query-execution! query-execution))
    ;; ok, now return the results in the normal response format
    (merge (dissoc query-execution :error :result_rows :hash :executor_id :native :card_id :dashboard_id :pulse_id)
           query-result
           {:status :completed
            ;; `:average_execution_time` is only meaningful (and only looked up) when results came from the cache
            :average_execution_time (when (:cached query-result)
                                      (query/average-execution-time-ms (:hash query-execution)))})))
(defn- assert-query-status-successful
  "Make sure QUERY-RESULT `:status` is something other than `nil` or `:failed`, or throw an Exception."
  [query-result]
  ;; a missing `:status` key altogether means the driver returned a malformed response
  (when-not (contains? query-result :status)
    (throw (ex-info (str (tru "Invalid response from database driver. No :status provided."))
             query-result)))
  (when (= :failed (:status query-result))
    ;; a canceled query surfaces as an InterruptedException -- log it quietly rather than as a warning
    (if (= InterruptedException (:class query-result))
      (log/info (trs "Query canceled"))
      (log/warn (trs "Query failure")
                (u/pprint-to-str 'red query-result)))
    (throw (ex-info (str (get query-result :error (tru "General error")))
             query-result))))
;; NOTE(review): bound to `true` by the public-sharing API code when running queries for public Cards -- confirm
;; against `metabase.api.public` before changing or removing.
(def ^:dynamic ^Boolean *allow-queries-with-no-executor-id*
  "Should we allow running queries (via `dataset-query`) without specifying the `executed-by` User ID? By default
  this is `false`, but this constraint can be disabled for running queries not executed by a specific user
  (e.g., public Cards)."
  false)
(defn- query-execution-info
  "Return the info for the `QueryExecution` entry for this QUERY."
  {:arglists '([query])}
  [{{:keys [executed-by query-hash query-type context card-id dashboard-id pulse-id]} :info
    database-id :database
    :as query}]
  ;; `:query-hash` must be a byte array and `:query-type` a string -- both are added by `assoc-query-info`
  {:pre [(instance? (Class/forName "[B") query-hash)
         (string? query-type)]}
  {:database_id database-id
   :executor_id executed-by
   :card_id card-id
   :dashboard_id dashboard-id
   :pulse_id pulse-id
   :context context
   ;; the :pre condition above already guarantees `query-hash` is present; this throw is purely defensive
   :hash (or query-hash (throw (Exception. (str (tru "Missing query hash!")))))
   :native (= query-type "native")
   ;; `:info` is internal bookkeeping, not part of the query itself, so strip it from the saved JSON
   :json_query (dissoc query :info)
   :started_at (du/new-sql-timestamp)
   :running_time 0
   :result_rows 0
   :start_time_millis (System/currentTimeMillis)})
(defn- run-and-save-query!
  "Run QUERY and save appropriate `QueryExecution` info, and then return results (or an error message) in the usual
  format."
  [query]
  (let [query-execution (query-execution-info query)]
    (try
      (let [result (process-query query)]
        ;; throws if the driver reported a `:failed` status (or no status at all)
        (assert-query-status-successful result)
        (save-and-return-successful-query! query-execution result))
      ;; any failure -- including a canceled query -- is recorded as a failed QueryExecution
      (catch Throwable e
        (save-and-return-failed-query! query-execution e)))))
;; Attach `options` as the query's `:info` map, along with the query's hash and its type ("MBQL" or "native").
(s/defn ^:private assoc-query-info [query, options :- mbql.s/Info]
  (assoc query :info (assoc options
                       :query-hash (qputil/query-hash query)
                       :query-type (if (qputil/mbql-query? query) "MBQL" "native"))))
;; TODO - couldn't saving the query execution be done by MIDDLEWARE?
(s/defn process-query-and-save-execution!
"Process and run a json based dataset query and return results.
Takes 2 arguments:
1. the json query as a map
2. query execution options (and context information) specified as a map
Depending on the database specified in the query this function will delegate to a driver specific implementation.
For the purposes of tracking we record each call to this function as a QueryExecution in the database.
OPTIONS must conform to the `mbql.s/Info` schema; refer to that for more details."
"Process and run a 'userland' MBQL query (e.g. one ran as the result of an API call, scheduled Pulse, MetaBot query,
etc.). Returns results in a format appropriate for consumption by FE client. Saves QueryExecution row in application
DB."
{:style/indent 1}
[query, options :- mbql.s/Info]
(run-and-save-query! (assoc-query-info query options)))
(def ^:private ^:const max-results-bare-rows
  "Maximum number of rows to return specifically on :rows type queries via the API. Used as the default value of the
  `:max-results-bare-rows` constraint below."
  2000)

(def ^:private ^:const max-results
  "General maximum number of rows to return from an API query. Used as the default value of the `:max-results`
  constraint below."
  10000)

(def default-query-constraints
  "Default map of constraints that we apply on dataset queries executed by the api."
  {:max-results max-results
   :max-results-bare-rows max-results-bare-rows})
(defn- add-default-constraints
  "Add default values of `:max-results` and `:max-results-bare-rows` to `:constraints` map `m`. Values already present
  in `m` win over the defaults."
  [m]
  (merge
   default-query-constraints
   ;; `:max-results-bare-rows` must be less than or equal to `:max-results`, so if someone sets `:max-results` but not
   ;; `:max-results-bare-rows` use the same value for both. Otherwise the default bare rows value could end up being
   ;; higher than the custom `:max-results` value, causing an error
   (when-let [max-results (:max-results m)]
     {:max-results-bare-rows max-results})
   m))
(process-query
(-> query
(update :info merge options)
(assoc-in [:middleware :userland-query?] true))))
(s/defn process-query-and-save-with-max-results-constraints!
"Same as `process-query-and-save-execution!` but will include the default max rows returned as a constraint. (This
function is ulitmately what powers most API endpoints that run queries, including `POST /api/dataset`.)"
{:style/indent 1}
[query, options :- mbql.s/Info]
(process-query-and-save-execution! (update query :constraints add-default-constraints) options))
(s/defn process-query-without-save!
"Invokes `process-query` with info needed for the included remark."
[user query]
(process-query (assoc-query-info query {:executed-by user})))
(let [query (assoc-in query [:middleware :add-default-userland-constraints?] true)]
(process-query-and-save-execution! query options)))
......@@ -19,7 +19,9 @@
[semaphore-channel :as semaphore-channel]
[util :as async.u]]
[metabase.models.setting :refer [defsetting]]
[metabase.query-processor.interface :as qpi]
[metabase.query-processor
[interface :as qpi]
[util :as qputil]]
[metabase.util.i18n :refer [trs]]
[schema.core :as s])
(:import clojure.core.async.impl.channels.ManyToManyChannel))
......@@ -77,13 +79,6 @@
[query options]
(do-async (:database query) qp/process-query-and-save-with-max-results-constraints! query options))
(defn process-query-without-save!
  "Async version of `metabase.query-processor/process-query-without-save!`. Runs query asynchronously, and returns a
  `core.async` channel that can be used to fetch the results once the query finishes running. Closing the channel will
  cancel the query."
  [user query]
  ;; `do-async` is keyed by the query's Database ID (the first argument)
  (do-async (:database query) qp/process-query-without-save! user query))
;;; ------------------------------------------------ Result Metadata -------------------------------------------------
......@@ -103,13 +98,20 @@
;; set up a pipe to get the async QP results and pipe them thru to out-chan
(async.u/single-value-pipe
(binding [qpi/*disable-qp-logging* true]
(process-query-without-save!
api/*current-user-id*
(process-query
;; for purposes of calculating the actual Fields & types returned by this query we really only need the first
;; row in the results
(-> query
(assoc-in [:constrains :max-results] 1)
(assoc-in [:constrains :max-results-bare-rows] 1))))
(let [query (-> query
(assoc-in [:constraints :max-results] 1)
(assoc-in [:constraints :max-results-bare-rows] 1)
(assoc-in [:info :executed-by] api/*current-user-id*))]
;; need add the constraints above before calculating hash because those affect the hash
;;
;; (normally middleware takes care of calculating query hashes for 'userland' queries but this is not
;; technically a userland query -- we don't want to save a QueryExecution -- so we need to add `executed-by`
;; and `query-hash` ourselves so the remark gets added)
(assoc-in query [:info :query-hash] (qputil/query-hash query)))
))
out-chan)
;; return out-chan
out-chan))
......@@ -22,6 +22,7 @@
[metabase.query-processor.util :as qputil]
[metabase.util.date :as du]))
;; TODO - Why not make this an option in the query itself? :confused:
(def ^:dynamic ^Boolean *ignore-cached-results*
"Should we force the query to run, ignoring cached results even if they're available?
Setting this to `true` will run the query again and will still save the updated results."
......@@ -104,7 +105,7 @@
results))
(defn- run-query-with-cache [qp {:keys [cache-ttl], :as query}]
  ;; TODO - Query will already have `info.hash` if it's a userland query. I'm not 100% sure it will be the same hash.
  ;; NOTE(review): the hash is recomputed here rather than reusing `info.hash` -- confirm the two always agree before
  ;; consolidating them.
  (let [query-hash (qputil/query-hash query)]
    ;; return fresh-enough cached results when available; otherwise run the query, caching successful results
    (or (cached-results query-hash cache-ttl)
        (run-query-and-save-results-if-successful! query-hash qp query))))
......
(ns metabase.query-processor.middleware.constraints)
(def ^:private max-results-bare-rows
  "Maximum number of rows to return specifically on :rows type queries via the API. Used as the default value of the
  `:max-results-bare-rows` constraint."
  2000)
(def ^:private max-results
  "General maximum number of rows to return from an API query. Used as the default value of the `:max-results`
  constraint."
  10000)
(def default-query-constraints
  "Default map of constraints that we apply on dataset queries executed by the api: a general row cap
  (`:max-results`) and a lower cap for :rows type queries (`:max-results-bare-rows`)."
  {:max-results max-results
   :max-results-bare-rows max-results-bare-rows})
(defn- merge-default-constraints
  "Fill in any constraints missing from `constraints` using `default-query-constraints`. If the caller supplied a
  custom `:max-results` but no `:max-results-bare-rows`, use the custom `:max-results` value for both:
  `:max-results-bare-rows` must be less than or equal to `:max-results`, and keeping the (possibly larger) default
  bare-rows value could violate that and cause an error."
  [{:keys [max-results], :as constraints}]
  (let [bare-rows-fallback (if max-results
                             {:max-results-bare-rows max-results}
                             {})]
    (merge default-query-constraints bare-rows-fallback constraints)))
(defn- add-default-userland-constraints*
  "Add default values of `:max-results` and `:max-results-bare-rows` to the query's `:constraints` map, but only when
  the `:add-default-userland-constraints?` middleware option is set on the query; otherwise return it untouched."
  [{{:keys [add-default-userland-constraints?]} :middleware, :as query}]
  (if add-default-userland-constraints?
    (update query :constraints merge-default-constraints)
    query))
(defn add-default-userland-constraints
  "Middleware that optionally adds default `max-results` and `max-results-bare-rows` constraints to queries, meant for
  use with `process-query-and-save-with-max-results-constraints!`, which ultimately powers most QP API endpoints."
  [qp]
  (fn [query]
    (qp (add-default-userland-constraints* query))))
(ns metabase.query-processor.middleware.process-userland-query
"Middleware related to doing extra steps for queries that are ran via API endpoints (i.e., most of them -- as opposed
to queries ran internally e.g. as part of the sync process). These include things like saving QueryExecutions and
formatting the results."
(:require [clojure.tools.logging :as log]
[medley.core :as m]
[metabase.models
[query :as query]
[query-execution :as query-execution :refer [QueryExecution]]]
[metabase.query-processor.util :as qputil]
[metabase.util :as u]
[metabase.util
[date :as du]
[i18n :refer [trs tru]]]
[toucan.db :as db]))
(defn- add-running-time
  "Swap the `:start_time_millis` bookkeeping key in `query-execution` for the elapsed `:running_time` (milliseconds
  since that start time)."
  [{start-time-ms :start_time_millis, :as query-execution}]
  (let [elapsed-ms (- (System/currentTimeMillis) start-time-ms)]
    (-> query-execution
        (dissoc :start_time_millis)
        (assoc :running_time elapsed-ms))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Save Query Execution |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- save-query-execution!
  "Save a `QueryExecution` and update the average execution time for the corresponding `Query`."
  [{query :json_query, :as query-execution}]
  ;; fill in `:running_time` (and drop `:start_time_millis`) before persisting anything
  (let [query-execution (add-running-time query-execution)]
    ;; update the average execution time tracked for this query's hash...
    (query/save-query-and-update-average-execution-time! query (:hash query-execution) (:running_time query-execution))
    ;; ...then record the execution itself; the raw `:json_query` map is stripped from the inserted row
    (db/insert! QueryExecution (dissoc query-execution :json_query))))
(defn- save-successful-query-execution!
  "Record a successful query execution -- but only when the results were *not* served from the cache, i.e. only when
  a query was actually ran."
  [query-execution {cached? :cached, result-rows :row_count}]
  (when (not cached?)
    (-> query-execution
        (assoc :result_rows (or result-rows 0))
        save-query-execution!)))
(defn- save-failed-query-execution!
  "Save a `QueryExecution` entry for a failed query, recording `message` (stringified) as its `:error`."
  [query-execution message]
  (save-query-execution! (assoc query-execution :error (str message))))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Format Response |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- success-response
  "Build the response map for a successfully-completed query: the `query-execution` bookkeeping info (minus
  internal-only columns) merged with the query `result` itself, marked `:status :completed`."
  [{query-hash :hash, :as query-execution} {cached? :cached, :as result}]
  (let [execution-info (-> query-execution
                           add-running-time
                           (dissoc :error :hash :executor_id :card_id :dashboard_id :pulse_id :result_rows :native))
        ;; only look up the average execution time when the results came from the cache
        avg-execution-ms (when cached?
                           (query/average-execution-time-ms query-hash))]
    (merge execution-info
           result
           {:status                 :completed
            :average_execution_time avg-execution-ms})))
(defn- failure-response
  "Build the response map for a failed query: the `query-execution` bookkeeping info (minus internal-only columns),
  the error `message`, an empty `:data` payload, and -- when present in `result` -- debugging extras."
  [query-execution message result]
  (let [execution-info (-> query-execution
                           add-running-time
                           (dissoc :result_rows :hash :executor_id :card_id :dashboard_id :pulse_id :native))
        ;; include stacktrace and preprocessed/native stages of the query if available in the response, which should
        ;; make debugging queries a bit easier
        debug-info     (-> (select-keys result [:stacktrace :preprocessed :native])
                           (m/dissoc-in [:preprocessed :info]))]
    (merge execution-info
           {:status    :failed
            :error     message
            :row_count 0
            :data      {:rows    []
                        :cols    []
                        :columns []}}
           debug-info)))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Handle Response |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- succeed
  "Save a successful query execution, then return the success-formatted response map."
  [query-execution result]
  (save-successful-query-execution! query-execution result)
  (success-response query-execution result))
(defn- fail
  "Save a failed query execution (recording `message` as its error), then return the failure-formatted response map."
  [query-execution message result]
  (save-failed-query-execution! query-execution message)
  (failure-response query-execution message result))
(defn- format-userland-query-result
  "Make sure `query-result` `:status` is something other than `nil` or `:failed`, and format the response
  appropriately, saving a QueryExecution entry for the success or failure along the way.

  Returns `nil` (after logging) when the query was canceled via `InterruptedException`; an unrecognized non-nil
  `:status` also falls through every `cond` clause and yields `nil`."
  [query-execution {:keys [status], :as result}]
  (cond
    ;; driver returned no `:status` at all -- treat as a failure
    (not status)
    (fail query-execution (tru "Invalid response from database driver. No :status provided.") result)

    ;; query was canceled; just log it -- no QueryExecution entry is saved in this case
    (and (= status :failed)
         (instance? InterruptedException (:class result)))
    (log/info (trs "Query canceled"))

    (= status :failed)
    (do
      (log/warn (trs "Query failure") (u/pprint-to-str 'red result))
      (fail query-execution (get result :error (tru "Unknown error")) result))

    (= status :completed)
    (succeed query-execution result)))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Middleware |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- query-execution-info
  "Return the info for the QueryExecution entry for this `query`."
  {:arglists '([query])}
  [{{:keys [executed-by query-hash context card-id dashboard-id pulse-id]} :info
    database-id :database
    query-type :type
    :as query}]
  ;; the query hash must already have been calculated (as a byte array) and stashed in the query's `:info`
  {:pre [(instance? (Class/forName "[B") query-hash)]}
  {:database_id       database-id
   :executor_id       executed-by
   :card_id           card-id
   :dashboard_id      dashboard-id
   :pulse_id          pulse-id
   :context           context
   :hash              query-hash
   :native            (= (keyword query-type) :native) ; whether this was a native query (as opposed to MBQL)
   :json_query        (dissoc query :info)
   :started_at        (du/new-sql-timestamp)
   :running_time      0 ; placeholder -- replaced by `add-running-time` when the execution is saved
   :result_rows       0 ; placeholder -- filled in by `save-successful-query-execution!`
   :start_time_millis (System/currentTimeMillis)})
(defn process-userland-query
  "Middleware that performs the extra bookkeeping needed for 'userland' queries -- ones ran as a result of a user
  action, e.g. an API call or scheduled Pulse. For those queries we record a QueryExecution entry and format the
  results in an FE-client-friendly way; all other queries pass straight through to `qp`."
  [qp]
  (fn [query]
    (if-not (get-in query [:middleware :userland-query?])
      (qp query)
      ;; calculate the hash up front and stash it in `:info` so bookkeeping and execution see the same hash
      (let [query-with-hash (assoc-in query [:info :query-hash] (qputil/query-hash query))]
        (format-userland-query-result (query-execution-info query-with-hash)
                                      (qp query-with-hash))))))
......@@ -27,11 +27,15 @@
"Generate an approparite REMARK to be prepended to a query to give DBAs additional information about the query being
executed. See documentation for `mbql->native` and [issue #2386](https://github.com/metabase/metabase/issues/2386)
for more information."
^String [{{:keys [executed-by query-hash query-type], :as info} :info}]
(str "Metabase" (when info
^String [{{:keys [executed-by query-hash], :as info} :info, query-type :type}]
(str "Metabase" (when executed-by
(assert (instance? (Class/forName "[B") query-hash))
(format ":: userID: %s queryType: %s queryHash: %s"
executed-by query-type (codecs/bytes->hex query-hash)))))
executed-by
(case (keyword query-type)
:query "MBQL"
:native "native")
(codecs/bytes->hex query-hash)))))
;;; ------------------------------------------------- Normalization --------------------------------------------------
......@@ -74,7 +78,7 @@
(empty? constraints) (dissoc :constraints)
(empty? parameters) (dissoc :parameters))))
(defn query-hash
;; `(Class/forName "[B")` is the Java byte-array class, i.e. the return schema here is `bytes`
(s/defn query-hash :- (Class/forName "[B")
  "Return a 256-bit SHA3 hash of QUERY as a key for the cache. (This is returned as a byte array.)
  The query is first reduced via `select-keys-for-hashing`, so keys that don't affect results are ignored."
  [query]
  (hash/sha3-256 (json/generate-string (select-keys-for-hashing query))))
......
......@@ -7,52 +7,46 @@
[dk.ative.docjure.spreadsheet :as spreadsheet]
[expectations :refer :all]
[medley.core :as m]
[metabase
[query-processor :as qp]
[util :as u]]
[metabase.models
[card :refer [Card]]
[database :as database]
[query-execution :refer [QueryExecution]]]
[metabase.test
[data :as data :refer :all]
[util :as tu]]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.test.data :as data]
[metabase.test.data
[dataset-definitions :as defs]
[datasets :refer [expect-with-driver]]
[users :refer :all]]
[users :as test-users]]
[metabase.test.util.log :as tu.log]
[metabase.util :as u]
[toucan.db :as db]
[toucan.util.test :as tt]))
(defn user-details
  "Build a map of the listed fields of `user`, with each `$` placeholder filled in by `tu/match-$` from the
  corresponding key of `user`."
  [user]
  (tu/match-$ user
    {:email        $
     :date_joined  $
     :first_name   $
     :last_name    $
     :last_login   $
     :is_superuser $
     :is_qbnewb    $
     :common_name  $}))
(defn format-response [m]
[toucan.util.test :as tt])
(:import com.fasterxml.jackson.core.JsonGenerator))
(defn- format-response [m]
(into {} (for [[k v] (-> m
(m/dissoc-in [:data :results_metadata])
(m/dissoc-in [:data :insights]))]
(cond
(contains? #{:id :started_at :running_time :hash} k) [k (boolean v)]
(= :data k) [k (if-not (contains? v :native_form)
v
(update v :native_form boolean))]
:else [k v]))))
(contains? #{:id :started_at :running_time :hash} k)
[k (boolean v)]
(and (= :data k) (contains? v :native_form))
[k (update v :native_form boolean)]
:else
[k v]))))
(defn- most-recent-query-execution
  "Fetch the most recently inserted QueryExecution row (i.e., the one with the highest `:id`)."
  []
  (db/select-one QueryExecution {:order-by [[:id :desc]]}))
(def ^:private query-defaults
  ;; keys the QP middleware adds to every userland query issued via the API; tests merge these into their expected
  ;; `:json_query` values
  {:constraints constraints/default-query-constraints
   :middleware  {:add-default-userland-constraints? true, :userland-query? true}})
;;; ## POST /api/meta/dataset
;; Just a basic sanity check to make sure Query Processor endpoint is still working correctly.
(expect
[ ;; API call response
{:api-response
{:data {:rows [[1000]]
:columns ["count"]
:cols [{:base_type "type/Integer"
......@@ -66,37 +60,41 @@
:context "ad-hoc"
:json_query (-> (data/mbql-query checkins
{:aggregation [[:count]]})
(assoc :type "query")
(assoc-in [:query :aggregation] [["count"]])
(assoc :constraints qp/default-query-constraints))
(assoc :type "query")
(merge query-defaults))
:started_at true
:running_time true
:average_execution_time nil
:database_id (id)}
;; QueryExecution record in the DB
:database_id (data/id)}
:query-execution
{:hash true
:row_count 1
:result_rows 1
:context :ad-hoc
:executor_id (user->id :rasta)
:executor_id (test-users/user->id :rasta)
:native false
:pulse_id nil
:card_id nil
:dashboard_id nil
:error nil
:id true
:database_id (id)
:database_id (data/id)
:started_at true
:running_time true}]
(let [result ((user->client :rasta) :post 200 "dataset" (data/mbql-query checkins
{:aggregation [[:count]]}))]
[(format-response result)
(format-response (most-recent-query-execution))]))
:running_time true}}
(let [result ((test-users/user->client :rasta) :post 200 "dataset" (data/mbql-query checkins
{:aggregation [[:count]]}))]
{:api-response
(format-response result)
:query-execution
(format-response (most-recent-query-execution))}))
;; Even if a query fails we still expect a 200 response from the api
(expect
[ ;; API call response
{:api-response
{:data {:rows []
:columns []
:cols []}
......@@ -104,47 +102,56 @@
:status "failed"
:context "ad-hoc"
:error true
:json_query {:database (id)
:type "native"
:native {:query "foobar"}
:constraints qp/default-query-constraints}
:database_id (id)
:json_query (merge
query-defaults
{:database (data/id)
:type "native"
:native {:query "foobar"}})
:database_id (data/id)
:started_at true
:running_time true}
;; QueryExecution entry in the DB
:query-execution
{:hash true
:id true
:result_rows 0
:row_count 0
:context :ad-hoc
:error true
:database_id (id)
:database_id (data/id)
:started_at true
:running_time true
:executor_id (user->id :rasta)
:executor_id (test-users/user->id :rasta)
:native true
:pulse_id nil
:card_id nil
:dashboard_id nil}]
:dashboard_id nil}}
;; Error message's format can differ a bit depending on DB version and the comment we prepend to it, so check that
;; it exists and contains the substring "Syntax error in SQL statement"
(let [check-error-message (fn [output]
(update output :error (fn [error-message]
(boolean (re-find #"Syntax error in SQL statement" error-message)))))
(some->>
error-message
(re-find #"Syntax error in SQL statement")
boolean))))
result (tu.log/suppress-output
((user->client :rasta) :post 200 "dataset" {:database (id)
:type "native"
:native {:query "foobar"}}))]
[(check-error-message (dissoc (format-response result) :stacktrace))
(check-error-message (format-response (most-recent-query-execution)))]))
((test-users/user->client :rasta) :post 200 "dataset" {:database (data/id)
:type "native"
:native {:query "foobar"}}))]
{:api-response
(check-error-message (dissoc (format-response result) :stacktrace))
:query-execution
(check-error-message (format-response (most-recent-query-execution)))}))
;;; Make sure that we're piggybacking off of the JSON encoding logic when encoding strange values in XLSX (#5145, #5220, #5459)
;;; Make sure that we're piggybacking off of the JSON encoding logic when encoding strange values in XLSX (#5145,
;;; #5220, #5459)
(defrecord ^:private SampleNastyClass [^String v])
(generate/add-encoder
SampleNastyClass
(fn [obj, ^com.fasterxml.jackson.core.JsonGenerator json-generator]
(fn [obj, ^JsonGenerator json-generator]
(.writeString json-generator (:v obj))))
(defrecord ^:private AnotherNastyClass [^String v])
......@@ -175,7 +182,7 @@
["3" "2014-09-15" "8" "56"]
["4" "2014-03-11" "5" "4"]
["5" "2013-05-05" "3" "49"]]
(let [result ((user->client :rasta) :post 200 "dataset/csv" :query
(let [result ((test-users/user->client :rasta) :post 200 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins)))]
(take 5 (parse-and-sort-csv result))))
......@@ -186,8 +193,8 @@
["3" "2014-09-15" "" "8" "56"]
["4" "2014-03-11" "" "5" "4"]
["5" "2013-05-05" "" "3" "49"]]
(with-db (get-or-create-database! defs/test-data-with-null-date-checkins)
(let [result ((user->client :rasta) :post 200 "dataset/csv" :query
(data/with-db (data/get-or-create-database! defs/test-data-with-null-date-checkins)
(let [result ((test-users/user->client :rasta) :post 200 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins)))]
(take 5 (parse-and-sort-csv result)))))
......@@ -198,7 +205,7 @@
["3" "2014-09-15" "8" "56"]
["4" "2014-03-11" "5" "4"]
["5" "2013-05-05" "3" "49"]]
(let [result ((user->client :rasta) :post 200 "dataset/csv" :query
(let [result ((test-users/user->client :rasta) :post 200 "dataset/csv" :query
(json/generate-string (data/mbql-query checkins)))]
(take 5 (parse-and-sort-csv result))))
......@@ -209,17 +216,17 @@
["3" "Kaneonuskatew Eiran" "2014-11-06T16:15:00.000Z"]
["4" "Simcha Yan" "2014-01-01T08:30:00.000Z"]
["5" "Quentin Sören" "2014-10-03T17:30:00.000Z"]]
(let [result ((user->client :rasta) :post 200 "dataset/csv" :query
(let [result ((test-users/user->client :rasta) :post 200 "dataset/csv" :query
(json/generate-string (data/mbql-query users)))]
(take 5 (parse-and-sort-csv result))))
;; Check that we can export the results of a nested query
(expect
16
(tt/with-temp Card [card {:dataset_query {:database (id)
(tt/with-temp Card [card {:dataset_query {:database (data/id)
:type :native
:native {:query "SELECT * FROM USERS;"}}}]
(let [result ((user->client :rasta) :post 200 "dataset/csv"
(let [result ((test-users/user->client :rasta) :post 200 "dataset/csv"
:query (json/generate-string
{:database database/virtual-id
:type :query
......
......@@ -2,12 +2,11 @@
(:require [clojure
[string :as str]
[walk :as walk]]
[expectations :refer :all]
[expectations :refer [expect]]
[medley.core :as m]
[metabase
[email-test :as et]
[pulse :refer :all]
[query-processor :as qp]]
[pulse :refer :all]]
[metabase.integrations.slack :as slack]
[metabase.models
[card :refer [Card]]
......@@ -16,6 +15,7 @@
[pulse-channel :refer [PulseChannel]]
[pulse-channel-recipient :refer [PulseChannelRecipient]]]
[metabase.pulse.render :as render]
[metabase.query-processor.middleware.constraints :as constraints]
[metabase.test
[data :as data]
[util :as tu]]
......@@ -124,7 +124,7 @@
(send-pulse! (retrieve-pulse pulse-id))
(et/summarize-multipart-email #"Pulse Name" #"More results have been included" #"ID</th>"))))
;; Validate pulse queries are limited by qp/default-query-constraints
;; Validate pulse queries are limited by `default-query-constraints`
(expect
31 ;; Should return 30 results (the redef'd limit) plus the header row
(tt/with-temp* [Card [{card-id :id} (checkins-query {:aggregation nil})]
......@@ -137,8 +137,8 @@
PulseChannelRecipient [_ {:user_id (rasta-id)
:pulse_channel_id pc-id}]]
(email-test-setup
(with-redefs [qp/default-query-constraints {:max-results 10000
:max-results-bare-rows 30}]
(with-redefs [constraints/default-query-constraints {:max-results 10000
:max-results-bare-rows 30}]
(send-pulse! (retrieve-pulse pulse-id))
;; Slurp in the generated CSV and count the lines found in the file
(-> @et/inbox
......
(ns metabase.query-processor-test.failure-test
"Tests for how the query processor as a whole handles failures."
(:require [expectations :refer [expect]]
[medley.core :as m]
[metabase.query-processor :as qp]
[metabase.query-processor.interface :as qp.i]
[metabase.test.data :as data]))
......@@ -40,13 +41,16 @@
:native bad-query:native}
(-> (qp/process-query (bad-query))
(update :error (every-pred string? seq))
(update :stacktrace valid-stacktrace?)))
(update :stacktrace valid-stacktrace?)
;; don't care about query hash + type
(m/dissoc-in [:query :info])
(m/dissoc-in [:preprocessed :info])))
;; running via `process-query-and-save-execution!` should return similar info and a bunch of other nonsense too
(expect
{:database_id (data/id)
:started_at true
:json_query (bad-query)
:json_query (assoc-in (bad-query) [:middleware :userland-query?] true)
:native bad-query:native
:status :failed
:stacktrace true
......@@ -54,10 +58,12 @@
:error true
:row_count 0
:running_time true
:preprocessed (bad-query:preprocessed)
:preprocessed (assoc-in (bad-query:preprocessed) [:middleware :userland-query?] true)
:data {:rows [], :cols [], :columns []}}
(-> (qp/process-query-and-save-execution! (bad-query) {:context :question})
(update :error (every-pred string? seq))
(update :started_at (partial instance? java.util.Date))
(update :stacktrace valid-stacktrace?)
(update :running_time (complement neg?))))
(update :running_time (complement neg?))
(m/dissoc-in [:query :info])
(m/dissoc-in [:preprocessed :info])))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment