Skip to content
Snippets Groups Projects
Commit 7b17d93c authored by Cam Saul's avatar Cam Saul
Browse files

basic cumulative sum aggregations

parent 7c57e1a5
Branches
Tags
No related merge requests found
......@@ -125,8 +125,11 @@
[query]
{:pre [(map? query)]}
(binding [qp/*query* query]
(i/process-query (database-id->driver (:database query))
(qp/preprocess query))))
(let [driver (database-id->driver (:database query))
query (qp/preprocess query)]
(->> query
(i/process-query driver)
(qp/post-process driver query)))))
;; ## Query Execution Stuff
......
......@@ -15,10 +15,7 @@
(declare apply-form
log-query
post-process
query-is-cumulative-sum?
apply-cumulative-sum)
log-query)
;; # INTERFACE
......@@ -46,7 +43,6 @@
(try
(->> (process query)
eval
(post-process query)
(annotate/annotate query))
(catch java.sql.SQLException e
{:status :failed
......@@ -95,9 +91,7 @@
"count" `(aggregate (~'count ~field) :count)
"distinct" `(aggregate (~'count (sqlfn :DISTINCT ~field)) :count)
"stddev" `(fields [(sqlfn :stddev ~field) :stddev])
"sum" `(aggregate (~'sum ~field) :sum)
"cum_sum" `[(fields ~field) ; just make sure this field is returned + included in GROUP BY
(group ~field)])))) ; cumulative sum happens in post-processing (see below)
"sum" `(aggregate (~'sum ~field) :sum))))) ; cumulative sum happens in post-processing (see below)
;; ### `:breakout`
;; ex.
......@@ -108,6 +102,12 @@
`[(group ~@field-names)
(fields ~@field-names)]))
;; ### :cum_sum
;; Nothing to do here since this is just used internally to mark the we're doing a cumulative sum aggregation,
;; and all functionality is handled by pre-processing / post-processing in metabase.driver.query-processor
(defmethod apply-form :cum_sum [_]
nil)
;; ### `:fields`
;; ex.
;;
......@@ -198,38 +198,6 @@
nil)
;; ## Post Processing
(defn- post-process
"Post-processing stage for query results."
[{query :query} results]
(cond
(query-is-cumulative-sum? query) (apply-cumulative-sum query results)
:else (do results)))
;; ### Cumulative sum
;; Cumulative sum is a special case. We can't do it in the DB because it's not a SQL function; thus we do it as a post-processing step.
(defn- query-is-cumulative-sum?
"Is this a cumulative sum query?"
[query]
(some->> (:aggregation query)
first
(= "cum_sum")))
(defn- apply-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
{:arglists '([query results])}
[{[_ field-id] :aggregation} results]
(let [field (field-id->kw field-id)
values (->> results ; make a sequence of cumulative sum values for each row
(map field)
(reductions +))]
(map (fn [row value] ; replace the value for each row with the cumulative sum value
(assoc row field value))
results values)))
;; ## Debugging Functions (Internal)
(defn- log-query
......
......@@ -127,8 +127,25 @@
"sum" {$sum (field-id->$string field-id)}}}
{$project {"_id" false, "sum" true}}))
(defaggregation ["cum_sum" field-id]
nil) ; TODO
(def db
(delay (sel :one Database :engine "mongo" :name "Mongo Test")))
(def users-table
(delay (sel :one Table :name "users" :db_id (:id @db))))
(def users-id-field
(delay (sel :one Field :name "_id" :table_id (:id @users-table))))
(defn x []
(driver/process-query
{:type :query
:database (:id @db)
:query {:limit nil,
:source_table (:id @users-table)
:filter [nil nil],
:breakout [nil],
:aggregation ["cum_sum" (:id @users-id-field)]}}))
(defmacro match-aggregation [aggregation]
`(match ~aggregation
......@@ -180,6 +197,12 @@
(defclause :breakout field-ids
nil)
;; #### cum_sum
;; Don't need to do anything here <3
(defclause :cum_sum _
nil)
;; ### fields
;; TODO - this still returns _id, even if we don't ask for it :/
(defclause :fields field-ids
`[(fields ~(mapv field-id->kw field-ids))])
......
(ns metabase.driver.query-processor
"Preprocessor that does simple transformations to all incoming queries, simplifing the driver-specific implementations."
(:require [metabase.db :refer :all]
(:require [clojure.core.match :refer [match]]
[metabase.db :refer :all]
[metabase.driver.interface :as i]
[metabase.models.field :refer [Field field->fk-table]]))
(declare add-implicit-breakout-order-by
get-special-column-info
preprocess-cumulative-sum
preprocess-structured
remove-empty-clauses)
......@@ -20,9 +23,14 @@
:native query))
(defn preprocess-structured [query]
(update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by)))
(let [pp (update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by
preprocess-cumulative-sum))]
(println (colorize.core/cyan "******************** PREPROCESSED: ********************\n"
(with-out-str (clojure.pprint/pprint pp)) "\n"
"*******************************************************\n"))
pp))
;; ## PREPROCESSOR FNS
......@@ -65,6 +73,63 @@
(assoc query :order_by)))))
;; ### PREPROCESS-CUMULATIVE-SUM
(defn preprocess-cumulative-sum
"`cum_sum` queries are a special case, since they're implemented in Clojure-land. Check to see if we're doing a `cum_sum` aggregation,
and if so, rewrite the query as needed, run it, and do post processing."
[{aggregation :aggregation, :as query}]
(match aggregation
["cum_sum" field-id] (assoc query
:cum_sum true
:aggregation ["rows"]
:fields [field-id])
_ query))
;; ## POSTPROCESSOR FNS
;; ### POST-PROCESS-CUMULATIVE-SUM
(defn post-process-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
{:arglists '([driver query results])}
[driver {cumulative-sum? :cum_sum, :as query} {data :data, :as results}]
;; (println (colorize.core/magenta "---------------------------------------- POSTPROCESSING: ----------------------------------------\n"
;; (with-out-str (clojure.pprint/pprint query)) "\n"
;; "=================================================================================================\n"
;; (with-out-str (clojure.pprint/pprint results)) "\n"
;; "-------------------------------------------------------------------------------------------------\n"))
(if-not cumulative-sum? results
(let [[field-id] (:fields query)
_ (assert (integer? field-id))
;; Determine the index of the cum_sum field by matching field-id in the result columns
field-index (->> (:cols data)
(map-indexed (fn [i column]
(when (= (:id column) field-id)
i)))
(filter identity)
first)
_ (assert (integer? field-index))
;; Make a sequence of cumulative sum values for each row
rows (:rows data)
values (->> rows
(map #(nth % field-index))
(reductions +))]
;; Replace the value in each row
(->> (map (fn [row value]
(assoc (vec row) field-index value))
rows values)
(assoc-in results [:data :rows])))))
(defn post-process [driver query results]
(case (keyword (:type query))
:native results
:query (let [query (:query query)]
(->> results
(post-process-cumulative-sum driver query)))))
;; # COMMON ANNOTATION FNS
(defn get-column-info
......
......@@ -122,25 +122,6 @@
:breakout [nil]
:limit nil}}))
;; # POST PROCESSING TESTS
;; ## CUMULATIVE SUM
;; ### Simple cumulative sum w/o any breakout
(expect {:status :completed
:row_count 15
:data {:rows [[1] [3] [6] [10] [15] [21] [28] [36] [45] [55] [66] [78] [91] [105] [120]]
:columns ["ID"]
:cols [{:extra_info {} :special_type :id, :base_type :BigIntegerField, :description nil, :name "ID", :table_id (table->id :users), :id (field->id :users :id)}]}}
(driver/process-query {:type :query
:database @db-id
:query {:limit nil
:source_table (table->id :users)
:filter [nil nil]
:breakout [nil]
:aggregation ["cum_sum" (field->id :users :id)]}}))
;; ### Cumulative sum w/ a breakout field
(expect {:status :completed
:row_count 15
......
......@@ -236,6 +236,32 @@
:table_id (->table-id :checkins)
:id (->field-id :checkins :user_id)}))
;; #### users
(defn users-col [col]
(case col
:id {:extra_info {}
:special_type :id
:base_type (id-field-type *driver-data*)
:description nil
:name (format-name *driver-data* "id")
:table_id (->table-id :users)
:id (->field-id :users :id)}
:name {:extra_info {}
:special_type nil
:base_type :TextField
:description nil
:name (format-name *driver-data* "name")
:table_id (->table-id :users)
:id (->field-id :users :name)}
:last_login {:extra_info {}
:special_type :category
:base_type :DateTimeField
:description nil
:name (format-name *driver-data* "last_login")
:table_id (->table-id :users)
:id (->field-id :users :last_login)}))
;; # THE TESTS THEMSELVES (!)
;; ### "COUNT" AGGREGATION
......@@ -475,3 +501,19 @@
:breakout [nil]
:limit 10
:order_by [[(->field-id :venues :id) "ascending"]]})
;; # POST PROCESSING TESTS
;; ## CUMULATIVE SUM
;; ### Simple cumulative sum w/o any breakout
(qp-expect-with-all-drivers
{:rows [[1] [3] [6] [10] [15] [21] [28] [36] [45] [55] [66] [78] [91] [105] [120]]
:columns (->columns "id")
:cols [(users-col :id)]}
{:limit nil
:source_table (->table-id :users)
:filter [nil nil]
:breakout [nil]
:aggregation ["cum_sum" (->field-id :users :id)]})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment