Skip to content
Snippets Groups Projects
Commit 45081ec0 authored by Cam Saul's avatar Cam Saul
Browse files

cum_sum is now working correctly

parent 28d7bca9
No related branches found
No related tags found
No related merge requests found
......@@ -127,7 +127,8 @@
(binding [qp/*query* query]
(let [driver (database-id->driver (:database query))
query (qp/preprocess query)
results (i/process-query driver (dissoc-in query [:query :cum_sum]))] ; strip out things that individual impls don't need to know about / deal with
results (binding [qp/*query* query]
(i/process-query driver (dissoc-in query [:query :cum_sum])))] ; strip out things that individual impls don't need to know about / deal with
(qp/post-process driver query results))))
......@@ -205,6 +206,7 @@
(query-complete query-execution query-result (:cache_result options)))
(catch Exception ex
(log/warn ex)
(.printStackTrace ex)
(query-fail query-execution (.getMessage ex))))))
......
......@@ -85,7 +85,8 @@
(aggregate (count :*) :count)
(where {(keyword (:name field)) [not= nil]})) first :count)]
(if (= total-non-null-count 0) 0.0
(let [url-count (-> (select korma-table
(aggregate (count :*) :count)
(where {(keyword (:name field)) [like "http%://_%.__%"]})) first :count)]
(let [url-count (or (-> (select korma-table
(aggregate (count :*) :count)
(where {(keyword (:name field)) [like "http%://_%.__%"]})) first :count)
0)]
(float (/ url-count total-non-null-count)))))))
......@@ -98,9 +98,14 @@
;;
;; [1412 1413]
(defmethod apply-form :breakout [[_ field-ids]]
(let [field-names (map field-id->kw field-ids)]
(let [ ;; Group by all the breakout fields
field-names (map field-id->kw field-ids)
;; Add fields form only for fields that weren't specified in :fields clause -- we don't want to include it twice, or korma will barf
fields-not-in-fields-clause-names (->> field-ids
(filter (partial (complement contains?) (set (:fields (:query qp/*query*)))))
(map field-id->kw))]
`[(group ~@field-names)
(fields ~@field-names)]))
(fields ~@fields-not-in-fields-clause-names)]))
;; ### `:fields`
......
......@@ -2,6 +2,7 @@
"Functions related to annotating results returned by the Query Processor."
(:require [metabase.db :refer :all]
[metabase.driver.query-processor :as qp]
[metabase.driver.generic-sql.util :as gsu]
[metabase.models.field :refer [Field field->fk-table]]))
(declare get-column-names
......@@ -22,7 +23,7 @@
:data {:rows (->> results
(map #(map % ; pull out the values in each result in the same order we got from get-column-names
(map keyword column-names))))
:columns column-names
:columns (map uncastify column-names)
:cols (get-column-info query column-names)}}))
(defn- order-columns
......@@ -37,10 +38,10 @@
(let [field-ids (-> query :query :fields)
fields-clause-fields (when-not (or (empty? field-ids)
(= field-ids [nil]))
(let [field-id->name (->> (sel :many [Field :id :name]
:id [in field-ids]) ; Fetch names of fields from `fields` clause
(map (fn [{:keys [id name]}] ; build map of field-id -> field-name
{id (keyword name)}))
(let [field-id->name (->> (sel :many [Field :id :name :base_type]
:id [in field-ids]) ; Fetch names of fields from `fields` clause
(map (fn [{:keys [id name base_type]}] ; build map of field-id -> field-name
{id (gsu/field-name+base-type->castified-key name base_type)}))
(into {}))]
(map field-id->name field-ids))) ; now get names in same order as the IDs
other-fields (->> (first results)
......
......@@ -90,7 +90,6 @@
(sel :one Table :id table-id)
(throw (Exception. (format "Table with ID %d doesn't exist!" table-id))))))
(defn castify-field
"Wrap Field in a SQL `CAST` statement if needed (i.e., it's a `:DateTimeField`).
......@@ -103,6 +102,15 @@
(if (contains? #{:DateField :DateTimeField} field-base-type) `(korma/raw ~(format "CAST(\"%s\" AS DATE)" field-name))
(keyword field-name)))
(defn field-name+base-type->castified-key
"Like `castify-field`, but returns a keyword that should match the one returned in results."
[field-name field-base-type]
{:pre [(string? field-name)
(keyword? field-base-type)]
:post [(keyword? %)]}
(if (contains? #{:DateField :DateTimeField} field-base-type) (keyword (format "CAST(%s AS DATE)" field-name))
(keyword field-name)))
(def field-id->kw
"Given a metabase `Field` ID, return a keyword for use in the Korma form (or a casted raw string for date fields)."
(memoize ; This can be memozied since the names and base_types of Fields never change
......
......@@ -22,7 +22,7 @@
(defn- table->column-names
"Return a set of the column names for TABLE."
[table]
(with-mongo-connection [conn @(:db table)]
(with-mongo-connection [^com.mongodb.DBApiLayer conn @(:db table)]
(->> (mc/find-maps conn (:name table))
(r/map keys)
(r/map set)
......@@ -43,7 +43,7 @@
IDriver
;;; ### Connection
(can-connect? [_ database]
(with-mongo-connection [conn database]
(with-mongo-connection [^com.mongodb.DBApiLayer conn database]
(= (-> (cmd/db-stats conn)
(conv/from-db-object :keywordize)
:ok)
......
......@@ -44,15 +44,15 @@
(defn preprocess-structured
"Preprocess a strucuted QUERY dict."
[query]
(let [pp (update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by
preprocess-cumulative-sum))]
(let [preprocessed-query (update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by
preprocess-cumulative-sum))]
(when-not *disable-qp-logging*
(log/debug (colorize.core/cyan "******************** PREPROCESSED: ********************\n"
(with-out-str (clojure.pprint/pprint pp)) "\n"
(log/debug (colorize.core/cyan "\n******************** PREPROCESSED: ********************\n"
(with-out-str (clojure.pprint/pprint preprocessed-query)) "\n"
"*******************************************************\n")))
pp))
preprocessed-query))
;; ## PREPROCESSOR FNS
......@@ -100,21 +100,27 @@
(defn preprocess-cumulative-sum
"Rewrite queries containing a cumulative sum (`cum_sum`) aggregation to simply fetch the values of the aggregate field instead.
(Cumulative sum is a special case; it is implemented in post-processing)."
[{aggregation :aggregation, :as query}]
(match aggregation
["cum_sum" field-id] (merge query
;; Mark the Query dict as cum_sum so we know to apply post-processing later
{:cum_sum true}
;; A Query that combines breakout + cum_sum needs to be rewritten as a query that gets the
;; sum of the aggregate field for each value of the breakout field
(if (:breakout query) {:breakout [field-id]
:aggregation ["sum" field-id]
:order_by (conj (or (vec (:order_by query)) [])
[field-id "ascending"])}
;; Otherwise we are only interested in the values of the aggregate field itself
{:aggregation ["rows"]
:fields [field-id]}))
_ query))
[{[ag-type ag-field :as aggregation] :aggregation, breakout-fields :breakout, order-by :order_by, :as query}]
(let [cum-sum? (= ag-type "cum_sum")
has-breakout? (not (empty? breakout-fields))]
(cond
;; Cumulative sum is only applicable if it has breakout fields
;; Rewrite the query as a simple "rows" aggregation that fetches the fields in cum_sum and breakout
;; cum_sum will happen in post-processing
;; Store the cumulative sum field under the key :cum_sum so we know which one to sum later
(and cum-sum?
has-breakout?) (-> query
(dissoc :breakout)
(assoc :cum_sum ag-field
:aggregation ["rows"]
:fields (distinct (concat breakout-fields [ag-field]))))
;; Cumulative sum without any breakout fields should just be treated the same way as "sum". Rewrite query as such
cum-sum? (assoc query
:aggregation ["sum" ag-field])
;; Otherwise if this isn't a cum_sum query return it as-is
:else query)))
;; # POSTPROCESSOR
......@@ -123,29 +129,47 @@
(defn post-process-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
{:arglists '([driver query results])}
[driver {cumulative-sum? :cum_sum, :as query} {data :data, :as results}]
(if-not cumulative-sum? results
(let [field-id (or (first (:fields query))
(second (:aggregation query)))
_ (assert (integer? field-id))
;; Determine the index of the cum_sum field by matching field-id in the result columns
field-index (->> (:cols data)
(map-indexed (fn [i column]
(when (= (:id column) field-id)
i)))
(filter identity)
first)
_ (assert (integer? field-index))
;; Make a sequence of cumulative sum values for each row
rows (:rows data)
values (->> rows
(map #(nth % field-index))
(reductions +))]
;; Replace the value in each row
(->> (map (fn [row value]
(assoc (vec row) field-index value))
rows values)
{:arglists '([query results])}
[{cum-sum-field :cum_sum, :as query} {{rows :rows, cols :cols, :as data} :data, :as results}]
(if-not cum-sum-field results
(let [ ;; Determine the index of the field we need to cumulative sum
cum-sum-field-index (->> cols
(map-indexed (fn [i {field-id :id}]
(when (= field-id cum-sum-field)
i)))
(filter identity)
first)
_ (assert (integer? cum-sum-field-index))
;; Now make a sequence of cumulative sum values for each row
values (->> rows
(map #(nth % cum-sum-field-index))
(reductions +))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-sum-field-index value))
rows values)
;; We only want to return a single row for each value of the breakout columns.
;; e.g.
;; 2014-04-03 3 2014-04-03 8 ; only return one row for 2014-04-03
;; 2014-04-03 5 ---> 2014-04-04 18
;; 2014-04-04 10
;;
;; We'll reverse the sequence of rows, then filter out all rows whose breakout field values were the same as the last.
;; Then we'll reverse the sequence again to restore the original order.
remove-cum-sum-value (fn [row]
(concat (subvec row 0 cum-sum-field-index)
(subvec row (inc cum-sum-field-index) (count row))))
previous-row-breakout-field-values (atom nil)
same-breakout-field-values-as-previous-row? (fn [row]
(let [breakout-values (remove-cum-sum-value row)
previous-values @previous-row-breakout-field-values]
(reset! previous-row-breakout-field-values breakout-values)
(and (not (empty? breakout-values))
(= breakout-values previous-values))))]
(->> rows
reverse
(filter (complement same-breakout-field-values-as-previous-row?))
reverse
(assoc-in results [:data :rows])))))
(defn post-process
......@@ -155,7 +179,7 @@
:native results
:query (let [query (:query query)]
(->> results
(post-process-cumulative-sum driver query)))))
(post-process-cumulative-sum query)))))
;; # COMMON ANNOTATION FNS
......
......@@ -38,20 +38,3 @@
:aggregation ["stddev" (field->id :venues :latitude)]
:breakout [nil]
:limit nil}}))
;; ### Cumulative sum w/ a breakout field
(expect {:status :completed
:row_count 15
:data
{:rows [[4 4M] [12 8M] [13 1M] [22 9M] [34 12M] [44 10M] [57 13M] [72 15M] [78 6M] [85 7M] [90 5M] [104 14M] [115 11M] [118 3M] [120 2M]]
:columns ["ID" "sum"]
:cols [{:extra_info {}, :special_type :id, :base_type :BigIntegerField, :description nil, :name "ID", :table_id (table->id :users), :id (field->id :users :id)}
{:base_type :BigIntegerField, :special_type :id, :name "sum", :id nil, :table_id nil, :description nil}]}}
(driver/process-query {:type :query
:database @db-id
:query {:limit nil
:source_table (table->id :users)
:filter [nil nil]
:breakout [(field->id :users :last_login)]
:aggregation ["cum_sum" (field->id :users :id)]}}))
......@@ -262,7 +262,7 @@
:table_id (id :users)
:id (id :users :id)}
:name {:extra_info {}
:special_type nil
:special_type :category
:base_type :TextField
:description nil
:name (format-name *driver-dataset* "name")
......@@ -606,13 +606,45 @@
;; ## CUMULATIVE SUM
;; ### Simple cumulative sum w/o any breakout
;; ### cum_sum w/o breakout should be treated the same as sum
(qp-expect-with-all-drivers
{:rows [[(case (id-field-type *driver-dataset*)
:IntegerField 120
:BigIntegerField 120M)]]
:columns ["sum"]
:cols [{:base_type (id-field-type *driver-dataset*), :special_type :id, :name "sum", :id nil, :table_id nil, :description nil}]}
{:source_table (id :users)
:aggregation ["cum_sum" (id :users :id)]})
;; ### Simple cumulative sum where breakout field is same as cum_sum field
(qp-expect-with-all-drivers
{:rows [[1] [3] [6] [10] [15] [21] [28] [36] [45] [55] [66] [78] [91] [105] [120]]
:columns (->columns "id")
:cols [(users-col :id)]}
{:limit nil
:source_table (id :users)
:filter [nil nil]
:breakout [nil]
{:source_table (id :users)
:breakout [(id :users :id)]
:aggregation ["cum_sum" (id :users :id)]})
;; ### Cumulative sum w/ a different breakout field
(qp-expect-with-all-drivers
{:rows [["Broen Olujimi" 14]
["Conchúr Tihomir" 21]
["Dwight Gresham" 34]
["Felipinho Asklepios" 36]
["Frans Hevel" 46]
["Kaneonuskatew Eiran" 49]
["Kfir Caj" 61]
["Nils Gotam" 70]
["Plato Yeshua" 71]
["Quentin Sören" 76]
["Rüstem Hebel" 91]
["Shad Ferdynand" 97]
["Simcha Yan" 101]
["Spiros Teofil" 112]
["Szymon Theutrich" 120]]
:columns (->columns "name" "id")
:cols [(users-col :name)
(users-col :id)]}
{:source_table (id :users)
:breakout [(id :users :name)]
:aggregation ["cum_sum" (id :users :id)]})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment