Skip to content
Snippets Groups Projects
Commit 64e7f471 authored by Cam Saül's avatar Cam Saül Committed by GitHub
Browse files

Merge pull request #4465 from metabase/qp-cleanup-2000

Fix initial nil value in native queries having no type info
parents 5c7417dc e668c371
No related branches found
No related tags found
No related merge requests found
Showing
with 485 additions and 585 deletions
......@@ -74,7 +74,6 @@
[ring/ring-jetty-adapter "1.5.1"] ; Ring adapter using Jetty webserver (used to run a Ring server for unit tests)
[ring/ring-json "0.4.0"] ; Ring middleware for reading/writing JSON automatically
[stencil "0.5.0"] ; Mustache templates for Clojure
[swiss-arrows "1.0.0"] ; 'Magic wand' macro -<>, etc.
[toucan "1.0.2" ; Model layer, hydration, and DB utilities
:exclusions [honeysql]]]
:repositories [["bintray" "https://dl.bintray.com/crate/crate"]] ; Repo for Crate JDBC driver
......
......@@ -16,8 +16,8 @@
[field :as field]
[table :as table])
[metabase.sync-database.analyze :as analyze]
[metabase.query-processor :as qp]
metabase.query-processor.interface
[metabase.query-processor.util :as qputil]
[metabase.util :as u]
[metabase.util.honeysql-extensions :as hx])
(:import (java.util Collections Date)
......@@ -298,9 +298,9 @@
:mbql? true})))
(defn- execute-query [{{{:keys [dataset-id]} :details, :as database} :database, {sql :query, params :params, :keys [table-name mbql?]} :native, :as outer-query}]
(let [sql (str "-- " (qp/query->remark outer-query) "\n" (if (seq params)
(unprepare/unprepare (cons sql params))
sql))
(let [sql (str "-- " (qputil/query->remark outer-query) "\n" (if (seq params)
(unprepare/unprepare (cons sql params))
sql))
results (process-native* database sql)
results (if mbql?
(post-process-mbql dataset-id table-name results)
......
......@@ -5,7 +5,6 @@
[clojure.tools.logging :as log]
[cheshire.core :as json]
[metabase.driver.druid.js :as js]
[metabase.query-processor :as qp]
(metabase.query-processor [annotate :as annotate]
[interface :as i])
[metabase.util :as u])
......@@ -81,11 +80,11 @@
:granularity :all
:context {:timeout 60000}}]
{::select (merge defaults {:queryType :select
:pagingSpec {:threshold qp/absolute-max-results}})
:pagingSpec {:threshold i/absolute-max-results}})
::total (merge defaults {:queryType :timeseries})
::grouped-timeseries (merge defaults {:queryType :timeseries})
::topN (merge defaults {:queryType :topN
:threshold qp/absolute-max-results})
:threshold i/absolute-max-results})
::groupBy (merge defaults {:queryType :groupBy})}))
......
......@@ -127,8 +127,8 @@
(result-set-read-column [x _ _] (PersistentVector/adopt x)))
(def ^:dynamic ^:private connection-pools
"A map of our currently open connection pools, keyed by DATABASE `:id`."
(def ^:dynamic ^:private database-id->connection-pool
"A map of our currently open connection pools, keyed by Database `:id`."
(atom {}))
(defn- create-connection-pool
......@@ -147,10 +147,10 @@
"We are being informed that a DATABASE has been updated, so lets shut down the connection pool (if it exists) under
the assumption that the connection details have changed."
[_ {:keys [id]}]
(when-let [pool (get @connection-pools id)]
(when-let [pool (get @database-id->connection-pool id)]
(log/debug (u/format-color 'red "Closing connection pool for database %d ..." id))
;; remove the cached reference to the pool so we don't try to use it anymore
(swap! connection-pools dissoc id)
(swap! database-id->connection-pool dissoc id)
;; now actively shut down the pool so that any open connections are closed
(.close ^ComboPooledDataSource (:datasource pool))))
......@@ -158,12 +158,12 @@
"Return a JDBC connection spec that includes a cp30 `ComboPooledDataSource`.
Theses connection pools are cached so we don't create multiple ones to the same DB."
[{:keys [id], :as database}]
(if (contains? @connection-pools id)
(if (contains? @database-id->connection-pool id)
;; we have an existing pool for this database, so use it
(get @connection-pools id)
(get @database-id->connection-pool id)
;; create a new pool and add it to our cache, then return it
(u/prog1 (create-connection-pool database)
(swap! connection-pools assoc id <>))))
(swap! database-id->connection-pool assoc id <>))))
(defn db->jdbc-connection-spec
"Return a JDBC connection spec for DATABASE. This will have a C3P0 pool as its datasource."
......@@ -216,10 +216,8 @@
(into [(hx/unescape-dots sql)] args)))
(defn- qualify+escape ^clojure.lang.Keyword
([table]
(hx/qualify-and-escape-dots (:schema table) (:name table)))
([table field]
(hx/qualify-and-escape-dots (:schema table) (:name table) (:name field))))
([table] (hx/qualify-and-escape-dots (:schema table) (:name table)))
([table field] (hx/qualify-and-escape-dots (:schema table) (:name table) (:name field))))
(defn- query
......@@ -329,6 +327,7 @@
;;; ## Database introspection methods used by sync process
;; TODO - clojure.java.jdbc now ships with a `metadata-query` function we could use here. See #2918
(defmacro with-metadata
"Execute BODY with `java.sql.DatabaseMetaData` for DATABASE."
[[binding _ database] & body]
......@@ -422,7 +421,7 @@
(require 'metabase.driver.generic-sql.query-processor)
{:active-tables fast-active-tables
:apply-aggregation (resolve 'metabase.driver.generic-sql.query-processor/apply-aggregation) ; don't resolve the vars yet so during interactive dev if the
:apply-breakout (resolve 'metabase.driver.generic-sql.query-processor/apply-breakout) ; underlying impl changes we won't have to reload all the drivers
:apply-breakout (resolve 'metabase.driver.generic-sql.query-processor/apply-breakout) ; underlying impl changes we won't have to reload all the drivers
:apply-fields (resolve 'metabase.driver.generic-sql.query-processor/apply-fields)
:apply-filter (resolve 'metabase.driver.generic-sql.query-processor/apply-filter)
:apply-join-tables (resolve 'metabase.driver.generic-sql.query-processor/apply-join-tables)
......
......@@ -11,9 +11,9 @@
(metabase [config :as config]
[driver :as driver])
[metabase.driver.generic-sql :as sql]
[metabase.query-processor :as qp]
(metabase.query-processor [annotate :as annotate]
interface)
[interface :as i]
[util :as qputil])
[metabase.util :as u]
[metabase.util.honeysql-extensions :as hx])
(:import java.sql.Timestamp
......@@ -289,7 +289,7 @@
[driverr {inner-query :query}]
{:pre [(map? inner-query)]}
(u/prog1 (apply-clauses driverr {} inner-query)
(when-not qp/*disable-qp-logging*
(when-not i/*disable-qp-logging*
(log/debug "HoneySQL Form: 🍯\n" (u/pprint-to-str 'cyan <>)))))
(defn mbql->native
......@@ -361,7 +361,7 @@
(defn execute-query
"Process and run a native (raw SQL) QUERY."
[driver {:keys [database settings], query :native, :as outer-query}]
(let [query (assoc query :remark (qp/query->remark outer-query))]
(let [query (assoc query :remark (qputil/query->remark outer-query))]
(do-with-try-catch
(fn []
(let [db-connection (sql/db->jdbc-connection-spec database)]
......
......@@ -10,9 +10,8 @@
[operators :refer :all])
[metabase.driver.mongo.util :refer [with-mongo-connection *mongo-connection* values->base-type]]
[metabase.models.table :refer [Table]]
[metabase.query-processor :as qp]
(metabase.query-processor [annotate :as annotate]
[interface :refer [qualified-name-components map->DateTimeField map->DateTimeValue]])
[interface :as i])
[metabase.util :as u])
(:import java.sql.Timestamp
java.util.Date
......@@ -35,7 +34,7 @@
(def ^:dynamic ^:private *query* nil)
(defn- log-monger-form [form]
(when-not qp/*disable-qp-logging*
(when-not i/*disable-qp-logging*
(log/debug (u/format-color 'green "\nMONGO AGGREGATION PIPELINE:\n%s\n"
(->> form
(walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
......@@ -68,7 +67,7 @@
(defn- field->name
"Return a single string name for FIELD. For nested fields, this creates a combined qualified name."
^String [^Field field, ^String separator]
(s/join separator (rest (qualified-name-components field))))
(s/join separator (rest (i/qualified-name-components field))))
(defmacro ^:private mongo-let
{:style/indent 1}
......@@ -196,8 +195,8 @@
RelativeDateTimeValue
(->rvalue [{:keys [amount unit field]}]
(->rvalue (map->DateTimeValue {:value (u/relative-date (or unit :day) amount)
:field field}))))
(->rvalue (i/map->DateTimeValue {:value (u/relative-date (or unit :day) amount)
:field field}))))
;;; ## CLAUSE APPLICATION
......
This diff is collapsed.
(ns metabase.query-processor.annotate
"Code that analyzes the results of running a query and adds relevant type information about results (including foreign key information).
TODO - The code in this namespace could definitely use a little cleanup to make it a little easier to wrap one's head around :)"
(:require [clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
......@@ -6,33 +8,12 @@
[toucan.db :as db]
[metabase.driver :as driver]
[metabase.models.field :refer [Field]]
[metabase.query-processor.interface :as i]
(metabase.query-processor [interface :as i]
[sort :as sort])
[metabase.util :as u])
(:import (metabase.query_processor.interface Expression
ExpressionRef)))
;; Fields should be returned in the following order:
;; 1. Breakout Fields
;;
;; 2. Aggregation Fields (e.g. sum, count)
;;
;; 3. Fields clause Fields, if they were added explicitly
;;
;; 4. All other Fields, sorted by:
;; A. :position (ascending)
;; Users can manually specify default Field ordering for a Table in the Metadata admin. In that case, return Fields in the specified
;; order; most of the time they'll have the default value of 0, in which case we'll compare...
;;
;; B. :special_type "group" -- :id Fields, then :name Fields, then everyting else
;; Attempt to put the most relevant Fields first. Order the Fields as follows:
;; 1. :id Fields
;; 2. :name Fields
;; 3. all other Fields
;;
;; C. Field Name
;; When two Fields have the same :position and :special_type "group", fall back to sorting Fields alphabetically by name.
;; This is arbitrary, but it makes the QP deterministic by keeping the results in a consistent order, which makes it testable.
;;; ## Field Resolution
(defn collect-fields
......@@ -194,53 +175,6 @@
(concat fields (for [k missing-keys]
(info-for-missing-key fields k)))))
;;; ## Field Sorting (TODO - Maybe move this into a separate namespace (`metabase.query-processor.sort`?)
;; We sort Fields with a "importance" vector like [source-importance position special-type-importance name]
(defn- source-importance-fn
"Create a function to return a importance for FIELD based on its source clause in the query.
e.g. if a Field comes from a `:breakout` clause, we should return that column first in the results."
[{:keys [fields-is-implicit]}]
(fn [{:keys [source]}]
(cond
(= source :breakout) :0-breakout
(= source :aggregation) :1-aggregation
(and (not fields-is-implicit)
(= source :fields)) :2-fields
:else :3-other)))
(defn- special-type-importance
"Return a importance for FIELD based on the relative importance of its `:special-type`.
i.e. a Field with special type `:id` should be sorted ahead of all other Fields in the results."
[{:keys [special-type]}]
(cond
(isa? special-type :type/PK) :0-id
(isa? special-type :type/Name) :1-name
:else :2-other))
(defn- field-importance-fn
"Create a function to return an \"importance\" vector for use in sorting FIELD."
[query]
(let [source-importance (source-importance-fn query)]
(fn [{:keys [position clause-position field-name source], :as field}]
[(source-importance field)
(or position
(when (contains? #{:fields :breakout} source)
clause-position)
Integer/MAX_VALUE)
(special-type-importance field)
field-name])))
(defn- sort-fields
"Sort FIELDS by their \"importance\" vectors."
[query fields]
(let [field-importance (field-importance-fn query)]
(when-not @(resolve 'metabase.query-processor/*disable-qp-logging*)
(log/debug (u/format-color 'yellow "Sorted fields:\n%s" (u/pprint-to-str (sort (map field-importance fields))))))
(sort-by field-importance fields)))
(defn- convert-field-to-expected-format
"Rename keys, provide default values, etc. for FIELD so it is in the format expected by the frontend."
[field]
......@@ -310,25 +244,26 @@
(add-aggregate-fields-if-needed query)
(map (u/rpartial update :field-name keyword))
(add-unknown-fields-if-needed result-keys)
(sort-fields query)
(sort/sort-fields query)
(map convert-field-to-expected-format)
(filter (comp (partial contains? result-keys) :name))
(m/distinct-by :name)
add-extra-info-to-fk-fields)))
(defn annotate
(defn annotate-and-sort
"Post-process a structured query to add metadata to the results. This stage:
1. Sorts the results according to the rules at the top of this page
2. Resolves the Fields returned in the results and adds information like `:columns` and `:cols`
expected by the frontend."
[query {:keys [columns rows]}]
[query {:keys [columns rows], :as results}]
(let [row-maps (for [row rows]
(zipmap columns row))
cols (resolve-sort-and-format-columns (:query query) (set columns))
columns (mapv :name cols)]
{:cols (vec (for [col cols]
(update col :name name)))
:columns (mapv name columns)
:rows (for [row row-maps]
(mapv row columns))}))
(assoc results
:cols (vec (for [col cols]
(update col :name name)))
:columns (mapv name columns)
:rows (for [row row-maps]
(mapv row columns)))))
......@@ -8,7 +8,7 @@
[schema.core :as s]
[toucan.db :as db]
[metabase.models.table :refer [Table]]
[metabase.query-processor.interface :refer [*driver*], :as i]
[metabase.query-processor.interface :as i]
[metabase.util :as u]
[metabase.util.schema :as su])
(:import (metabase.query_processor.interface AgFieldRef
......
......@@ -9,6 +9,24 @@
(:import clojure.lang.Keyword
java.sql.Timestamp))
;;; # ------------------------------------------------------------ CONSTANTS ------------------------------------------------------------
(def ^:const absolute-max-results
"Maximum number of rows the QP should ever return.
This is coming directly from the max rows allowed by Excel for now ...
https://support.office.com/en-nz/article/Excel-specifications-and-limits-1672b34d-7043-467e-8e27-269d656771c3"
1048576)
;;; # ------------------------------------------------------------ DYNAMIC VARS ------------------------------------------------------------
(def ^:dynamic ^Boolean *disable-qp-logging*
"Should we disable logging for the QP? (e.g., during sync we probably want to turn it off to keep logs less cluttered)."
false)
(def ^:dynamic *driver*
"The driver that will be used to run the query we are currently parsing.
Used by `assert-driver-supports` and other places.
......
(ns metabase.query-processor.middleware.add-implicit-clauses
"Middlware for adding an implicit `:fields` and `:order-by` clauses to certain queries."
(:require [clojure.tools.logging :as log]
[toucan.db :as db]
[metabase.models.field :refer [Field]]
(metabase.query-processor [interface :as i]
[resolve :as resolve]
[util :as qputil])))
(defn- fields-for-source-table
"Return the all fields for SOURCE-TABLE, for use as an implicit `:fields` clause."
[{source-table-id :id, :as source-table}]
(for [field (db/select [Field :name :display_name :base_type :special_type :visibility_type :table_id :id :position :description]
:table_id source-table-id
:visibility_type [:not-in ["sensitive" "retired"]]
:parent_id nil
{:order-by [[:position :asc]
[:id :desc]]})]
(let [field (resolve/resolve-table (i/map->Field (resolve/rename-mb-field-keys field))
{[nil source-table-id] source-table})]
(if (qputil/datetime-field? field)
(i/map->DateTimeField {:field field, :unit :default})
field))))
(defn- should-add-implicit-fields? [{{:keys [fields breakout], aggregations :aggregation} :query, :as query}]
(and (qputil/mbql-query? query)
(not (or (seq aggregations)
(seq breakout)
(seq fields)))))
(defn- add-implicit-fields [{{:keys [source-table]} :query, :as query}]
(if-not (should-add-implicit-fields? query)
query
;; this is a structured `:rows` query, so lets add a `:fields` clause with all fields from the source table + expressions
(let [fields (fields-for-source-table source-table)
expressions (for [[expression-name] (get-in query [:query :expressions])]
(i/strict-map->ExpressionRef {:expression-name (name expression-name)}))]
(when-not (seq fields)
(log/warn (format "Table '%s' has no Fields associated with it." (:name source-table))))
(-> query
(assoc-in [:query :fields-is-implicit] true)
(assoc-in [:query :fields] (concat fields expressions))))))
(defn- add-implicit-breakout-order-by
"`Fields` specified in `breakout` should add an implicit ascending `order-by` subclause *unless* that field is *explicitly* referenced in `order-by`."
[{{breakout-fields :breakout, order-by :order-by} :query, :as query}]
(if-not (qputil/mbql-query? query)
query
(let [order-by-fields (set (map :field order-by))
implicit-breakout-order-by-fields (filter (partial (complement contains?) order-by-fields)
breakout-fields)]
(cond-> query
(seq implicit-breakout-order-by-fields) (update-in [:query :order-by] concat (for [field implicit-breakout-order-by-fields]
{:field field, :direction :ascending}))))))
(defn add-implicit-clauses
"Add an implicit `fields` clause to queries with no `:aggregation`, `breakout`, or explicit `:fields` clauses.
Add implicit `:order-by` clauses for fields specified in a `:breakout`."
[qp]
(comp qp add-implicit-fields add-implicit-breakout-order-by))
(ns metabase.query-processor.middleware.add-row-count-and-status
"Middleware for adding `:row_count` and `:status` info to QP results."
(:require (metabase.query-processor [interface :as i]
[util :as qputil])))
(defn add-row-count-and-status
"Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and `status`)."
[qp]
(fn [{{:keys [max-results max-results-bare-rows]} :constraints, :as query}]
(let [results-limit (or (when (qputil/query-without-aggregations-or-limits? query)
max-results-bare-rows)
max-results
i/absolute-max-results)
results (qp query)
num-results (count (:rows results))]
(cond-> {:row_count num-results
:status :completed
:data results}
;; Add :rows_truncated if we've hit the limit so the UI can let the user know
(= num-results results-limit) (assoc-in [:data :rows_truncated] results-limit)))))
(ns metabase.query-processor.middleware.add-settings
"Middleware for adding a `:settings` map to a query before it is processed."
(:require [medley.core :as m]
[metabase.driver :as driver]))
(defn- add-settings* [{:keys [driver] :as query}]
(let [settings {:report-timezone (when (driver/driver-supports? driver :set-timezone)
(let [report-tz (driver/report-timezone)]
(when-not (empty? report-tz)
report-tz)))}]
(assoc query :settings (m/filter-vals (complement nil?) settings))))
(defn add-settings
"Adds the `:settings` map to the query which can contain any fixed properties that would be useful at execution time.
Currently supports a settings object like:
{:report-timezone \"US/Pacific\"}"
[qp]
(comp qp add-settings*))
(ns metabase.query-processor.middleware.annotate-and-sort
"Middleware for annotating (adding type information to) the results of a query and sorting the columns in the results."
(:require [metabase.driver :as driver]
(metabase.query-processor [annotate :as annotate]
[util :as qputil])))
(def ^:private ^:const ^Integer max-rows-to-scan-for-column-type-inference
"Maximum number of rows to scan to look for a non-`nil` value to determine type information.
This number is meant to be a good balance between not giving up prematurely and not scanning the entire set of results returned
(which can be millions of rows in some cases)."
100)
(defn- vals->base-type
"Given a sequence of VALS, return the Field base type of the first non-`nil` value, scanning up to `max-rows-to-scan-for-column-type-inference` results."
[vs]
(or (some (fn [v]
(when-not (nil? v)
(driver/class->base-type (class v))))
(take max-rows-to-scan-for-column-type-inference vs))
:type/*))
(defn- infer-column-types
"Infer the types of columns by looking at the first value for each in the results, and add the relevant information in `:cols`.
This is used for native queries, which don't have the type information from the original `Field` objects used in the query, which is added to the results by `annotate`."
[{:keys [columns rows], :as results}]
(assoc results
:columns (mapv name columns)
:cols (vec (for [i (range (count columns))]
{:name (name (nth columns i))
:base_type (vals->base-type (for [row rows]
(nth row i)))}))))
(defn annotate-and-sort
"Middleware for adding type information to columns returned by running a query, and sorting the columns in the results."
[qp]
(fn [query]
(let [results (qp query)]
(-> (if-not (or (qputil/mbql-query? query)
(:annotate? results))
(infer-column-types results)
(annotate/annotate-and-sort query results))
(dissoc :annotate?)))))
(ns metabase.query-processor.middleware.catch-exceptions
"Middleware for catching exceptions thrown by the query processor and returning them in a friendlier format."
(:require schema.utils
[metabase.query-processor.util :as qputil]
[metabase.query-processor.middleware.expand-resolve :as expand-resolve]
[metabase.util :as u])
(:import (schema.utils NamedError ValidationError)))
(defn- fail [query, ^Throwable e, & [additional-info]]
(merge {:status :failed
:class (class e)
:error (or (.getMessage e) (str e))
:stacktrace (u/filtered-stacktrace e)
:query (dissoc query :database :driver)
:expanded-query (when (qputil/mbql-query? query)
(u/ignore-exceptions
(dissoc (expand-resolve/expand-and-resolve query) :database :driver)))}
(when-let [data (ex-data e)]
{:ex-data data})
additional-info))
(defn- explain-schema-validation-error
"Return a nice error message to explain the schema validation error."
[error]
(cond
(instance? NamedError error) (let [nested-error (.error ^NamedError error)] ; recurse until we find the innermost nested named error, which is the reason we actually failed
(if (instance? NamedError nested-error)
(recur nested-error)
(or (when (map? nested-error)
(explain-schema-validation-error nested-error))
(.name ^NamedError error))))
(map? error) (first (for [e (vals error)
:when (or (instance? NamedError e)
(instance? ValidationError e))
:let [explanation (explain-schema-validation-error e)]
:when explanation]
explanation))
;; When an exception is thrown, a ValidationError comes back like (throws? ("foreign-keys is not supported by this driver." 10))
;; Extract the message if applicable
(instance? ValidationError error) (let [explanation (schema.utils/validation-error-explain error)]
(or (when (list? explanation)
(let [[reason [msg]] explanation]
(when (= reason 'throws?)
msg)))
explanation))))
(defn catch-exceptions
"Middleware for catching exceptions thrown by the query processor and returning them in a normal format."
[qp]
(fn [query]
(try (qp query)
(catch clojure.lang.ExceptionInfo e
(fail query e (when-let [data (ex-data e)]
(when (= (:type data) :schema.core/error)
(when-let [error (explain-schema-validation-error (:error data))]
{:error error})))))
(catch Throwable e
(fail query e)))))
(ns metabase.query-processor.middleware.cumulative-aggregations
"Middlware for handling `cumulative-count` and `cumulative-sum` aggregations."
(:require [metabase.query-processor.util :as qputil]
[metabase.util :as u]))
(defn- cumulative-aggregation-clause
"Does QUERY have any aggregations of AGGREGATION-TYPE?"
[aggregation-type {{aggregations :aggregation} :query, :as query}]
(when (qputil/mbql-query? query)
(some (fn [{ag-type :aggregation-type, :as ag}]
(when (= ag-type aggregation-type)
ag))
aggregations)))
(defn- pre-cumulative-aggregation
"Rewrite queries containing a cumulative aggregation (e.g. `:cumulative-count`) as a different 'basic' aggregation (e.g. `:count`).
This lets various drivers handle the aggregation normallly; we implement actual behavior here in post-processing."
[cumlative-ag-type basic-ag-type ag-field {{aggregations :aggregation, breakout-fields :breakout} :query, :as query}]
(update-in query [:query :aggregation] (fn [aggregations]
(for [{ag-type :aggregation-type, :as ag} aggregations]
(if-not (= ag-type cumlative-ag-type)
ag
{:aggregation-type basic-ag-type, :field ag-field})))))
(defn- post-cumulative-aggregation [basic-ag-type ag-field {rows :rows, cols :cols, :as results}]
(let [ ;; Determine the index of the field we need to cumulative sum
field-index (u/prog1 (u/first-index-satisfying (comp (partial = (name basic-ag-type)) :name)
cols)
(assert (integer? <>)))
;; Now make a sequence of cumulative sum values for each row
values (reductions + (for [row rows]
(nth row field-index)))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) field-index value))
rows values)]
(assoc results :rows rows)))
(defn- cumulative-aggregation [cumulative-ag-type basic-ag-type qp]
(let [cumulative-ag-clause (partial cumulative-aggregation-clause cumulative-ag-type)
pre-cumulative-ag (partial pre-cumulative-aggregation cumulative-ag-type basic-ag-type)
post-cumulative-ag (partial post-cumulative-aggregation basic-ag-type)]
(fn [query]
(if-let [{ag-field :field} (cumulative-ag-clause query)]
(post-cumulative-ag ag-field (qp (pre-cumulative-ag ag-field query)))
(qp query)))))
(def ^:private ^{:arglists '([qp])} cumulative-sum
"Handle `cumulative-sum` aggregations, which is done by rewriting the aggregation as a `:sum` in pre-processing and acculumlating the results in post-processing."
(partial cumulative-aggregation :cumulative-sum :sum))
(def ^:private ^{:arglists '([qp])} cumulative-count
"Handle `cumulative-count` aggregations, which is done by rewriting the aggregation as a `:count` in pre-processing and acculumlating the results in post-processing."
(partial cumulative-aggregation :cumulative-count :count))
(def ^{:arglists '([qp])} handle-cumulative-aggregations
"Handle `cumulative-sum` and `cumulative-count` aggregations by rewriting the aggregations appropriately in pre-processing and accumulating the results in post-processing."
(comp cumulative-sum cumulative-count))
(ns metabase.query-processor.middleware.dev
"Middleware that's only active in dev and test scenarios. These middleware functions do additional checks
of query processor behavior that are undesirable in normal production use."
(:require [schema.core :as s]
(metabase [config :as config]
[util :as u])))
;; The following are just assertions that check the behavior of the QP. It doesn't make sense to run them on prod because at best they
;; just waste CPU cycles and at worst cause a query to fail when it would otherwise succeed.
(def QPResultsFormat
"Schema for the expected format of results returned by a query processor."
{:columns [(s/cond-pre s/Keyword s/Str)]
(s/optional-key :cols) [{s/Keyword s/Any}] ; This is optional because QPs don't neccesarily have to add it themselves; annotate will take care of that
:rows [[s/Any]]
s/Keyword s/Any})
(def ^{:arglists '([results])} validate-results
"Validate that the RESULTS of executing a query match the `QPResultsFormat` schema.
Throws an `Exception` if they are not; returns RESULTS as-is if they are."
(s/validator QPResultsFormat))
(def ^{:arglists '([qp])} check-results-format
"Make sure the results of a QP execution are in the expected format.
This takes place *after* the 'annotation' stage of post-processing.
This check is skipped in prod to avoid wasting CPU cycles."
(if config/is-prod?
identity
(fn [qp]
(comp validate-results qp))))
(def ^{:arglists '([qp])} guard-multiple-calls
"Throw an exception if a QP function accidentally calls (QP QUERY) more than once.
This test is skipped in prod to avoid wasting CPU cycles."
(if config/is-prod?
identity
(fn [qp]
(comp qp (let [called? (atom false)]
(fn [query]
(u/prog1 query
(assert (not @called?) "(QP QUERY) IS BEING CALLED MORE THAN ONCE!")
(reset! called? true))))))))
(ns metabase.query-processor.middleware.driver-specific
"Middleware that hands off to a driver's implementation of `process-query-in-context`, if any.
If implemented, this effectively lets one inject custom driver-specific middleware for the QP.
Drivers can use it to different things like rewrite queries as needed or perform special permissions checks."
(:require [metabase.driver :as driver]))
(defn process-query-in-context
"Middleware that runs the query using the driver's `process-query-in-context` implementation, if any.
(Implementing this method effectively allows drivers to inject their own QP middleware functions.)"
[qp]
(fn [{driver :driver, :as query}]
((driver/process-query-in-context driver qp) query)))
(ns metabase.query-processor.middleware.expand-macros
"Middleware for expanding `METRIC` and `SEGMENT` 'macros' in *unexpanded* MBQL queries."
(:require [clojure.tools.logging :as log]
(metabase.query-processor [interface :as i]
[macros :as macros]
[util :as qputil])
[metabase.util :as u]))
(defn- expand-macros* [query]
(if-not (qputil/mbql-query? query)
query
(u/prog1 (macros/expand-macros query)
(when (and (not i/*disable-qp-logging*)
(not= <> query))
(log/debug (u/format-color 'cyan "\n\nMACRO/SUBSTITUTED: %s\n%s" (u/emoji "😻") (u/pprint-to-str <>)))))))
(defn expand-macros
"Look for `METRIC` and `SEGMENT` macros in an unexpanded MBQL query and substitute the macros for their contents."
[qp] (comp qp expand-macros*))
(ns metabase.query-processor.middleware.expand-resolve
"Middleware for converting a MBQL query into an 'expanded' form that contains additional information needed by drivers for running queries,
and resolving various referenced Fields and Tables."
(:require [toucan.db :as db]
[metabase.models.database :refer [Database]]
(metabase.query-processor [expand :as expand]
[resolve :as resolve]
[util :as qputil])))
(def ^{:arglists '([query])} expand-and-resolve
"Expand and resolve a QUERY.
(This function is *not* middleware; use `expand-resolve` for that purpose. This is provided for cases where we want to return the expanded/resolved
query in error messages)."
(comp resolve/resolve expand/expand))
(defn- expand-resolve*
[{database-id :database, :as query}]
(let [resolved-db (db/select-one [Database :name :id :engine :details], :id database-id)
query (if-not (qputil/mbql-query? query)
query
(expand-and-resolve query))]
(assoc query :database resolved-db)))
(defn expand-resolve
"Middleware that transforms an MBQL into an expanded form with more information and structure. Also resolves references to fields, tables,
etc, into their concrete details which are necessary for query formation by the executing driver."
[qp]
(comp qp expand-resolve*))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment