From 376a435175efeeb6b9e2ee36713dc3b046b1bbb3 Mon Sep 17 00:00:00 2001
From: Cam Saul <cam@getluckybird.com>
Date: Thu, 19 Mar 2015 21:24:41 -0700
Subject: [PATCH] implement cumulative sum in QP

---
 .../driver/generic_sql/query_processor.clj    | 68 ++++++++++++++++---
 src/metabase/driver/generic_sql/sync.clj      | 10 +--
 src/metabase/models/field.clj                 |  4 ++
 .../generic_sql/query_processor_test.clj      | 64 ++++++++++++++---
 4 files changed, 125 insertions(+), 21 deletions(-)

diff --git a/src/metabase/driver/generic_sql/query_processor.clj b/src/metabase/driver/generic_sql/query_processor.clj
index feee81aadea..f0e267cb9bb 100644
--- a/src/metabase/driver/generic_sql/query_processor.clj
+++ b/src/metabase/driver/generic_sql/query_processor.clj
@@ -14,13 +14,16 @@
 
 
 (declare apply-form
-         log-query)
+         log-query
+         post-process
+         query-is-cumulative-sum?
+         apply-cumulative-sum)
 
 (def ^{:dynamic true, :private true} *query*
   "Query dictionary that we're currently processing"
   nil)
 
-;; ## Public Functions
+;; # INTERFACE
 
 (defn process
   "Convert QUERY into a korma `select` form."
@@ -45,8 +48,9 @@
          (map? (:query query))
          (= (name (:type query)) "query")]}
   (->> (process query)
-    eval
-    (annotate/annotate query)))
+       eval
+       (post-process query)
+       (annotate/annotate query)))
 
 
 (defn process-and-run
@@ -54,10 +58,12 @@
   [{:keys [type] :as query}]
   ;; we know how to handle :native and :query (structured) type queries
   (case (keyword type)
-    :native (native/process-and-run query)
-    :query (process-structured query)))
+      :native (native/process-and-run query)
+      :query  (process-structured query)))
 
 
+;; # IMPLEMENTATION
+
 ;; ## Query Clause Processors
 
 (defmulti apply-form
@@ -86,8 +92,9 @@
                        :avg      `(aggregate (~'avg ~field) :avg)
                        :distinct `(aggregate (~'count (raw ~(format "DISTINCT(\"%s\")" (name field)))) :count)
                        :stddev   `(fields [(sqlfn :stddev ~field) :stddev])
-                       :sum      `(aggregate (~'sum ~field) :sum)))))
-                 ;; TODO - `:cum_sum` is not yet implemented (!)
+                       :sum      `(aggregate (~'sum ~field) :sum)
+                       :cum_sum  `[(fields ~field)     ; just make sure this field is returned + included in GROUP BY
+                                   (group ~field)])))) ; cumulative sum happens in post-processing (see below)
 
 ;; ### `:breakout`
 ;; ex.
@@ -193,6 +200,51 @@
   nil)
 
 
+;; ## Post Processing
+
+(defn- post-process
+  "Post-processing stage for query results."
+  [{query :query} results]
+  (cond
+    (query-is-cumulative-sum? query) (apply-cumulative-sum query results)
+    :else                            (do results)))
+
+;; ### Cumulative sum
+;; Cumulative sum is a special case. We can't do it in the DB because it's not a SQL function; thus we do it as a post-processing step.
+
+(defn- query-is-cumulative-sum?
+  "Is this a cumulative sum query?"
+  [query]
+  (some->> query
+           :aggregation
+           first
+           (= "cum_sum")))
+
+(defn- cumulative-sum
+  "Recursively cumulative sum a sequence of VALUES."
+  ([values]
+   {:pre [(sequential? values)
+          (every? number? values)]}
+   (cumulative-sum 0 [] values))
+  ([acc-sum acc-values [value & more]]
+   (let [acc-sum (+ acc-sum value)
+         acc-values (conj acc-values acc-sum)]
+     (if-not (seq? more) acc-values
+             (recur acc-sum acc-values more)))))
+
+(defn- apply-cumulative-sum
+  "Apply `cumulative-sum` to values of the aggregate `Field` in RESULTS."
+  {:arglists '([query results])}
+  [{[_ field-id] :aggregation} results]
+  (let [field (field-id->kw field-id)
+        values (->> results          ; make a sequence of cumulative sum values for each row
+                    (map field)
+                    cumulative-sum)]
+    (map (fn [row value]              ; replace the value for each row with the cumulative sum value
+           (assoc row field value))
+         results values)))
+
+
 ;; ## Debugging Functions (Internal)
 
 (defn- log-query
diff --git a/src/metabase/driver/generic_sql/sync.clj b/src/metabase/driver/generic_sql/sync.clj
index fbc60d840e7..98582dfb3b6 100644
--- a/src/metabase/driver/generic_sql/sync.clj
+++ b/src/metabase/driver/generic_sql/sync.clj
@@ -63,15 +63,15 @@
 (defn sync-tables
   [{:keys [id] :as database}]
   (with-jdbc-metadata database                                                                ; with-jdbc-metadata reuses *jdbc-metadata* in any call to it inside the fn passed to it
-    (fn [_]                                                                                   ; by wrapping the entire sync operation in this we can reuse the same connection throughout
+    (fn [_]                                                                                    ; by wrapping the entire sync operation in this we can reuse the same connection throughout
       (->> (table-names database)
         (pmap (fn [table-name]
                 (binding [*entity-overrides* {:transforms [#(assoc % :db (delay database))]}] ; add a korma transform to Table that will assoc :db on results.
                   (let [table (or (sel :one Table :db_id id :name table-name)                 ; Table's post-select only sets :db if it's not already set.
-                                (ins Table                                                    ; This way, we can reuse a single `database` instead of creating
-                                  :db_id id                                                   ; a few dozen duplicate instances of it.
-                                  :name table-name                                            ; We can re-use one korma connection pool instead of
-                                  :active true))]                                             ; creating dozens of them, which was causing issues with too
+                                  (ins Table                                                  ; This way, we can reuse a single `database` instead of creating
+                                    :db_id id                                                 ; a few dozen duplicate instances of it.
+                                    :name table-name                                          ; We can re-use one korma connection pool instead of
+                                    :active true))]                                           ; creating dozens of them, which was causing issues with too
                     (update-table-row-count table)                                            ; many open connections.
                     (sync-fields table)
                     (log/debug "Synced" table-name)))))
diff --git a/src/metabase/models/field.clj b/src/metabase/models/field.clj
index 8e38f66cb87..988876f77c0 100644
--- a/src/metabase/models/field.clj
+++ b/src/metabase/models/field.clj
@@ -94,3 +94,7 @@
   (cond-> (assoc field :updated_at (u/new-sql-timestamp))
     field_type   (assoc :field_type   (name field_type))
     special_type (assoc :special_type (name special_type))))
+
+(defmethod pre-cascade-delete Field [_ {:keys [id]}]
+  (cascade-delete ForeignKey (where (or (= :origin_id id)
+                                        (= :destination_id id)))))
diff --git a/test/metabase/driver/generic_sql/query_processor_test.clj b/test/metabase/driver/generic_sql/query_processor_test.clj
index 2f2874059d5..84a02ebd3ba 100644
--- a/test/metabase/driver/generic_sql/query_processor_test.clj
+++ b/test/metabase/driver/generic_sql/query_processor_test.clj
@@ -135,7 +135,7 @@
 ;; ## "PAGE" CLAUSE
 ;; Test that we can get "pages" of results.
 
-;; Get the first page
+;; ### PAGE - Get the first page
 (expect {:status :completed
          :row_count 5
          :data {:rows [[1 "African"]
@@ -154,7 +154,7 @@
                                    :page 1}
                             :order_by [[(field->id :categories :name) "ascending"]]}}))
 
-;; Get the second page
+;; ### PAGE - Get the second page
 (expect {:status :completed
          :row_count 5
          :data {:rows [[6 "Bakery"]
@@ -235,7 +235,7 @@
           {:special_type "latitude", :base_type "FloatField", :description nil, :name "LATITUDE", :table_id (table->id :venues), :id (field->id :venues :latitude)}
           {:special_type nil, :base_type "TextField", :description nil, :name "NAME", :table_id (table->id :venues), :id (field->id :venues :name)}]))
 
-;; FILTER -- "AND", ">", ">="
+;; ### FILTER -- "AND", ">", ">="
 (expect {:status :completed,
          :row_count 5,
          :data
@@ -256,7 +256,7 @@
                                   :breakout [nil]
                                   :limit nil}}))
 
-;; FILTER -- "AND", "<", ">", "!="
+;; ### FILTER -- "AND", "<", ">", "!="
 (expect
     {:status :completed
      :row_count 2
@@ -275,7 +275,7 @@
                             :breakout [nil]
                             :limit nil}}))
 
-;; FILTER -- "BETWEEN", single subclause (neither "AND" nor "OR")
+;; ### FILTER -- "BETWEEN", single subclause (neither "AND" nor "OR")
 (expect
     {:status :completed
      :row_count 2
@@ -291,7 +291,7 @@
                             :breakout [nil]
                             :limit nil}}))
 
-;; FILTER -- "OR", "<=", "="
+;; ### FILTER -- "OR", "<=", "="
 (expect
     {:status :completed,
      :row_count 4,
@@ -317,6 +317,7 @@
 
 
 ;; ## "BREAKOUT"
+;; ### "BREAKOUT" - SINGLE COLUMN
 (expect {:status :completed,
          :row_count 15,
          :data {:rows [[1 31] [2 70] [3 75] [4 77] [5 69] [6 70] [7 76] [8 81] [9 68] [10 78] [11 74] [12 59] [13 76] [14 62] [15 34]],
@@ -332,7 +333,7 @@
                                   :order_by [[(field->id :checkins :user_id) "ascending"]]
                                   :limit nil}}))
 
-;; ## "BREAKOUT" - MULTIPLE COLUMNS W/ IMPLICT "ORDER_BY"
+;; ### "BREAKOUT" - MULTIPLE COLUMNS W/ IMPLICT "ORDER_BY"
 ;; Fields should be implicitly ordered :ASC for all the fields in `breakout` that are not specified in `order_by`
 (expect {:status :completed,
          :row_count 10,
@@ -349,7 +350,7 @@
                                   :breakout [(field->id :checkins :user_id)
                                              (field->id :checkins :venue_id)]}}))
 
-;; ## "BREAKOUT" - MULTIPLE COLUMNS W/ EXPLICIT "ORDER_BY"
+;; ### "BREAKOUT" - MULTIPLE COLUMNS W/ EXPLICIT "ORDER_BY"
 ;; `breakout` should not implicitly order by any fields specified in `order_by`
 (expect {:status :completed,
          :row_count 10,
@@ -379,3 +380,50 @@
                                   :aggregation ["rows"]
                                   :breakout [nil]
                                   :limit nil}}))
+
+
+;; # POST PROCESSING TESTS
+
+;; ## CUMULATIVE SUM
+;; ### Simple cumulative sum w/o any breakout
+(expect {:status :completed
+         :row_count 15
+         :data {:rows [[1] [3] [6] [10] [15] [21] [28] [36] [45] [55] [66] [78] [91] [105] [120]]
+                :columns ["ID"]
+                :cols [{:special_type "id", :base_type "IntegerField", :description nil, :name "ID", :table_id (table->id :users), :id (field->id :users :id)}]}}
+  (process-and-run {:type :query
+                    :database @db-id
+                    :query {:limit nil
+                            :source_table (table->id :users)
+                            :filter [nil nil]
+                            :breakout [nil]
+                            :aggregation ["cum_sum" (field->id :users :id)]}}))
+
+;; ### Cumulative sum w/ a breakout field
+(expect {:status :completed,
+         :row_count 15,
+         :data {:rows [[4 #inst "2014-01-01T08:00:00.000-00:00"]  ; incidentally this also tests that the QP is casting timestamps -> date
+                       [12 #inst "2014-02-01T08:00:00.000-00:00"]
+                       [13 #inst "2014-04-01T07:00:00.000-00:00"]
+                       [22 #inst "2014-04-03T07:00:00.000-00:00"]
+                       [34 #inst "2014-07-02T07:00:00.000-00:00"]
+                       [44 #inst "2014-07-03T07:00:00.000-00:00"]
+                       [57 #inst "2014-08-01T07:00:00.000-00:00"]
+                       [72 #inst "2014-08-01T07:00:00.000-00:00"]
+                       [78 #inst "2014-08-02T07:00:00.000-00:00"]
+                       [85 #inst "2014-08-02T07:00:00.000-00:00"]
+                       [90 #inst "2014-10-03T07:00:00.000-00:00"]
+                       [104 #inst "2014-10-03T07:00:00.000-00:00"]
+                       [115 #inst "2014-11-01T07:00:00.000-00:00"]
+                       [118 #inst "2014-11-06T08:00:00.000-00:00"]
+                       [120 #inst "2014-12-05T08:00:00.000-00:00"]]
+                :columns ["ID" "CAST(LAST_LOGIN AS DATE)"]
+                :cols [{:special_type "id", :base_type "IntegerField", :description nil, :name "ID", :table_id (table->id :users), :id (field->id :users :id)}
+                       {:special_type nil, :base_type "DateTimeField", :description nil, :name "LAST_LOGIN", :table_id (table->id :users), :id (field->id :users :last_login)}]}}
+  (process-and-run {:type :query
+                    :database @db-id
+                    :query {:limit nil
+                            :source_table (table->id :users)
+                            :filter [nil nil]
+                            :breakout [(field->id :users :last_login)]
+                            :aggregation ["cum_sum" (field->id :users :id)]}}))
-- 
GitLab