Commit 231a2737 authored by Cam Saül

Reworked QueryExecution table

parent a0df7cd4
Showing 467 additions and 176 deletions
databaseChangeLog:
  - changeSet:
      id: 51
      author: camsaul
      changes:
        - createTable:
            tableName: query_execution
            remarks: 'A log of executed queries, used for calculating historic execution times, auditing, and other purposes.'
            columns:
              - column:
                  name: id
                  type: int
                  autoIncrement: true
                  constraints:
                    primaryKey: true
                    nullable: false
              - column:
                  name: hash
                  type: binary(32)
                  remarks: 'The hash of the query dictionary. This is a 256-bit SHA3 hash of the query.'
                  constraints:
                    nullable: false
              - column:
                  name: started_at
                  type: datetime
                  remarks: 'Timestamp of when this query started running.'
                  constraints:
                    nullable: false
              - column:
                  name: running_time
                  type: integer
                  remarks: 'The time, in milliseconds, this query took to complete.'
                  constraints:
                    nullable: false
              - column:
                  name: result_rows
                  type: integer
                  remarks: 'Number of rows in the query results.'
                  constraints:
                    nullable: false
              - column:
                  name: native
                  type: boolean
                  remarks: 'Whether the query was a native query, as opposed to an MBQL one (e.g., created with the GUI).'
                  constraints:
                    nullable: false
              - column:
                  name: context
                  type: varchar(32)
                  remarks: 'Short string specifying how this query was executed, e.g. in a Dashboard or Pulse.'
              - column:
                  name: error
                  type: text
                  remarks: 'Error message returned by a failed query, if any.'
              # The following columns are foreign keys, but we don't keep FK constraints on them, for a few reasons:
              # - We don't want indexes on these columns: they wouldn't be generally useful, and they'd cost space and slow down writes.
              # - If a related object (e.g. a Dashboard) is deleted, we don't want to delete the related entries in the QueryExecution log.
              #   We could do something like make the constraint ON DELETE SET NULL, but that would require a full table scan to handle;
              #   if the QueryExecution log grew to tens of millions of rows, it would take a very long time to scan and update records.
              - column:
                  name: executor_id
                  type: integer
                  remarks: 'The ID of the User who triggered this query execution, if any.'
              - column:
                  name: card_id
                  type: integer
                  remarks: 'The ID of the Card (Question) associated with this query execution, if any.'
              - column:
                  name: dashboard_id
                  type: integer
                  remarks: 'The ID of the Dashboard associated with this query execution, if any.'
              - column:
                  name: pulse_id
                  type: integer
                  remarks: 'The ID of the Pulse associated with this query execution, if any.'
        # For things like auditing recently executed queries
        - createIndex:
            tableName: query_execution
            indexName: idx_query_execution_started_at
            columns:
              - column:
                  name: started_at
        # For things like seeing the 10 most recent executions of a certain query
        - createIndex:
            tableName: query_execution
            indexName: idx_query_execution_query_hash_started_at
            columns:
              - column:
                  name: hash
              - column:
                  name: started_at
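
The composite index on (hash, started_at) is what makes per-query history lookups cheap. As a rough sketch (not part of this commit), reading the ten most recent running times for a given query map would hit that index directly; this assumes Toucan's `db/select-field` and the `qputil/query-hash` helper introduced later in this commit:

(require '[toucan.db :as db]
         '[metabase.models.query-execution :refer [QueryExecution]]
         '[metabase.query-processor.util :as qputil])

;; running times (in ms) of the 10 most recent executions of some `query` map,
;; served by idx_query_execution_query_hash_started_at
(db/select-field :running_time QueryExecution
  :hash (qputil/query-hash query)
  {:order-by [[:started_at :desc]]
   :limit    10})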
@@ -356,34 +356,44 @@
(defn run-query-for-card
"Run the query for Card with PARAMETERS and CONSTRAINTS, and return results in the usual format."
[card-id & {:keys [parameters constraints]
:or {constraints dataset-api/default-query-constraints}}]
{:style/indent 1}
[card-id & {:keys [parameters constraints context dashboard-id]
:or {constraints dataset-api/default-query-constraints
context :question}}]
{:pre [(u/maybe? sequential? parameters)]}
(let [card (read-check Card card-id)
query (assoc (:dataset_query card)
:parameters parameters
:constraints constraints)
options {:executed-by *current-user-id*
:card-id card-id}]
options {:executed-by *current-user-id*
:context context
:card-id card-id
:dashboard-id dashboard-id}]
(check-not-archived card)
(qp/dataset-query query options)))
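
As an illustration, a hypothetical call using the new keyword arguments (the parameter map shown is made up for the example; `:context` defaults to `:question` when omitted):

;; run Card 123 on behalf of Dashboard 45
(run-query-for-card 123
  :parameters   [{:type "category", :value "Widget"}]
  :context      :dashboard
  :dashboard-id 45)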
(defendpoint POST "/:card-id/query"
"Run the query associated with a Card."
[card-id :as {{:keys [parameters]} :body}]
[card-id, :as {{:keys [parameters]} :body}]
(run-query-for-card card-id, :parameters parameters))
(defendpoint POST "/:card-id/query/csv"
"Run the query associated with a Card, and return its results as CSV. Note that this expects the parameters as serialized JSON in the 'parameters' parameter"
[card-id parameters]
{parameters (s/maybe su/JSONString)}
(dataset-api/as-csv (run-query-for-card card-id, :parameters (json/parse-string parameters keyword), :constraints nil)))
(dataset-api/as-csv (run-query-for-card card-id
:parameters (json/parse-string parameters keyword)
:constraints nil
:context :csv-download)))
(defendpoint POST "/:card-id/query/json"
"Run the query associated with a Card, and return its results as JSON. Note that this expects the parameters as serialized JSON in the 'parameters' parameter"
[card-id parameters]
{parameters (s/maybe su/JSONString)}
(dataset-api/as-json (run-query-for-card card-id, :parameters (json/parse-string parameters keyword), :constraints nil)))
(dataset-api/as-json (run-query-for-card card-id
:parameters (json/parse-string parameters keyword)
:constraints nil
:context :json-download)))
;;; ------------------------------------------------------------ Sharing is Caring ------------------------------------------------------------
@@ -9,8 +9,9 @@
(metabase.models [card :refer [Card]]
[database :refer [Database]]
[query-execution :refer [QueryExecution]])
(metabase [query-processor :as qp]
[util :as u])
[metabase.query-processor :as qp]
[metabase.query-processor.util :as qputil]
[metabase.util :as u]
[metabase.util.schema :as su]))
(def ^:private ^:const max-results-bare-rows
@@ -27,27 +28,22 @@
:max-results-bare-rows max-results-bare-rows})
(defendpoint POST "/"
"Execute an MQL query and retrieve the results as JSON."
"Execute a query and retrieve the results in the usual format."
[:as {{:keys [database] :as body} :body}]
(read-check Database database)
;; add sensible constraints for results limits on our query
(let [query (assoc body :constraints default-query-constraints)]
(qp/dataset-query query {:executed-by *current-user-id*})))
(qp/dataset-query query {:executed-by *current-user-id*, :context :ad-hoc})))
(defendpoint POST "/duration"
"Get historical query execution duration."
[:as {{:keys [database] :as query} :body}]
[:as {{:keys [database], :as query} :body}]
(read-check Database database)
;; add sensible constraints for results limits on our query
(let [query (assoc query :constraints default-query-constraints)
running-times (db/select-field :running_time QueryExecution
:query_hash (hash query)
{:order-by [[:started_at :desc]]
:limit 10})]
{:average (if (empty? running-times)
0
(float (/ (reduce + running-times)
(count running-times))))}))
;; try calculating the average for the query as it was given to us; if there's no data for it, try again with the default constraints added.
;; if we still can't find relevant info, just default to 0
{:average (or (qputil/query-average-duration query)
(qputil/query-average-duration (assoc query :constraints default-query-constraints))
0)})
(defn as-csv
"Return a CSV response containing the RESULTS of a query."
@@ -85,7 +81,7 @@
{query su/JSONString}
(let [query (json/parse-string query keyword)]
(read-check Database (:database query))
(as-csv (qp/dataset-query (dissoc query :constraints) {:executed-by *current-user-id*}))))
(as-csv (qp/dataset-query (dissoc query :constraints) {:executed-by *current-user-id*, :context :csv-download}))))
(defendpoint POST "/json"
"Execute a query and download the result data as a JSON file."
@@ -93,7 +89,7 @@
{query su/JSONString}
(let [query (json/parse-string query keyword)]
(read-check Database (:database query))
(as-json (qp/dataset-query (dissoc query :constraints) {:executed-by *current-user-id*}))))
(as-json (qp/dataset-query (dissoc query :constraints) {:executed-by *current-user-id*, :context :json-download}))))
(define-routes)
@@ -191,7 +191,7 @@
{:pre [(integer? card-id) (u/maybe? map? embedding-params) (map? token-params) (map? query-params)]}
(let [parameter-values (validate-params embedding-params token-params query-params)
parameters (apply-parameter-values (resolve-card-parameters card-id) parameter-values)]
(apply public-api/run-query-for-card-with-id card-id parameters options)))
(apply public-api/run-query-for-card-with-id card-id parameters, :context :embedded-question, options)))
;;; ------------------------------------------------------------ Dashboard Fns used by both /api/embed and /api/preview_embed ------------------------------------------------------------
@@ -215,7 +215,7 @@
{:pre [(integer? dashboard-id) (integer? dashcard-id) (integer? card-id) (u/maybe? map? embedding-params) (map? token-params) (map? query-params)]}
(let [parameter-values (validate-params embedding-params token-params query-params)
parameters (apply-parameter-values (resolve-dashboard-parameters dashboard-id dashcard-id card-id) parameter-values)]
(public-api/public-dashcard-results dashboard-id card-id parameters)))
(public-api/public-dashcard-results dashboard-id card-id parameters, :context :embedded-dashboard)))
;;; ------------------------------------------------------------ Other /api/embed-specific utility fns ------------------------------------------------------------
@@ -35,8 +35,8 @@
(defendpoint GET "/"
"Fetch *all* `Metrics`."
[id]
(filter mi/can-read? (-> (db/select Metric, :is_active true)
(hydrate :creator))))
(filter mi/can-read? (-> (db/select Metric, :is_active true, {:order-by [:%lower.name]})
(hydrate :creator))))
(defendpoint PUT "/:id"
@@ -56,7 +56,7 @@
(u/prog1 (-> (let [parameters (if (string? parameters) (json/parse-string parameters keyword) parameters)]
(binding [api/*current-user-permissions-set* (atom #{"/"})
qp/*allow-queries-with-no-executor-id* true]
(apply card-api/run-query-for-card card-id, :parameters parameters, options)))
(apply card-api/run-query-for-card card-id, :parameters parameters, :context :public-question, options)))
(u/select-nested-keys [[:data :columns :cols :rows :rows_truncated] [:json_query :parameters] :error :status]))
;; if the query failed instead of returning anything about the query just return a generic error message
(when (= (:status <>) :failed)
@@ -169,7 +169,8 @@
(defn public-dashcard-results
"Return the results of running a query with PARAMETERS for Card with CARD-ID belonging to Dashboard with DASHBOARD-ID.
Throws a 404 if the Card isn't part of the Dashboard."
[dashboard-id card-id parameters]
[dashboard-id card-id parameters & {:keys [context]
:or {context :public-dashboard}}]
(api/check-404 (or (db/exists? DashboardCard
:dashboard_id dashboard-id
:card_id card-id)
@@ -177,7 +178,7 @@
(db/exists? DashboardCardSeries
:card_id card-id
:dashboardcard_id [:in dashcard-ids]))))
(run-query-for-card-with-id card-id parameters))
(run-query-for-card-with-id card-id parameters, :context context, :dashboard-id dashboard-id))
(api/defendpoint GET "/dashboard/:uuid/card/:card-id"
"Fetch the results for a Card in a publically-accessible Dashboard. Does not require auth credentials. Public sharing must be enabled."
@@ -197,7 +198,7 @@
maxheight (s/maybe su/IntString)
maxwidth (s/maybe su/IntString)}
(let [height (if maxheight (Integer/parseInt maxheight) default-embed-max-height)
width (if maxwidth (Integer/parseInt maxwidth) default-embed-max-width)]
width (if maxwidth (Integer/parseInt maxwidth) default-embed-max-width)]
{:version "1.0"
:type "rich"
:width width
@@ -16,7 +16,8 @@
[metabase.pulse :as p]
[metabase.pulse.render :as render]
[metabase.util :as u]
[metabase.util.schema :as su]))
[metabase.util.schema :as su])
(:import java.io.ByteArrayInputStream))
(defendpoint GET "/"
@@ -97,7 +98,7 @@
"Get HTML rendering of a `Card` with ID."
[id]
(let [card (read-check Card id)
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*})]
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*, :context :pulse, :card-id id})]
{:status 200, :body (html [:html [:body {:style "margin: 0;"} (binding [render/*include-title* true
render/*include-buttons* true]
(render/render-pulse-card card result))]])}))
@@ -106,7 +107,7 @@
"Get JSON object containing HTML rendering of a `Card` with ID and other information."
[id]
(let [card (read-check Card id)
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*})
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*, :context :pulse, :card-id id})
data (:data result)
card-type (render/detect-pulse-card-type card data)
card-html (html (binding [render/*include-title* true]
@@ -120,10 +121,10 @@
"Get PNG rendering of a `Card` with ID."
[id]
(let [card (read-check Card id)
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*})
result (qp/dataset-query (:dataset_query card) {:executed-by *current-user-id*, :context :pulse, :card-id id})
ba (binding [render/*include-title* true]
(render/render-pulse-card-to-png card result))]
{:status 200, :headers {"Content-Type" "image/png"}, :body (new java.io.ByteArrayInputStream ba)}))
{:status 200, :headers {"Content-Type" "image/png"}, :body (ByteArrayInputStream. ba)}))
(defendpoint POST "/test"
"Test send an unsaved pulse."
@@ -129,8 +129,7 @@
lon-col-idx (Integer/parseInt lon-col-idx)
query (json/parse-string query keyword)
updated-query (update query :query (u/rpartial query-with-inside-filter lat-field-id lon-field-id x y zoom))
result (qp/dataset-query updated-query {:executed-by *current-user-id*
:synchronously true})
result (qp/dataset-query updated-query {:executed-by *current-user-id*, :context :map-tiles})
points (for [row (-> result :data :rows)]
[(nth row lat-col-idx) (nth row lon-col-idx)])]
;; manual ring response here. we simply create an inputstream from the byte[] of our image
@@ -6,7 +6,8 @@
CREATE TABLE IF NOT EXISTS ... -- Good
CREATE TABLE ... -- Bad"
(:require [clojure.string :as str]
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[clojure.tools.logging :as log]
[schema.core :as s]
(toucan [db :as db]
@@ -26,12 +27,14 @@
[permissions :refer [Permissions], :as perms]
[permissions-group :as perm-group]
[permissions-group-membership :refer [PermissionsGroupMembership], :as perm-membership]
[query-execution :refer [QueryExecution], :as query-execution]
[raw-column :refer [RawColumn]]
[raw-table :refer [RawTable]]
[table :refer [Table] :as table]
[setting :refer [Setting], :as setting]
[user :refer [User]])
[metabase.public-settings :as public-settings]
[metabase.query-processor.util :as qputil]
[metabase.util :as u]))
;;; # Migration Helpers
@@ -327,3 +330,43 @@
(defmigration ^{:author "camsaul", :added "0.23.0"} copy-site-url-setting-and-remove-trailing-slashes
(when-let [site-url (db/select-one-field :value Setting :key "-site-url")]
(public-settings/site-url site-url)))
;;; ------------------------------------------------------------ Migrating QueryExecutions ------------------------------------------------------------
;; We're copying over data from the legacy `query_queryexecution` table to the new `query_execution` table; see #4522 and #4531 for details
;; model definition for the old table to facilitate the data copying process
(models/defmodel ^:private ^:deprecated LegacyQueryExecution :query_queryexecution)
(u/strict-extend (class LegacyQueryExecution)
models/IModel
(merge models/IModelDefaults
{:default-fields (constantly [:executor_id :result_rows :started_at :json_query :error :running_time])
:types (constantly {:json_query :json})}))
(defn- LegacyQueryExecution->QueryExecution
"Convert a LegacyQueryExecution to a format suitable for insertion as a new-format QueryExecution."
[{query :json_query, :as query-execution}]
(-> (assoc query-execution
:hash (qputil/query-hash query)
:native (not (qputil/mbql-query? query)))
;; since error is nullable now remove the old blank error message strings
(update :error (fn [error-message]
(when-not (str/blank? error-message)
error-message)))
(dissoc :json_query)))
;; Migrate entries from the old query execution table to the new one. This might take a few minutes.
(defmigration ^{:author "camsaul", :added "0.23.0"} migrate-query-executions
  ;; migrate the most recent 100,000 entries
  ;; make sure the DB doesn't get snippy by trying to insert too many records at once. Divide the INSERT statements into chunks of 1,000.
  ;; Bind *validate-context* to false, since legacy entries have no `context` to validate
  (binding [query-execution/*validate-context* false]
    (doseq [chunk (partition-all 1000 (db/select LegacyQueryExecution {:limit 100000, :order-by [[:id :desc]]}))]
      (db/insert-many! QueryExecution
        (for [query-execution chunk]
          (LegacyQueryExecution->QueryExecution query-execution))))))
;; drop the legacy QueryExecution table now that we don't need it anymore
(defmigration ^{:author "camsaul", :added "0.23.0"} drop-old-query-execution-table
;; DROP TABLE IF EXISTS should work on Postgres, MySQL, and H2
(jdbc/execute! (db/connection) [(format "DROP TABLE IF EXISTS %s;" ((db/quote-fn) "query_queryexecution"))]))
@@ -126,7 +126,7 @@
(do
(with-metabot-permissions
(read-check Card card-id))
(do-async (let [attachments (pulse/create-and-upload-slack-attachments! [(pulse/execute-card card-id)])]
(do-async (let [attachments (pulse/create-and-upload-slack-attachments! [(pulse/execute-card card-id, :context :metabot)])]
(slack/post-chat-message! *channel-id*
nil
attachments)))
@@ -341,7 +341,8 @@
(fn [request]
(try (binding [*automatically-catch-api-exceptions* false]
(handler request))
(catch Throwable _
(catch Throwable e
(log/error (.getMessage e))
{:status 400, :body "An error occurred."}))))
(defn message-only-exceptions
(ns metabase.models.query-execution
(:require [toucan.models :as models]
(:require [schema.core :as s]
[toucan.models :as models]
[metabase.util :as u]))
(models/defmodel QueryExecution :query_queryexecution)
(models/defmodel QueryExecution :query_execution)
(def ^:dynamic ^Boolean *validate-context*
"Whether we should validate the values of `context` for QueryExecutions when INSERTing them.
(In normal usage, this should always be `true`, but this switch is provided so we can migrate
legacy QueryExecution entries, which have no `context` information.)
true)
(def Context
"Schema for valid values of QueryExecution `:context`."
(s/enum :ad-hoc
:csv-download
:dashboard
:embedded-dashboard
:embedded-question
:json-download
:map-tiles
:metabot
:public-dashboard
:public-question
:pulse
:question))
(defn- pre-insert [{context :context, :as query-execution}]
(u/prog1 query-execution
(when *validate-context*
(s/validate Context context))))
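
To illustrate the guard (a sketch; `execution` and `legacy-execution` are hypothetical row maps): inserting with a `context` outside the enum fails schema validation, unless validation is switched off, as when migrating legacy rows:

;; throws -- :repl is not a valid Context
(db/insert! QueryExecution (assoc execution :context :repl))

;; legacy rows have no :context, so validation is bound off while migrating them
(binding [*validate-context* false]
  (db/insert! QueryExecution legacy-execution))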
(defn- post-select [{:keys [result_rows] :as query-execution}]
;; sadly we have 2 ways to reference the row count :(
@@ -12,6 +39,7 @@
(u/strict-extend (class QueryExecution)
models/IModel
(merge models/IModelDefaults
{:default-fields (constantly [:id :uuid :version :json_query :raw_query :status :started_at :finished_at :running_time :error :result_rows])
:types (constantly {:json_query :json, :result_data :json, :status :keyword, :raw_query :clob, :error :clob, :additional_info :clob})
:post-select post-select}))
{:types (constantly {:json_query :json, :status :keyword, :context :keyword, :error :clob})
:pre-insert pre-insert
:pre-update (fn [& _] (throw (Exception. "You cannot update a QueryExecution!")))
:post-select post-select}))
@@ -19,13 +19,15 @@
;; TODO: this is probably something that could live somewhere else and just be reused
(defn execute-card
"Execute the query for a single card."
[card-id]
"Execute the query for a single card with CARD-ID. OPTIONS are passed along to `dataset-query`."
[card-id & {:as options}]
{:pre [(integer? card-id)]}
(when-let [card (Card card-id)]
(let [{:keys [creator_id dataset_query]} card]
(try
{:card card :result (qp/dataset-query dataset_query {:executed-by creator_id})}
{:card card
:result (qp/dataset-query dataset_query (merge {:executed-by creator_id, :context :pulse, :card-id card-id}
options))}
(catch Throwable t
(log/warn (format "Error running card query (%s)" card-id) t))))))
@@ -70,10 +72,10 @@
Example:
(send-pulse! pulse) Send to all Channels
(send-pulse! pulse :channel-ids [312]) Send only to Channel with :id = 312"
[{:keys [cards] :as pulse} & {:keys [channel-ids]}]
[{:keys [cards], :as pulse} & {:keys [channel-ids]}]
{:pre [(map? pulse) (every? map? cards) (every? :id cards)]}
(let [results (for [card cards]
(execute-card (:id card)))
(execute-card (:id card), :pulse-id (:id pulse))) ; Pulse ID may be `nil` if the Pulse isn't saved yet
channel-ids (or channel-ids (mapv :id (:channels pulse)))]
(doseq [channel-id channel-ids]
(let [{:keys [channel_type details recipients]} (some #(when (= channel-id (:id %)) %)
(ns metabase.query-processor
"Preprocessor that does simple transformations to all incoming queries, simplifing the driver-specific implementations."
(:require [clojure.tools.logging :as log]
[schema.core :as s]
[toucan.db :as db]
[metabase.driver :as driver]
[metabase.models.query-execution :refer [QueryExecution]]
[metabase.models.query-execution :refer [QueryExecution], :as query-execution]
[metabase.query-processor.util :as qputil]
(metabase.query-processor.middleware [add-implicit-clauses :as implicit-clauses]
[add-row-count-and-status :as row-count-and-status]
@@ -22,7 +23,8 @@
[parameters :as parameters]
[permissions :as perms]
[resolve-driver :as resolve-driver])
[metabase.util :as u]))
[metabase.util :as u]
[metabase.util.schema :as su]))
;;; +-------------------------------------------------------------------------------------------------------+
;;; | QUERY PROCESSOR |
@@ -99,49 +101,42 @@
(defn- save-query-execution!
"Save (or update) a `QueryExecution`."
[{:keys [id], :as query-execution}]
(if id
;; execution has already been saved, so update it
(u/prog1 query-execution
(db/update! QueryExecution id query-execution))
;; first time saving execution, so insert it
(db/insert! QueryExecution query-execution)))
[query-execution]
(u/prog1 query-execution
(db/insert! QueryExecution (dissoc query-execution :json_query))))
(defn- save-and-return-failed-query!
"Save QueryExecution state and construct a failed query response"
[query-execution error-message]
(let [updates {:status :failed
:error error-message
:finished_at (u/new-sql-timestamp)
:running_time (- (System/currentTimeMillis) (:start_time_millis query-execution))}]
;; record our query execution and format response
(-> query-execution
(dissoc :start_time_millis)
(merge updates)
save-query-execution!
(dissoc :raw_query :result_rows :version)
;; this is just for the response for client
(assoc :error error-message
:row_count 0
:data {:rows []
:cols []
:columns []}))))
;; record our query execution and format response
(-> query-execution
(dissoc :start_time_millis)
(merge {:error error-message
:running_time (- (System/currentTimeMillis) (:start_time_millis query-execution))})
save-query-execution!
(dissoc :result_rows :hash :executor_id :native :card_id :dashboard_id :pulse_id)
;; this is just for the response for client
(assoc :status :failed
:error error-message
:row_count 0
:data {:rows []
:cols []
:columns []})))
(defn- save-and-return-successful-query!
"Save QueryExecution state and construct a completed (successful) query response"
[query-execution query-result]
;; record our query execution and format response
(-> (assoc query-execution
:status :completed
:finished_at (u/new-sql-timestamp)
:running_time (- (System/currentTimeMillis)
(:start_time_millis query-execution))
:result_rows (get query-result :row_count 0))
(dissoc :start_time_millis)
save-query-execution!
;; at this point we've saved and we just need to massage things into our final response format
(dissoc :error :raw_query :result_rows :version)
(merge query-result)))
(dissoc :error :result_rows :hash :executor_id :native :card_id :dashboard_id :pulse_id)
(merge query-result)
(assoc :status :completed)))
(defn- assert-query-status-successful
@@ -161,21 +156,20 @@
(defn- query-execution-info
"Return the info for the `QueryExecution` entry for this QUERY."
[{{:keys [uuid executed-by query-hash]} :info, :as query}]
{:uuid (or uuid (throw (Exception. "Missing query UUID!")))
:executor_id executed-by
[{{:keys [executed-by query-hash query-type context card-id dashboard-id pulse-id]} :info, :as query}]
{:pre [(instance? (Class/forName "[B") query-hash)
(string? query-type)]}
{:executor_id executed-by
:card_id card-id
:dashboard_id dashboard-id
:pulse_id pulse-id
:context context
:hash (or query-hash (throw (Exception. "Missing query hash!")))
:native (= query-type "native")
:json_query (dissoc query :info)
:query_hash (or query-hash (throw (Exception. "Missing query hash!")))
:version 0
:error ""
:started_at (u/new-sql-timestamp)
:finished_at (u/new-sql-timestamp)
:running_time 0
:result_rows 0
:result_file ""
:result_data "{}"
:raw_query ""
:additional_info ""
:start_time_millis (System/currentTimeMillis)})
(defn- run-and-save-query!
......@@ -190,31 +184,31 @@
(log/warn (u/format-color 'red "Query failure: %s\n%s" (.getMessage e) (u/pprint-to-str (u/filtered-stacktrace e))))
(save-and-return-failed-query! query-execution (.getMessage e))))))
(defn dataset-query
(def ^:private DatasetQueryOptions
"Schema for the options map for the `dataset-query` function."
(s/constrained {:context query-execution/Context
(s/optional-key :executed-by) (s/maybe su/IntGreaterThanZero)
(s/optional-key :card-id) (s/maybe su/IntGreaterThanZero)
(s/optional-key :dashboard-id) (s/maybe su/IntGreaterThanZero)
(s/optional-key :pulse-id) (s/maybe su/IntGreaterThanZero)}
(fn [{:keys [executed-by]}]
(or (integer? executed-by)
*allow-queries-with-no-executor-id*))
"executed-by cannot be nil unless *allow-queries-with-no-executor-id* is true"))
(s/defn ^:always-validate dataset-query
"Process and run a json based dataset query and return results.
Takes 2 arguments:
1. the json query as a dictionary
2. query execution options specified in a dictionary
1. the json query as a map
2. query execution options (and context information) specified as a map
Depending on the database specified in the query this function will delegate to a driver specific implementation.
For the purposes of tracking we record each call to this function as a QueryExecution in the database.
Possible caller-options include:
:executed-by [int] (User ID of caller)
:card-id [int] (ID of Card associated with this execution)"
{:arglists '([query options])}
[query {:keys [executed-by card-id]}]
{:pre [(or (integer? executed-by)
*allow-queries-with-no-executor-id*)
(u/maybe? integer? card-id)]}
(let [query-uuid (str (java.util.UUID/randomUUID))
query-hash (hash query)
query (assoc query :info {:executed-by executed-by
:card-id card-id
:uuid query-uuid
:query-hash query-hash
:query-type (if (qputil/mbql-query? query) "MBQL" "native")})]
(run-and-save-query! query)))
OPTIONS must conform to the `DatasetQueryOptions` schema; refer to that for more details."
[query, options :- DatasetQueryOptions]
(run-and-save-query! (assoc query :info (assoc options
:query-hash (qputil/query-hash query)
:query-type (if (qputil/mbql-query? query) "MBQL" "native")))))
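
A minimal invocation under the new signature might look like the following (the IDs and inner query are illustrative):

(dataset-query {:database 1
                :type     "query"
                :query    {:source-table 2}}
  {:executed-by 1, :context :ad-hoc})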
(ns metabase.query-processor.util
"Utility functions used by the global query processor and middleware functions.")
"Utility functions used by the global query processor and middleware functions."
(:require (buddy.core [codecs :as codecs]
[hash :as hash])
[cheshire.core :as json]
[toucan.db :as db]
[metabase.models.query-execution :refer [QueryExecution]]))
(defn mbql-query?
"Is the given query an MBQL query?"
@@ -23,5 +28,39 @@
(defn query->remark
"Genarate an approparite REMARK to be prepended to a query to give DBAs additional information about the query being executed.
See documentation for `mbql->native` and [issue #2386](https://github.com/metabase/metabase/issues/2386) for more information."
^String [{{:keys [executed-by uuid query-hash query-type], :as info} :info}]
(format "Metabase:: userID: %s executionID: %s queryType: %s queryHash: %s" executed-by uuid query-type query-hash))
^String [{{:keys [executed-by query-hash query-type], :as info} :info}]
(str "Metabase" (when info
(assert (instance? (Class/forName "[B") query-hash))
(format ":: userID: %s queryType: %s queryHash: %s" executed-by query-type (codecs/bytes->hex query-hash)))))
;;; ------------------------------------------------------------ Hashing ------------------------------------------------------------
(defn- select-keys-for-hashing
"Return QUERY with only the keys relevant to hashing kept.
(This is done so that irrelevant info, or options that don't affect query results, doesn't cause the same query to produce different hashes.)"
[query]
{:pre [(map? query)]}
(let [{:keys [constraints parameters], :as query} (select-keys query [:database :type :query :parameters :constraints])]
(cond-> query
(empty? constraints) (dissoc :constraints)
(empty? parameters) (dissoc :parameters))))
(defn query-hash
"Return a 256-bit SHA3 hash of QUERY as a key for the cache. (This is returned as a byte array.)"
[query]
(hash/sha3-256 (json/generate-string (select-keys-for-hashing query))))
;;; ------------------------------------------------------------ Historic Duration Info ------------------------------------------------------------
(defn query-average-duration
"Return the average running time of QUERY over the last 10 executions in milliseconds.
Returns `nil` if no data is available."
^Float [query]
(when-let [running-times (db/select-field :running_time QueryExecution
:hash (query-hash query)
{:order-by [[:started_at :desc]]
:limit 10})]
(float (/ (reduce + running-times)
(count running-times)))))
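
This is the helper the reworked `/api/dataset/duration` endpoint above leans on; a caller can fall back to 0 when there is no history, e.g. (query contents illustrative):

(or (query-average-duration {:database 1, :type "native", :native {:query "SELECT 1;"}})
    0)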
@@ -297,7 +297,7 @@
(defn- executions-chunk
"Fetch the chunk of QueryExecutions whose ID is greater than STARTING-ID."
[starting-id]
(db/select [QueryExecution :id :executor_id :running_time :status]
(db/select [QueryExecution :id :executor_id :running_time :error]
:id [:> starting-id]
{:order-by [:id], :limit executions-chunk-size}))
@@ -316,7 +316,9 @@
([summary execution]
(-> summary
(update :executions u/safe-inc)
(update-in [:by_status (:status execution)] u/safe-inc)
(update-in [:by_status (if (:error execution)
"failed"
"completed")] u/safe-inc)
(update-in [:num_per_user (:executor_id execution)] u/safe-inc)
(update-in [:num_by_latency (bin-large-number (/ (:running_time execution) 1000))] u/safe-inc))))
@@ -40,85 +40,85 @@
(defn format-response [m]
(into {} (for [[k v] m]
(cond
(contains? #{:id :uuid :started_at :finished_at :running_time} k) [k (boolean v)]
(contains? #{:id :started_at :running_time :hash} k) [k (boolean v)]
(= :data k) [k (if-not (contains? v :native_form)
v
(update v :native_form boolean))]
:else [k v]))))
(defn- most-recent-query-execution [] (db/select-one QueryExecution {:order-by [[:id :desc]]}))
;;; ## POST /api/meta/dataset
;; Just a basic sanity check to make sure Query Processor endpoint is still working correctly.
(expect
;; the first result is directly from the api call
;; the second result is checking our QueryExecution log to ensure it captured the right details
[{:data {:rows [[1000]]
[;; API call response
{:data {:rows [[1000]]
:columns ["count"]
:cols [{:base_type "type/Integer", :special_type "type/Number", :name "count", :display_name "count", :id nil, :table_id nil,
:description nil, :target nil, :extra_info {}, :source "aggregation"}]
:native_form true}
:row_count 1
:status "completed"
:id true
:uuid true
:context "ad-hoc"
:json_query (-> (wrap-inner-query
(query checkins
(ql/aggregation (ql/count))))
(ql/aggregation (ql/count))))
(assoc :type "query")
(assoc-in [:query :aggregation] [{:aggregation-type "count", :custom-name nil}])
(assoc :constraints default-query-constraints))
:started_at true
:finished_at true
:running_time true}
{:row_count 1
;; QueryExecution record in the DB
{:hash true
:row_count 1
:result_rows 1
:status :completed
:error ""
:context :ad-hoc
:executor_id (user->id :rasta)
:native false
:pulse_id nil
:card_id nil
:dashboard_id nil
:error nil
:id true
:uuid true
:raw_query ""
:json_query (-> (wrap-inner-query
(query checkins
(ql/aggregation (ql/count))))
(assoc :type "query")
(assoc-in [:query :aggregation] [{:aggregation-type "count", :custom-name nil}])
(assoc :constraints default-query-constraints))
:started_at true
:finished_at true
:running_time true
:version 0}]
:running_time true}]
(let [result ((user->client :rasta) :post 200 "dataset" (wrap-inner-query
(query checkins
(ql/aggregation (ql/count)))))]
(ql/aggregation (ql/count)))))]
[(format-response result)
(format-response (QueryExecution :uuid (:uuid result)))]))
(format-response (most-recent-query-execution))]))
;; Even if a query fails we still expect a 200 response from the api
(expect
;; the first result is directly from the api call
;; the second result is checking our QueryExecution log to ensure it captured the right details
(let [output {:data {:rows []
:columns []
:cols []}
:row_count 0
:status "failed"
:error true
:id true
:uuid true
:json_query {:database (id)
:type "native"
:native {:query "foobar"}
:constraints default-query-constraints}
:started_at true
:finished_at true
:running_time true}]
[output
(-> output
(dissoc :data)
(assoc :status :failed
:version 0
:raw_query ""
:result_rows 0))])
[;; API call response
{:data {:rows []
:columns []
:cols []}
:row_count 0
:status "failed"
:context "ad-hoc"
:error true
:json_query {:database (id)
:type "native"
:native {:query "foobar"}
:constraints default-query-constraints}
:started_at true
:running_time true}
;; QueryExecution entry in the DB
{:hash true
:id true
:result_rows 0
:row_count 0
:context :ad-hoc
:error true
:started_at true
:running_time true
:executor_id (user->id :rasta)
:native true
:pulse_id nil
:card_id nil
:dashboard_id nil}]
;; Error message's format can differ a bit depending on DB version and the comment we prepend to it, so check that it exists and contains the substring "Syntax error in SQL statement"
(let [check-error-message (fn [output]
(update output :error (fn [error-message]
@@ -127,4 +127,4 @@
:type "native"
:native {:query "foobar"}})]
[(check-error-message (format-response result))
(check-error-message (format-response (QueryExecution :uuid (:uuid result))))]))
(check-error-message (format-response (most-recent-query-execution)))]))
@@ -8,20 +8,25 @@
;; Check to make sure we're migrating all of our entities.
;; This fetches `metabase.cmd.load-from-h2/entities` and compares it against all existing entities
(defn- migrated-entity-names []
(defn- migrated-model-names []
(set (map :name @(resolve 'metabase.cmd.load-from-h2/entities))))
(defn- all-entity-names []
(def ^:private models-to-exclude
"Models that should *not* be migrated in `load-from-h2`."
#{"LegacyQueryExecution"})
(defn- all-model-names []
(set (for [ns (ns-find/find-namespaces (classpath/classpath))
:when (or (re-find #"^metabase\.models\." (name ns))
(= (name ns) "metabase.db.migrations"))
:when (not (re-find #"test" (name ns)))
[_ varr] (do (require ns)
(ns-interns ns))
:let [entity (var-get varr)]
:when (models/model? entity)]
(:name entity))))
:let [{model-name :name, :as model} (var-get varr)]
:when (and (models/model? model)
(not (contains? models-to-exclude model-name)))]
model-name)))
(expect
(all-entity-names)
(migrated-entity-names))
(all-model-names)
(migrated-model-names))
@@ -679,9 +679,9 @@
;; admin should see all 3
(expect-with-test-data
["DB 1 Count of Venues"
"DB 2 Count of Venues"
"DB 2 Count of Users"]
["DB 1 Count of Venues"
"DB 2 Count of Users"
"DB 2 Count of Venues"]
(GET-metric :crowberto))
;; regular should only see metric for DB 1
@@ -15,3 +15,82 @@
:limit 10}}))
(expect false (qputil/query-without-aggregations-or-limits? {:query {:aggregation [{:aggregation-type :count}]
:page 1}}))
;;; ------------------------------------------------------------ Tests for qputil/query-hash ------------------------------------------------------------
(defn- array= {:style/indent 0}
([a b]
(java.util.Arrays/equals a b))
([a b & more]
(and (array= a b)
(apply array= b more))))
;; qputil/query-hash should always hash something the same way, every time
(expect
(array=
(byte-array [124 17 52 -28 71 -73 107 4 -108 39 42 -6 15 36 58 46 93 -59 103 -123 101 78 15 63 -10 -110 55 100 91 122 71 -23])
(qputil/query-hash {:query :abc})))
(expect
(array=
(qputil/query-hash {:query :def})
(qputil/query-hash {:query :def})))
;; different queries should produce different hashes
(expect
false
(array=
(qputil/query-hash {:query :abc})
(qputil/query-hash {:query :def})))
(expect
false
(array=
(qputil/query-hash {:query :abc, :database 1})
(qputil/query-hash {:query :abc, :database 2})))
(expect
false
(array=
(qputil/query-hash {:query :abc, :type "query"})
(qputil/query-hash {:query :abc, :type "native"})))
(expect
false
(array=
(qputil/query-hash {:query :abc, :parameters [1]})
(qputil/query-hash {:query :abc, :parameters [2]})))
(expect
false
(array=
(qputil/query-hash {:query :abc, :constraints {:max-rows 1000}})
(qputil/query-hash {:query :abc, :constraints nil})))
;; ... but keys that are irrelevant to the query should be ignored by qputil/query-hash
(expect
(array=
(qputil/query-hash {:query :abc, :random :def})
(qputil/query-hash {:query :abc, :random :xyz})))
;; empty `:parameters` lists should not affect the hash
(expect
(array=
(qputil/query-hash {:query :abc})
(qputil/query-hash {:query :abc, :parameters []})
(qputil/query-hash {:query :abc, :parameters nil})))
;; ...but non-empty ones should
(expect
false
(array=
(qputil/query-hash {:query :abc})
(qputil/query-hash {:query :abc, :parameters ["ABC"]})))
;; similarly, the presence of a `nil` value for `:constraints` should produce the same hash as not including the key at all
(expect
(array=
(qputil/query-hash {:query :abc})
(qputil/query-hash {:query :abc, :constraints nil})
(qputil/query-hash {:query :abc, :constraints {}})))