Skip to content
Snippets Groups Projects
Commit 2ffeb7dc authored by Cam Saül's avatar Cam Saül
Browse files

Backend implementation for cumulative count aggregations :cry:

parent 2dadec61
No related branches found
No related tags found
No related merge requests found
......@@ -6,10 +6,11 @@
[medley.core :as m]
schema.utils
[swiss.arrows :refer [<<-]]
[metabase.db :as db]
[metabase.driver :as driver]
[metabase.models.field :refer [Field]]
[metabase.models.query-execution :refer [QueryExecution]]
(metabase [config :as config]
[db :as db]
[driver :as driver])
(metabase.models [field :refer [Field]]
[query-execution :refer [QueryExecution]])
(metabase.query-processor [annotate :as annotate]
[expand :as expand]
[interface :refer :all]
......@@ -131,7 +132,7 @@
;; if necessary, handle macro substitution
(let [query (if-not (mbql-query? query)
query
;; for structured queries run our macro expansion
;; for MBQL queries run our macro expansion
(u/prog1 (macros/expand-macros query)
(when (and (not *disable-qp-logging*)
(not= <> query))
......@@ -147,7 +148,7 @@
;; if necessary, expand/resolve the query
(let [query (if-not (mbql-query? query)
query
;; for structured queries we expand first, then resolve
;; for MBQL queries we expand first, then resolve
(resolve/resolve (expand/expand query)))]
(qp query))))
......@@ -208,9 +209,8 @@
:parent_id nil
(k/order :position :asc)
(k/order :id :desc))]
(let [field (-> (resolve/rename-mb-field-keys field)
map->Field
(resolve/resolve-table {source-table-id source-table}))]
(let [field (resolve/resolve-table (map->Field (resolve/rename-mb-field-keys field))
{source-table-id source-table})]
(if (datetime-field? field)
(map->DateTimeField {:field field, :unit :default})
field))))
......@@ -244,14 +244,13 @@
(qp (cond-> query
(seq implicit-breakout-order-by-fields) (update-in [:query :order-by] concat (for [field implicit-breakout-order-by-fields]
{:field field, :direction :ascending})))))
;; for non-structured queries we do nothing
;; for non-MBQL queries we do nothing
(qp query))))
(defn- pre-cumulative-sum
"Rewrite queries containing a cumulative sum (`cum_sum`) aggregation to simply fetch the values of the aggregate field instead.
(Cumulative sum is a special case; it is implemented in post-processing).
Return a pair of [`cumulative-sum-field?` `query`]."
[{{{ag-type :aggregation-type, ag-field :field} :aggregation, breakout-fields :breakout} :query, :as query}]
(let [cum-sum? (= ag-type :cumulative-sum)
......@@ -267,9 +266,10 @@
(cond
;; If there's only one breakout field that is the same as the cum_sum field, re-write this as a "rows" aggregation
;; to just fetch all the values of the field in question.
cum-sum-with-same-breakout? [ag-field (update-in query [:query] #(-> %
(dissoc :breakout :aggregation)
(assoc :fields [ag-field])))]
cum-sum-with-same-breakout? [ag-field (update query :query (fn [query]
(-> query
(dissoc :breakout :aggregation)
(assoc :fields [ag-field]))))]
;; Otherwise if we're breaking out on different fields, rewrite the query as a "sum" aggregation
cum-sum-with-breakout? [ag-field (assoc-in query [:query :aggregation] {:aggregation-type :sum, :field ag-field})]
......@@ -277,7 +277,7 @@
;; Cumulative sum without any breakout fields should just be treated the same way as "sum". Rewrite query as such
cum-sum? [false (assoc-in query [:query :aggregation] {:aggregation-type :sum, :field ag-field})]
;; Otherwise if this isn't a cum_sum query return it as-is
;; Otherwise if this isn't a cumulative sum query return it as-is
:else [false query])))
......@@ -285,14 +285,14 @@
"Cumulative sum the values of the aggregate `Field` in RESULTS."
[cum-sum-field {rows :rows, cols :cols, :as results}]
(let [ ;; Determine the index of the field we need to cumulative sum
cum-sum-field-index (u/first-index-satisfying #(or (= (:name %) "sum")
(= (:id %) (:field-id cum-sum-field)))
cols)
_ (assert (integer? cum-sum-field-index))
cum-sum-field-index (u/prog1 (u/first-index-satisfying (fn [{:keys [name id]}]
(or (= name "sum")
(= id (:field-id cum-sum-field))))
cols)
(assert (integer? <>)))
;; Now make a sequence of cumulative sum values for each row
values (->> rows
(map #(nth % cum-sum-field-index))
(reductions +))
values (reductions + (for [row rows]
(nth row cum-sum-field-index)))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-sum-field-index value))
......@@ -306,7 +306,59 @@
(let [[cumulative-sum-field query] (pre-cumulative-sum query)]
(cond->> (qp query)
cumulative-sum-field (post-cumulative-sum cumulative-sum-field)))
;; for non-structured queries we do nothing
;; for non-MBQL queries we do nothing
(qp query))))
(defn- pre-cumulative-count
"Rewrite queries containing a cumulative count (`cum_count`) aggregation as `count` aggregation queries instead.
(Cumulative count is a special case; it is implemented in post-processing).
Returns a pair like `[is-cumulative-count-query? query]`."
[{{{ag-type :aggregation-type} :aggregation, breakout-fields :breakout} :query, :as query}]
(let [cum-count? (= ag-type :cumulative-count)
cum-count-with-breakout? (and cum-count?
(seq breakout-fields))]
;; Cumulative count is only applicable if it has breakout field(s)
;; Cumulative counting happens in post-processing
(cond
;; If we have breakout field(s), rewrite the query as a "count" aggregation
cum-count-with-breakout? [true (assoc-in query [:query :aggregation] {:aggregation-type :count})]
;; Cumulative count without any breakout fields should just be treated the same way as "count". Rewrite query as such
cum-count? [false (assoc-in query [:query :aggregation] {:aggregation-type :count})]
;; Otherwise if this isn't a cumulative count query return it as-is
:else [false query])))
(defn- post-cumulative-count
"Cumulative count the values of the aggregate `Field` in RESULTS."
[{rows :rows, cols :cols, :as results}]
(let [ ;; Determine the index of the count field; this is what we need to cumulative count
cum-count-field-index (u/prog1 (u/first-index-satisfying (comp (partial = "count") :name)
cols)
(assert (integer? <>)))
;; Now make a sequence of cumulative count values for each row
values (reductions + (for [row rows]
(nth row cum-count-field-index)))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-count-field-index value))
rows values)]
(assoc results :rows rows)))
(defn- cumulative-count [qp]
(fn [query]
(if (mbql-query? query)
(let [[is-cumulative-count? query] (pre-cumulative-count query)
results (qp query)]
(if is-cumulative-count?
(post-cumulative-count results)
results))
;; for non-MBQL queries we do nothing
(qp query))))
......@@ -329,9 +381,9 @@
(fn [query]
(let [results (qp query)]
(if-not (mbql-query? query)
;; non-structured queries are not affected
;; non-MBQL queries are not affected
results
;; for structured queries capture the results and annotate
;; for MBQL queries capture the results and annotate
(annotate/annotate query results)))))
......@@ -353,14 +405,18 @@
(qp query)))
(defn- wrap-guard-multiple-calls
"Throw an exception if a QP function accidentally calls (QP QUERY) more than once."
[qp]
(let [called? (atom false)]
(fn [query]
(assert (not @called?) "(QP QUERY) IS BEING CALLED MORE THAN ONCE!")
(reset! called? true)
(qp query))))
(def ^:private guard-multiple-calls
"Throw an exception if a QP function accidentally calls (QP QUERY) more than once (this is only done during testing)."
(if-not config/is-test?
;; if this isn't testing then we'll just return the `qp` function as-is
identity
(fn [qp]
(let [called? (atom false)]
(fn [query]
(assert (not @called?) "(QP QUERY) IS BEING CALLED MORE THAN ONCE!")
(reset! called? true)
(qp query))))))
;; TODO - maybe the QP middleware pattern should have a way to inject arbitary middleware? Then this function can be injected by the test code and not even be present in the normal MB code
;;; +-------------------------------------------------------------------------------------------------------+
......@@ -416,10 +472,11 @@
pre-add-implicit-fields
pre-add-implicit-breakout-order-by
cumulative-sum
cumulative-count
limit
post-annotate
pre-log-query
wrap-guard-multiple-calls
guard-multiple-calls
driver-process-query) (assoc query :driver driver))))))
......
......@@ -136,6 +136,11 @@
([] (i/strict-map->AggregationWithoutField {:aggregation-type :count}))
([f] (ag-with-field :count f)))
(s/defn ^:ql ^:always-validate cum-count :- i/Aggregation
"Aggregation clause. Return the cumulative row count (presumably broken out in some way)."
[]
(i/strict-map->AggregationWithoutField {:aggregation-type :cumulative-count}))
(defn ^:ql ^:deprecated rows
"Bare rows aggregation. This is the default behavior, so specifying it is deprecated."
[]
......
......@@ -247,7 +247,8 @@
;;; # ------------------------------------------------------------ CLAUSE SCHEMAS ------------------------------------------------------------
(s/defrecord AggregationWithoutField [aggregation-type :- (s/eq :count)])
(s/defrecord AggregationWithoutField [aggregation-type :- (s/named (s/enum :count :cumulative-count)
"Valid aggregation type")])
(s/defrecord AggregationWithField [aggregation-type :- (s/named (s/enum :avg :count :cumulative-sum :distinct :max :min :stddev :sum)
"Valid aggregation type")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment