Skip to content
Snippets Groups Projects
Commit 316a1151 authored by Cam Saül's avatar Cam Saül
Browse files

Backend support for multiple aggregations :100: [ci drivers]

parent f5ab79fa
No related branches found
No related tags found
No related merge requests found
Showing
with 326 additions and 288 deletions
......@@ -31,6 +31,7 @@
(let-500 1)
(match 1)
(match-$ 1)
(merge-with 1)
(post-select 1)
(pre-cascade-delete 1)
(pre-insert 1)
......
......@@ -26,7 +26,8 @@
[setup :as setup]
[task :as task]
[util :as u])
[metabase.models.user :refer [User]]))
[metabase.models.user :refer [User]])
(:import org.eclipse.jetty.server.Server))
;;; CONFIG
......@@ -180,7 +181,7 @@
[]
(when @jetty-instance
(log/info "Shutting Down Embedded Jetty Webserver")
(.stop ^org.eclipse.jetty.server.Server @jetty-instance)
(.stop ^Server @jetty-instance)
(reset! jetty-instance nil)))
......@@ -195,7 +196,7 @@
(init!)
;; Ok, now block forever while Jetty does its thing
(when (config/config-bool :mb-jetty-join)
(.join ^org.eclipse.jetty.server.Server @jetty-instance))
(.join ^Server @jetty-instance))
(catch Throwable e
(.printStackTrace e)
(log/error "Metabase Initialization FAILED: " (.getMessage e))
......@@ -206,7 +207,7 @@
(defn ^:command migrate
"Run database migrations. Valid options for DIRECTION are `up`, `force`, `down-one`, `print`, or `release-locks`."
[direction]
(db/migrate! @db/db-connection-details (keyword direction)))
(db/migrate! (keyword direction)))
(defn ^:command load-from-h2
"Transfer data from existing H2 database to the newly created MySQL or Postgres DB specified by env vars."
......
......@@ -375,9 +375,11 @@
(log/error "Don't know how to alias: " this))]}
(cond
field (recur field) ; type/DateTime
index (name (let [{{{ag-type :aggregation-type} :aggregation} :query} sqlqp/*query*]
(if (= ag-type :distinct) :count
ag-type)))
index (name (let [{{aggregations :aggregation} :query} sqlqp/*query*
{ag-type :aggregation-type} (nth aggregations index)]
(if (= ag-type :distinct)
:count
ag-type)))
:else (str schema-name \. table-name \. field-name)))
;; We have to override the default SQL implementations of breakout and order-by because BigQuery propogates casting functions in SELECT
......
......@@ -46,10 +46,12 @@
(extend-protocol IRValue
nil (->rvalue [_] nil)
Object (->rvalue [this] this)
AgFieldRef (->rvalue [_] (let [ag-type (or (get-in *query* [:aggregation :aggregation-type])
(throw (Exception. "Unknown aggregation type!")))]
(if (= ag-type :distinct) :distinct___count
ag-type)))
AgFieldRef (->rvalue [{index :index}] (let [ag (nth (:aggregation *query*) index)
ag-type (or (:aggregation-type ag)
(throw (Exception. "Unknown aggregation type!")))]
(if (= ag-type :distinct)
:distinct___count
ag-type)))
Field (->rvalue [this] (:field-name this))
DateTimeField (->rvalue [this] (->rvalue (:field this)))
Value (->rvalue [this] (:value this))
......@@ -142,32 +144,40 @@
([field output-name] (ag:filtered (filter:not (filter:nil? field))
(ag:count output-name))))
(defn- handle-aggregation [query-type {{ag-type :aggregation-type, ag-field :field} :aggregation} druid-query]
(defn- handle-aggregation [query-type {ag-type :aggregation-type, ag-field :field} druid-query]
(when (isa? query-type ::ag-query)
(merge druid-query
(let [ag-type (when-not (= ag-type :rows) ag-type)]
(match [ag-type ag-field]
;; For 'distinct values' queries (queries with a breakout by no aggregation) just aggregate by count, but name it :___count so it gets discarded automatically
[nil nil] {:aggregations [(ag:count :___count)]}
[:count nil] {:aggregations [(ag:count :count)]}
[:count _] {:aggregations [(ag:count ag-field :count)]}
[:avg _] {:aggregations [(ag:count ag-field :___count)
(ag:doubleSum ag-field :___sum)]
:postAggregations [{:type :arithmetic
:name :avg
:fn :/
:fields [{:type :fieldAccess, :fieldName :___sum}
{:type :fieldAccess, :fieldName :___count}]}]}
[:distinct _] {:aggregations [{:type :cardinality
:name :distinct___count
:fieldNames [(->rvalue ag-field)]}]}
[:sum _] {:aggregations [(ag:doubleSum ag-field :sum)]}
[:min _] {:aggregations [(ag:doubleMin ag-field)]}
[:max _] {:aggregations [(ag:doubleMax ag-field)]})))))
(merge-with concat
druid-query
(let [ag-type (when-not (= ag-type :rows) ag-type)]
(match [ag-type ag-field]
;; For 'distinct values' queries (queries with a breakout by no aggregation) just aggregate by count, but name it :___count so it gets discarded automatically
[nil nil] {:aggregations [(ag:count :___count)]}
[:count nil] {:aggregations [(ag:count :count)]}
[:count _] {:aggregations [(ag:count ag-field :count)]}
[:avg _] {:aggregations [(ag:count ag-field :___count)
(ag:doubleSum ag-field :___sum)]
:postAggregations [{:type :arithmetic
:name :avg
:fn :/
:fields [{:type :fieldAccess, :fieldName :___sum}
{:type :fieldAccess, :fieldName :___count}]}]}
[:distinct _] {:aggregations [{:type :cardinality
:name :distinct___count
:fieldNames [(->rvalue ag-field)]}]}
[:sum _] {:aggregations [(ag:doubleSum ag-field :sum)]}
[:min _] {:aggregations [(ag:doubleMin ag-field)]}
[:max _] {:aggregations [(ag:doubleMax ag-field)]})))))
(defn- handle-aggregations [query-type {aggregations :aggregation} druid-query]
(loop [[ag & more] aggregations, query druid-query]
(let [query (handle-aggregation query-type ag query)]
(if-not (seq more)
query
(recur more query)))))
;;; ### handle-breakout
......@@ -427,7 +437,8 @@
(defmethod handle-order-by ::query [_ _ _]
(log/warn (u/format-color 'red "Sorting with Druid is only allowed in queries that have one or more breakout columns. Ignoring :order-by clause.")))
(defmethod handle-order-by ::topN [_ {{ag-type :aggregation-type} :aggregation, [breakout-field] :breakout, [{field :field, direction :direction}] :order-by} druid-query]
(defmethod handle-order-by ::topN [_ {[{ag-type :aggregation-type}] :aggregation, [breakout-field] :breakout, [{field :field, direction :direction}] :order-by} druid-query]
(let [field (->rvalue field)
breakout-field (->rvalue breakout-field)
sort-by-breakout? (= field breakout-field)
......@@ -443,7 +454,7 @@
{:dimension (->rvalue field)
:direction direction}))))
(defmethod handle-order-by ::grouped-timeseries [_ {{ag-type :aggregation-type} :aggregation, [breakout-field] :breakout, [{field :field, direction :direction}] :order-by} druid-query]
(defmethod handle-order-by ::grouped-timeseries [_ {[breakout-field] :breakout, [{field :field, direction :direction}] :order-by} druid-query]
(let [field (->rvalue field)
breakout-field (->rvalue breakout-field)
sort-by-breakout? (= field breakout-field)]
......@@ -515,7 +526,7 @@
(defn- druid-query-type
"What type of Druid query type should we perform?"
[{breakout-fields :breakout, {ag-type :aggregation-type} :aggregation, limit :limit}]
[{breakout-fields :breakout, [{ag-type :aggregation-type}] :aggregation, limit :limit}]
(let [breakouts (condp = (count breakout-fields)
0 :none
1 :one
......@@ -536,7 +547,7 @@
{:pre [(map? query)]}
(let [query-type (druid-query-type query)]
(loop [druid-query (query-type->default-query query-type), [f & more] [handle-source-table
handle-aggregation
handle-aggregations
handle-breakout
handle-filter
handle-order-by
......
......@@ -87,8 +87,8 @@
;; e.g. the ["aggregation" 0] fields we allow in order-by
AgFieldRef
(formatted [_]
(let [{:keys [aggregation-type]} (:aggregation (:query *query*))]
(formatted [{index :index}]
(let [{:keys [aggregation-type]} (nth (:aggregation (:query *query*)) index)]
;; For some arcane reason we name the results of a distinct aggregation "count",
;; everything else is named the same as the aggregation
(if (= aggregation-type :distinct)
......@@ -114,14 +114,19 @@
;;; ## Clause Handlers
(defn apply-aggregation
"Apply an `aggregation` clause to HONEYSQL-FORM. Default implementation of `apply-aggregation` for SQL drivers."
([driver honeysql-form {{:keys [aggregation-type field]} :aggregation}]
(apply-aggregation driver honeysql-form aggregation-type (formatted field)))
"Apply a `aggregation` clauses to HONEYSQL-FORM. Default implementation of `apply-aggregation` for SQL drivers."
([driver honeysql-form {aggregations :aggregation}]
(loop [form honeysql-form, [{:keys [aggregation-type field]} & more] aggregations]
(let [form (apply-aggregation driver form aggregation-type (formatted field))]
(if-not (seq more)
form
(recur form more)))))
([driver honeysql-form aggregation-type field]
(h/merge-select honeysql-form [(if-not field
;; aggregation clauses w/o a field
(do (assert (= aggregation-type :count))
(do (assert (= aggregation-type :count)
(format "Aggregations of type '%s' must specify a field." aggregation-type))
:%count.*)
;; aggregation clauses w/ a Field
(hsql/call (case aggregation-type
......@@ -132,7 +137,8 @@
:sum :sum
:min :min
:max :max)
field))
field))
;; the column alias is always the same as the ag type except for `:distinct` with is called `:count` (WHY?)
(if (= aggregation-type :distinct)
:count
aggregation-type)])))
......
......@@ -91,8 +91,8 @@
(str \$ (field->name this ".")))
AgFieldRef
(->lvalue [_]
(let [{:keys [aggregation-type]} (:aggregation (:query *query*))]
(->lvalue [{:keys [index]}]
(let [{:keys [aggregation-type]} (nth (:aggregation (:query *query*)) index)]
(ag-type->field-name aggregation-type)))
DateTimeField
......@@ -245,6 +245,7 @@
;;; ### aggregation
(defn- aggregation->rvalue [{:keys [aggregation-type field]}]
{:pre [(keyword? aggregation-type)]}
(if-not field
(case aggregation-type
:count {$sum 1})
......@@ -258,35 +259,34 @@
:min {$min (->rvalue field)}
:max {$max (->rvalue field)})))
(defn- handle-breakout+aggregation [{breakout-fields :breakout, {ag-type :aggregation-type, ag-field :field, :as aggregation} :aggregation} pipeline]
(let [aggregation? ag-type
breakout? (seq breakout-fields)]
(when (or aggregation? breakout?)
(let [ag-field-name (ag-type->field-name ag-type)]
(filter identity
[ ;; create a totally sweet made-up column called __group to store the fields we'd like to group by
(when breakout?
{$project (merge
{"_id" "$_id"
"___group" (into {} (for [field breakout-fields]
{(->lvalue field) (->rvalue field)}))}
(when ag-field
{(->lvalue ag-field) (->rvalue ag-field)}))})
;; Now project onto the __group and the aggregation rvalue
{$group (merge {"_id" (when breakout?
"$___group")}
(when aggregation
{ag-field-name (aggregation->rvalue aggregation)}))}
;; Sort by _id (___group)
{$sort {"_id" 1}}
;; now project back to the fields we expect
{$project (merge {"_id" false}
(when aggregation?
{ag-field-name (if (= ag-type :distinct)
{$size "$count"} ; HACK
true)})
(into {} (for [field breakout-fields]
{(->lvalue field) (format "$_id.%s" (->lvalue field))})))}])))))
(defn- handle-breakout+aggregation [{breakout-fields :breakout, aggregations :aggregation} pipeline]
(let [aggregations? (seq aggregations)
breakout? (seq breakout-fields)]
(when (or aggregations? breakout?)
(filter identity
[ ;; create a totally sweet made-up column called __group to store the fields we'd like to group by
(when breakout?
{$project (merge {"_id" "$_id"
"___group" (into {} (for [field breakout-fields]
{(->lvalue field) (->rvalue field)}))}
(into {} (for [{ag-field :field} aggregations
:when ag-field]
{(->lvalue ag-field) (->rvalue ag-field)})))})
;; Now project onto the __group and the aggregation rvalue
{$group (merge {"_id" (when breakout?
"$___group")}
(into {} (for [{ag-type :aggregation-type, :as aggregation} aggregations]
{(ag-type->field-name ag-type) (aggregation->rvalue aggregation)})))}
;; Sort by _id (___group)
{$sort {"_id" 1}}
;; now project back to the fields we expect
{$project (merge {"_id" false}
(into {} (for [{ag-type :aggregation-type} aggregations]
{(ag-type->field-name ag-type) (if (= ag-type :distinct)
{$size "$count"} ; HACK
true)}))
(into {} (for [field breakout-fields]
{(->lvalue field) (format "$_id.%s" (->lvalue field))})))}]))))
;;; ### order-by
......
......@@ -67,11 +67,11 @@
(defn- query-without-aggregations-or-limits?
"Is the given query an MBQL query without a `:limit`, `:aggregation`, or `:page` clause?"
[{{{ag-type :aggregation-type} :aggregation, :keys [limit page]} :query}]
[{{aggregations :aggregation, :keys [limit page]} :query}]
(and (not limit)
(not page)
(or (not ag-type)
(= ag-type :rows))))
(or (empty? aggregations)
(= (:aggregation-type (first aggregations)) :rows))))
(defn- fail [query, ^Throwable e, & [additional-info]]
(merge {:status :failed
......@@ -219,9 +219,9 @@
(update results :rows (partial format-rows settings))))))
(defn- should-add-implicit-fields? [{{:keys [fields breakout], {ag-type :aggregation-type} :aggregation} :query, :as query}]
(defn- should-add-implicit-fields? [{{:keys [fields breakout], aggregations :aggregation} :query, :as query}]
(and (mbql-query? query)
(not (or ag-type
(not (or (seq aggregations)
(seq breakout)
(seq fields)))))
......@@ -277,119 +277,59 @@
(defn- pre-add-implicit-breakout-order-by [qp] (comp qp add-implicit-breakout-order-by))
(defn- pre-cumulative-sum
"Rewrite queries containing a cumulative sum (`cum_sum`) aggregation to simply fetch the values of the aggregate field instead.
(Cumulative sum is a special case; it is implemented in post-processing).
Return a pair of [`cumulative-sum-field?` `query`]."
[{{{ag-type :aggregation-type, ag-field :field} :aggregation, breakout-fields :breakout} :query, :as query}]
(let [cum-sum? (= ag-type :cumulative-sum)
cum-sum-with-breakout? (and cum-sum?
(seq breakout-fields))
cum-sum-with-same-breakout? (and cum-sum-with-breakout?
(= (count breakout-fields) 1)
(= (first breakout-fields) ag-field))]
;; Cumulative sum is only applicable if it has breakout fields
;; For these, store the cumulative sum field under the key :cumulative-sum so we know which one to sum later
;; Cumulative summing happens in post-processing
(cond
;; If there's only one breakout field that is the same as the cum_sum field, re-write this as a "rows" aggregation
;; to just fetch all the values of the field in question.
cum-sum-with-same-breakout? [ag-field (update query :query (fn [query]
(-> query
(dissoc :breakout :aggregation)
(assoc :fields [ag-field]))))]
;; Otherwise if we're breaking out on different fields, rewrite the query as a "sum" aggregation
cum-sum-with-breakout? [ag-field (assoc-in query [:query :aggregation] {:aggregation-type :sum, :field ag-field})]
;; Cumulative sum without any breakout fields should just be treated the same way as "sum". Rewrite query as such
cum-sum? [false (assoc-in query [:query :aggregation] {:aggregation-type :sum, :field ag-field})]
;; Otherwise if this isn't a cumulative sum query return it as-is
:else [false query])))
(defn- post-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
[cum-sum-field {rows :rows, cols :cols, :as results}]
;;; ------------------------------------------------------------ CUMULATIVE-SUM & CUMULATIVE-COUNT ------------------------------------------------------------
(defn- cumulative-aggregation-clause
"Does QUERY have any aggregations of AGGREGATION-TYPE?"
[aggregation-type {{aggregations :aggregation} :query, :as query}]
(when (mbql-query? query)
(some (fn [{ag-type :aggregation-type, :as ag}]
(when (= ag-type aggregation-type)
ag))
aggregations)))
(defn- pre-cumulative-aggregation
"Rewrite queries containing a cumulative aggregation (e.g. `:cumulative-count`) as a different 'basic' aggregation (e.g. `:count`).
This lets various drivers handle the aggregation normallly; we implement actual behavior here in post-processing."
[cumlative-ag-type basic-ag-type ag-field {{aggregations :aggregation, breakout-fields :breakout} :query, :as query}]
(update-in query [:query :aggregation] (fn [aggregations]
(for [{ag-type :aggregation-type, :as ag} aggregations]
(if-not (= ag-type cumlative-ag-type)
ag
{:aggregation-type basic-ag-type, :field ag-field})))))
(defn- post-cumulative-aggregation [basic-ag-type ag-field {rows :rows, cols :cols, :as results}]
(let [ ;; Determine the index of the field we need to cumulative sum
cum-sum-field-index (u/prog1 (u/first-index-satisfying (fn [{:keys [name id]}]
(or (= name "sum")
(= id (:field-id cum-sum-field))))
cols)
(assert (integer? <>)))
field-index (u/prog1 (u/first-index-satisfying (comp (partial = (name basic-ag-type)) :name)
cols)
(assert (integer? <>)))
;; Now make a sequence of cumulative sum values for each row
values (reductions + (for [row rows]
(nth row cum-sum-field-index)))
values (reductions + (for [row rows]
(nth row field-index)))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-sum-field-index value))
rows values)]
rows (map (fn [row value]
(assoc (vec row) field-index value))
rows values)]
(assoc results :rows rows)))
(defn- cumulative-aggregation [cumulative-ag-type basic-ag-type qp]
(let [cumulative-ag-clause (partial cumulative-aggregation-clause cumulative-ag-type)
pre-cumulative-ag (partial pre-cumulative-aggregation cumulative-ag-type basic-ag-type)
post-cumulative-ag (partial post-cumulative-aggregation basic-ag-type)]
(fn [query]
(if-let [{ag-field :field} (cumulative-ag-clause query)]
(post-cumulative-ag ag-field (qp (pre-cumulative-ag ag-field query)))
(qp query)))))
(defn- cumulative-sum [qp]
(fn [query]
(if (mbql-query? query)
(let [[cumulative-sum-field query] (pre-cumulative-sum query)]
(cond->> (qp query)
cumulative-sum-field (post-cumulative-sum cumulative-sum-field)))
;; for non-MBQL queries we do nothing
(qp query))))
(defn- pre-cumulative-count
"Rewrite queries containing a cumulative count (`cum_count`) aggregation as `count` aggregation queries instead.
(Cumulative count is a special case; it is implemented in post-processing).
Returns a pair like `[is-cumulative-count-query? query]`."
[{{{ag-type :aggregation-type} :aggregation, breakout-fields :breakout} :query, :as query}]
(let [cum-count? (= ag-type :cumulative-count)
cum-count-with-breakout? (and cum-count?
(seq breakout-fields))]
;; Cumulative count is only applicable if it has breakout field(s)
;; Cumulative counting happens in post-processing
(cond
;; If we have breakout field(s), rewrite the query as a "count" aggregation
cum-count-with-breakout? [true (assoc-in query [:query :aggregation] {:aggregation-type :count})]
;; Cumulative count without any breakout fields should just be treated the same way as "count". Rewrite query as such
cum-count? [false (assoc-in query [:query :aggregation] {:aggregation-type :count})]
;; Otherwise if this isn't a cumulative count query return it as-is
:else [false query])))
(defn- post-cumulative-count
"Cumulative count the values of the aggregate `Field` in RESULTS."
[{rows :rows, cols :cols, :as results}]
(let [ ;; Determine the index of the count field; this is what we need to cumulative count
cum-count-field-index (u/prog1 (u/first-index-satisfying (comp (partial = "count") :name)
cols)
(assert (integer? <>)))
;; Now make a sequence of cumulative count values for each row
values (reductions + (for [row rows]
(nth row cum-count-field-index)))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-count-field-index value))
rows values)]
(assoc results :rows rows)))
(def ^:private ^{:arglists '([qp])} cumulative-sum
(partial cumulative-aggregation :cumulative-sum :sum))
(def ^:private ^{:arglists '([qp])} cumulative-count
(partial cumulative-aggregation :cumulative-count :count))
(defn- cumulative-count [qp]
(fn [query]
(if (mbql-query? query)
(let [[is-cumulative-count? query] (pre-cumulative-count query)
results (qp query)]
(if is-cumulative-count?
(post-cumulative-count results)
results))
;; for non-MBQL queries we do nothing
(qp query))))
;;; ------------------------------------------------------------ LIMIT, &C. ------------------------------------------------------------
(defn- limit
"Add an implicit `limit` clause to MBQL queries without any aggregations, and limit the maximum number of rows that can be returned in post-processing."
......
......@@ -95,26 +95,27 @@
(defn- add-aggregate-field-if-needed
"Add a Field containing information about an aggregate column such as `:count` or `:distinct` if needed."
[{{ag-type :aggregation-type, ag-field :field} :aggregation} fields]
(if (or (not ag-type)
(= ag-type :rows))
[{aggregations :aggregation} fields]
(if (or (empty? aggregations)
(= (:aggregation-type (first aggregations)) :rows))
fields
(conj fields (merge (let [field-name (ag-type->field-name ag-type)]
{:source :aggregation
:field-name field-name
:field-display-name field-name
:base-type (:base-type ag-field)
:special-type (:special-type ag-field)})
;; Always treat count or distinct count as an integer even if the DB in question returns it as something wacky like a BigDecimal or Float
(when (contains? #{:count :distinct} ag-type)
{:base-type :type/Integer
:special-type :type/Number})
;; For the time being every Expression is an arithmetic operator and returns a floating-point number, so hardcoding these types is fine;
;; In the future when we extend Expressions to handle more functionality we'll want to introduce logic that associates a return type with a given expression.
;; But this will work for the purposes of a patch release.
(when (instance? ExpressionRef ag-field)
{:base-type :type/Float
:special-type :type/Number})))))
(concat fields (for [{ag-type :aggregation-type, ag-field :field} aggregations]
(merge (let [field-name (ag-type->field-name ag-type)]
{:source :aggregation
:field-name field-name
:field-display-name field-name
:base-type (:base-type ag-field)
:special-type (:special-type ag-field)})
;; Always treat count or distinct count as an integer even if the DB in question returns it as something wacky like a BigDecimal or Float
(when (contains? #{:count :distinct} ag-type)
{:base-type :type/Integer
:special-type :type/Number})
;; For the time being every Expression is an arithmetic operator and returns a floating-point number, so hardcoding these types is fine;
;; In the future when we extend Expressions to handle more functionality we'll want to introduce logic that associates a return type with a given expression.
;; But this will work for the purposes of a patch release.
(when (instance? ExpressionRef ag-field)
{:base-type :type/Float
:special-type :type/Number}))))))
(defn- add-unknown-fields-if-needed
"When create info maps for any fields we didn't expect to come back from the query.
......
......@@ -156,15 +156,20 @@
(log/warn "The syntax for aggregate fields has changed in MBQL '98. Instead of `[:aggregation 0]`, please use `[:aggregate-field 0]` instead.")
(aggregate-field index))
;; Handle :aggregation top-level clauses
([query ag :- (s/maybe (s/pred map?))]
(if-not ag
query
(let [ag ((if (:field ag)
i/map->AggregationWithField
i/map->AggregationWithoutField) (update ag :aggregation-type normalize-token))]
(s/validate i/Aggregation ag)
(assoc query :aggregation ag)))))
;; Handle :aggregation top-level clauses. This is either a single map (single aggregation) or a vector of maps (multiple aggregations)
([query ag-or-ags :- (s/maybe (s/cond-pre su/Map [su/Map]))]
(cond
(map? ag-or-ags) (recur query [ag-or-ags])
(empty? ag-or-ags) query
:else (assoc query :aggregation (vec (for [ag ag-or-ags]
(u/prog1 ((if (:field ag)
i/map->AggregationWithField
i/map->AggregationWithoutField) (update ag :aggregation-type normalize-token))
(s/validate i/Aggregation <>)))))))
;; also handle varargs for convenience
([query ag & more]
(aggregation query (cons ag more))))
;;; ## breakout & fields
......
......@@ -159,9 +159,7 @@
"foreign-keys is not supported by this driver."))
datetime-unit :- (s/maybe (apply s/enum datetime-field-units))])
(s/defrecord AgFieldRef [index :- (s/constrained s/Int
zero?
"Ag field index should be 0 -- MBQL currently only supports a single aggregation")])
(s/defrecord AgFieldRef [index :- s/Int])
;; TODO - add a method to get matching expression from the query?
......@@ -248,14 +246,16 @@
"Valid aggregation type")
field :- FieldPlaceholderOrExpressionRef])
(defn- valid-aggregation-for-driver? [{:keys [aggregation-type]}]
(when (= aggregation-type :stddev)
(assert-driver-supports :standard-deviation-aggregations))
true)
(def Aggregation
"Schema for a top-level `aggregation` clause in an MBQL query."
"Schema for an `aggregation` subclause in an MBQL query."
(s/constrained
(s/cond-pre AggregationWithField AggregationWithoutField)
(fn [{:keys [aggregation-type]}]
(when (= aggregation-type :stddev)
(assert-driver-supports :standard-deviation-aggregations))
true)
valid-aggregation-for-driver?
"standard-deviation-aggregations is not supported by this driver."))
......@@ -313,7 +313,7 @@
(def Query
"Schema for an MBQL query."
{(s/optional-key :aggregation) Aggregation
{(s/optional-key :aggregation) [Aggregation]
(s/optional-key :breakout) [FieldPlaceholderOrExpressionRef]
(s/optional-key :fields) [AnyField]
(s/optional-key :filter) Filter
......
(ns metabase.query-processor.macros
(:require [clojure.core.match :refer [match]]
[metabase.db :as db]))
[metabase.db :as db]
[metabase.util :as u]))
(defn- non-empty-clause? [clause]
(and clause
......@@ -39,20 +40,50 @@
(seq addtl) addtl
:else []))
(defn- macroexpand-metric [query-dict]
(if-not (non-empty-clause? (get-in query-dict [:query :aggregation]))
;; aggregation is empty, so no METRIC to expand
(defn- merge-aggregation [aggregations new-ag]
(if (map? aggregations)
(recur [aggregations] new-ag)
(conj aggregations new-ag)))
(defn- merge-aggregations {:style/indent 0} [query-dict [aggregation & more]]
(if-not aggregation
;; no more aggregations? we're done
query-dict
;; otherwise determine if this aggregation is a METRIC and recur
(let [metric-def (match aggregation
["METRIC" (metric-id :guard integer?)] (db/select-one-field :definition 'Metric, :id metric-id)
_ nil)]
(recur (if-not metric-def
;; not a metric, move to next aggregation
query-dict
;; it *is* a metric, insert it into the query appropriately
(-> query-dict
(update-in [:query :aggregation] merge-aggregation (:aggregation metric-def))
(update-in [:query :filter] merge-filter-clauses (:filter metric-def))))
more))))
(defn- remove-metrics [aggregations]
(if-not (and (sequential? aggregations)
(every? coll? aggregations))
(recur [aggregations])
(vec (for [ag aggregations
:when (match ag
["METRIC" (_ :guard integer?)] false
_ true)]
ag))))
(defn- macroexpand-metric [{{aggregations :aggregation} :query, :as query-dict}]
(if-not (seq aggregations)
;; :aggregation is empty, so no METRIC to expand
query-dict
;; we have an aggregation clause, so lets see if we are using a METRIC
(if-let [metric-def (match (get-in query-dict [:query :aggregation])
["METRIC" (metric-id :guard integer?)] (db/select-one-field :definition 'Metric, :id metric-id)
_ nil)]
;; we have a metric, so merge its definition into the existing query-dict
(-> query-dict
(assoc-in [:query :aggregation] (:aggregation metric-def))
(assoc-in [:query :filter] (merge-filter-clauses (get-in query-dict [:query :filter]) (:filter metric-def))))
;; no metric, just use the original query-dict
query-dict)))
;; (since `:aggregation` can be either single or multiple, wrap single ones so `merge-aggregations` can always assume input is multiple)
(merge-aggregations
(update-in query-dict [:query :aggregation] remove-metrics)
(if (and (sequential? aggregations)
(every? coll? aggregations))
aggregations
[aggregations]))))
(defn expand-macros "Expand the macros (SEGMENT, METRIC) in a QUERY-DICT."
[query-dict]
......
......@@ -444,6 +444,7 @@
"Return the index of the first item in COLL where `(pred item)` is logically `true`.
(first-index-satisfying keyword? ['a 'b :c 3 \"e\"]) -> 2"
{:style/indent 1}
[pred coll]
(loop [i 0, [item & more] coll]
(cond
......
......@@ -64,7 +64,7 @@
(query checkins
(ql/aggregation (ql/count))))
(assoc :type "query")
(assoc-in [:query :aggregation] {:aggregation-type "count"})
(assoc-in [:query :aggregation] [{:aggregation-type "count"}])
(assoc :constraints query-constraints))
:started_at true
:finished_at true
......@@ -80,7 +80,7 @@
(query checkins
(ql/aggregation (ql/count))))
(assoc :type "query")
(assoc-in [:query :aggregation] {:aggregation-type "count"})
(assoc-in [:query :aggregation] [{:aggregation-type "count"}])
(assoc :constraints query-constraints))
:started_at true
:finished_at true
......
......@@ -221,10 +221,10 @@
{:database (id)
:type :query
:query {:source-table (id :checkins)
:aggregation {:aggregation-type :sum
:field {:field-id (id :venues :price)
:fk-field-id (id :checkins :venue_id)
:datetime-unit nil}}
:aggregation [{:aggregation-type :sum
:field {:field-id (id :venues :price)
:fk-field-id (id :checkins :venue_id)
:datetime-unit nil}}]
:breakout [{:field-id (id :checkins :date)
:fk-field-id nil
:datetime-unit :day-of-week}]}}
......@@ -234,21 +234,21 @@
:query {:source-table {:schema "PUBLIC"
:name "CHECKINS"
:id (id :checkins)}
:aggregation {:aggregation-type :sum
:field {:description nil
:base-type :type/Integer
:parent nil
:table-id (id :venues)
:special-type :type/Category
:field-name "PRICE"
:field-display-name "Price"
:parent-id nil
:visibility-type :normal
:position nil
:field-id (id :venues :price)
:fk-field-id (id :checkins :venue_id)
:table-name "VENUES__via__VENUE_ID"
:schema-name nil}}
:aggregation [{:aggregation-type :sum
:field {:description nil
:base-type :type/Integer
:parent nil
:table-id (id :venues)
:special-type :type/Category
:field-name "PRICE"
:field-display-name "Price"
:parent-id nil
:visibility-type :normal
:position nil
:field-id (id :venues :price)
:fk-field-id (id :checkins :venue_id)
:table-name "VENUES__via__VENUE_ID"
:schema-name nil}}]
:breakout [{:field {:description nil
:base-type :type/Date
:parent nil
......
......@@ -14,7 +14,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["rows"]
:query {:aggregation [["rows"]]
:filter ["AND" [">" 4 1]]
:breakout [17]}}
(expand-macros {:database 1
......@@ -27,7 +27,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["rows"]
:query {:aggregation [["rows"]]
:filter ["AND" ["AND" ["=" 5 "abc"]] ["OR" ["AND" ["IS_NULL" 7]] [">" 4 1]]]
:breakout [17]}}
(tu/with-temp* [Database [{database-id :id}]
......@@ -46,7 +46,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["count"]
:query {:aggregation [["count"]]
:filter ["AND" ["AND" [">" 4 1]] ["AND" ["=" 5 "abc"]]]
:breakout [17]
:order_by [[1 "ASC"]]}}
......@@ -66,7 +66,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["count"]
:query {:aggregation [["count"]]
:filter ["AND" ["=" 5 "abc"]]
:breakout [17]
:order_by [[1 "ASC"]]}}
......@@ -86,7 +86,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["count"]
:query {:aggregation [["count"]]
:filter ["AND" ["=" 5 "abc"]]
:breakout [17]
:order_by [[1 "ASC"]]}}
......@@ -105,7 +105,7 @@
(expect
{:database 1
:type :query
:query {:aggregation ["sum" 18]
:query {:aggregation [["sum" 18]]
:filter ["AND" ["AND" [">" 4 1] ["AND" ["IS_NULL" 7]]] ["AND" ["=" 5 "abc"] ["AND" ["BETWEEN" 9 0 25]]]]
:breakout [17]
:order_by [[1 "ASC"]]}}
......
......@@ -315,11 +315,11 @@
(tu/resolve-private-vars metabase.query-processor query-without-aggregations-or-limits?)
;; query-without-aggregations-or-limits?
(expect false (query-without-aggregations-or-limits? {:query {:aggregation {:aggregation-type :count}}}))
(expect true (query-without-aggregations-or-limits? {:query {:aggregation {:aggregation-type :rows}}}))
(expect false (query-without-aggregations-or-limits? {:query {:aggregation {:aggregation-type :count}
(expect false (query-without-aggregations-or-limits? {:query {:aggregation [{:aggregation-type :count}]}}))
(expect true (query-without-aggregations-or-limits? {:query {:aggregation [{:aggregation-type :rows}]}}))
(expect false (query-without-aggregations-or-limits? {:query {:aggregation [{:aggregation-type :count}]
:limit 10}}))
(expect false (query-without-aggregations-or-limits? {:query {:aggregation {:aggregation-type :count}
(expect false (query-without-aggregations-or-limits? {:query {:aggregation [{:aggregation-type :count}]
:page 1}}))
......@@ -657,7 +657,7 @@
;;; "BREAKOUT" - MULTIPLE COLUMNS W/ EXPLICIT "ORDER_BY"
;; `breakout` should not implicitly order by any fields specified in `order_by`
(qp-expect-with-all-engines
{:rows [[15 2 1] [15 3 1] [15 7 1] [15 14 1] [15 16 1] [15 18 1] [15 22 1] [15 23 2] [15 24 1] [15 27 1]],
{:rows [[15 2 1] [15 3 1] [15 7 1] [15 14 1] [15 16 1] [15 18 1] [15 22 1] [15 23 2] [15 24 1] [15 27 1]]
:columns [(format-name "user_id")
(format-name "venue_id")
"count"]
......@@ -685,28 +685,28 @@
;; Apply an arbitrary max-results on the query and ensure our results size is appropriately constrained
(expect 1234
(->> (((resolve 'metabase.query-processor/limit) identity) {:constraints {:max-results 1234}
:query {:aggregation {:aggregation-type :count}}
:query {:aggregation [{:aggregation-type :count}]}
:rows (repeat [:ok])})
:rows
count))
;; Apply a max-results-bare-rows limit specifically on :rows type query
(expect [46 46]
(let [res (((resolve 'metabase.query-processor/limit) identity) {:constraints {:max-results 46}
:query {:aggregation {:aggregation-type :rows}}
:rows (repeat [:ok])})]
[(->> res :rows count)
(->> res :query :limit)]))
(let [res (((resolve 'metabase.query-processor/limit) identity) {:constraints {:max-results 46}
:query {:aggregation [{:aggregation-type :rows}]}
:rows (repeat [:ok])})]
[(->> res :rows count)
(->> res :query :limit)]))
;;; ------------------------------------------------------------ CUMULATIVE SUM ------------------------------------------------------------
;;; cum_sum w/o breakout should be treated the same as sum
(qp-expect-with-all-engines
{:rows [[120]]
:columns ["sum"]
:cols [(aggregate-col :sum (users-col :id))]
:native_form true}
{:rows [[120]]
:columns ["sum"]
:cols [(aggregate-col :sum (users-col :id))]
:native_form true}
(->> (run-query users
(ql/aggregation (ql/cum-sum $id)))
booleanize-native-form
......@@ -715,15 +715,31 @@
;;; Simple cumulative sum where breakout field is same as cum_sum field
(qp-expect-with-all-engines
{:rows [[1] [3] [6] [10] [15] [21] [28] [36] [45] [55] [66] [78] [91] [105] [120]]
:columns (->columns "id")
:cols [(users-col :id)]
:native_form true}
(->> (run-query users
(ql/aggregation (ql/cum-sum $id))
(ql/breakout $id))
booleanize-native-form
(format-rows-by [int])))
{:rows [[ 1 1]
[ 2 3]
[ 3 6]
[ 4 10]
[ 5 15]
[ 6 21]
[ 7 28]
[ 8 36]
[ 9 45]
[10 55]
[11 66]
[12 78]
[13 91]
[14 105]
[15 120]]
:columns [(format-name "id")
"sum"]
:cols [(breakout-col (users-col :id))
(aggregate-col :sum (users-col :id))]
:native_form true}
(->> (run-query users
(ql/aggregation (ql/cum-sum $id))
(ql/breakout $id))
booleanize-native-form
(format-rows-by [int int])))
;;; Cumulative sum w/ a different breakout field
......@@ -2024,6 +2040,10 @@
(ql/breakout (ql/expression :x))))))
;;; +----------------------------------------------------------------------------------------------------------------------+
;;; | MULTIPLE JOINS |
;;; +----------------------------------------------------------------------------------------------------------------------+
;;; CAN WE JOIN AGAINST THE SAME TABLE TWICE (MULTIPLE FKS TO A SINGLE TABLE!?)
;; Query should look something like:
;; SELECT USERS__via__SENDER_ID.NAME AS NAME, count(*) AS count
......@@ -2047,3 +2067,22 @@
(ql/aggregation (ql/count))
(ql/breakout $sender_id->users.name)
(ql/filter (ql/= $reciever_id->users.name "Rasta Toucan")))))))
;;; +----------------------------------------------------------------------------------------------------------------------+
;;; | MULTIPLE AGGREGATIONS |
;;; +----------------------------------------------------------------------------------------------------------------------+
;; can we run a simple query with *two* aggregations?
(expect-with-non-timeseries-dbs
[[100 203]]
(format-rows-by [int int]
(rows (run-query venues
(ql/aggregation (ql/count) (ql/sum $price))))))
;; how about with *three* aggregations?
(expect-with-non-timeseries-dbs
[[2 100 203]]
(format-rows-by [int int int]
(rows (run-query venues
(ql/aggregation (ql/avg $price) (ql/count) (ql/sum $price))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment