Skip to content
Snippets Groups Projects
Commit e042cb21 authored by Cam Saul's avatar Cam Saul
Browse files

Unix timestamp aggregation support

parent cef33d7b
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@
[metabase.db :refer [exists? ins sel upd]]
(metabase.driver [interface :as i]
[query-processor :as qp])
[metabase.driver.query-processor.expand :as expand]
(metabase.models [database :refer [Database]]
[query-execution :refer [QueryExecution]])
[metabase.models.setting :refer [defsetting]]
......@@ -134,6 +135,7 @@
{:pre [(map? query)]}
(try
(binding [qp/*query* query
qp/*expanded-query* (expand/expand query)
qp/*internal-context* (atom {})]
(let [driver (database-id->driver (:database query))
query (qp/preprocess query)
......
......@@ -39,7 +39,11 @@
;; # DYNAMIC VARS
(def ^:dynamic *query*
"The query we're currently processing (i.e., the body of the query API call)."
"The query we're currently processing, in its original, unexpanded form."
nil)
(def ^:dynamic *expanded-query*
"The query we're currently processing, in its expanded form."
nil)
(def ^:dynamic *disable-qp-logging*
......@@ -80,7 +84,7 @@
;; ## PREPROCESSOR FNS
;; ### REMOVE-EMPTY-CLAUSES
(def ^:const clause->empty-forms
(def ^:private ^:const clause->empty-forms
"Clause values that should be considered empty and removed during preprocessing."
{:breakout #{[nil]}
:filter #{[nil nil]}})
......@@ -150,12 +154,12 @@
[{:keys [field], {timestamp :value, special-type :special-type, :as value} :value}]
;; The timestamps we create are 00:00 on the day in question, so re-write the filter as ["BETWEEN" field timestamp (+ timestamp 1-day)]
(expand/map->Filter:Between {:type :between
:field field
:min value
:max (expand/map->Value (assoc value
:value (+ timestamp (case special-type
:timestamp_seconds seconds-per-day
:timestamp_milliseconds milliseconds-per-day))))}))
:field field
:min value
:max (expand/map->Value (assoc value
:value (+ timestamp (case special-type
:timestamp_seconds seconds-per-day
:timestamp_milliseconds milliseconds-per-day))))}))
(defn preprocess-rewrite-timestamp-equals-filter
"In order for `=` filter clauses to work with timestamps (allowing the user to match a given day) we need to rewrite them as
......@@ -165,16 +169,14 @@
;; If there's no filter clause there's nothing to do
query
;; Otherwise rewrite as needed
(update-in query [:filter] (fn [filter-clause]
(-> filter-clause
expand/expand-filter
(update-in [:subclauses] #(for [{:keys [filter-type], {:keys [special-type]} :field, :as subclause} %]
(if (and (= filter-type :=)
(contains? #{:timestamp_seconds
:timestamp_milliseconds} special-type))
(rewrite-timestamp-filter= subclause)
subclause)))
expand/collapse)))))
(assoc query :filter (-> (:filter *expanded-query*)
(update-in [:subclauses] #(for [{:keys [filter-type], {:keys [special-type]} :field, :as subclause} %]
(if (and (= filter-type :=)
(contains? #{:timestamp_seconds
:timestamp_milliseconds} special-type))
(rewrite-timestamp-filter= subclause)
subclause)))
expand/collapse))))
;; ### PREPROCESS-CUMULATIVE-SUM
......@@ -217,6 +219,46 @@
;; # POSTPROCESSOR
;; ### PERFORM-UNIX-TIMESTAMP-AGGREGATION
(defn perform-unix-timestamp-aggregation
  "Unix timestamp support is implemented entirely in Clojure-land -- Databases themselves are working directly with
   integers as far as they're concerned. Some functionality, like aggregations, must be implemented in Clojure.
   Check and see if we need to do any post-processing to aggregate a Unix timestamp column, and, if so, perform
   the appropriate aggregation."
  [results]
  (let [{{[breakout-field] :breakout} :query} *expanded-query*]
    ;; Only re-aggregate when the (single) breakout field is a Unix-timestamp column; otherwise pass results through.
    (if-not (contains? #{:timestamp_seconds :timestamp_milliseconds} (:special-type breakout-field)) results
      (let [ ;; Procure an appropriate function to perform the aggregation for the resulting rows.
            ;; NOTE(review): only :count/:avg/:distinct/:sum are handled -- any other aggregation-type
            ;; (e.g. :stddev, :cumulative-sum) will throw from `case`; confirm that is intended.
            ag-fn (case (-> *expanded-query* :query :aggregation :aggregation-type)
                    :count    count
                    :avg      #(/ (reduce + %) (count %)) ; NOTE(review): may return a Clojure ratio, not a double
                    :distinct #(count (set %)) ; (!!!)
                    :sum      (partial reduce +))
            ;; Convert the dates returned by the rows to their string equivalent so we can aggregate them
            ;; appropriately. Each row should look like [#inst<...> value]; convert to rows that look like
            ;; ["YYYY-MM-DD" value].
            rows (for [[timestamp val] (:rows results)]
                   [(u/date->yyyy-mm-dd timestamp) val])
            ;; Rows already come back in the correct order; lazily partition the sequence into separate sequences
            ;; for each distinct date value.
            ;; That will give us a sequence like:
            ;; [[["06-01-2015" val1] ["06-01-2015" val2]]
            ;;  [["06-02-2015" val3]]
            ;;  ...]
            partitions (partition-by first rows)
            ;; Now take each of these partitions and convert them back into single rows by grabbing the
            ;; date string from the first row and then applying AG-FN to the values.
            aggregated-rows (for [[[date-str] :as rows] partitions]
                              [date-str (ag-fn (map second rows))])]
        ;; Return the updated results (only :rows is replaced; :columns etc. are untouched).
        (assoc results :rows aggregated-rows)))))
;; ### POST-PROCESS-CUMULATIVE-SUM
(defn post-process-cumulative-sum
......@@ -301,11 +343,12 @@
(count (set (:columns results))))
(format "Duplicate columns in results: %s" (vec (:columns results))))
(->> results
convert-unix-timestamps-to-dates
perform-unix-timestamp-aggregation
limit-max-result-rows
(#(case (keyword (:type query))
:native %
:query (post-process-cumulative-sum (:query query) %)))
convert-unix-timestamps-to-dates
add-row-count-and-status))
......
......@@ -50,7 +50,9 @@
[metabase.util :as u])
(:import (clojure.lang Keyword)))
(declare parse-filter
(declare parse-aggregation
parse-breakout
parse-filter
with-resolved-fields)
;; ## -------------------- Protocols --------------------
......@@ -76,9 +78,23 @@
IResolveField {:resolve-field (fn [this _] this)}
ICollapse {:collapse-one identity})
;; `resolve-field` on `nil` simply returns `nil`, so absent clauses can flow through field resolution without special-casing.
(extend nil
  IResolveField {:resolve-field (constantly nil)})
;; ## -------------------- Public Interface --------------------
(defn- parse
  "Expand the :aggregation, :breakout, and :filter clauses of QUERY-DICT's inner :query
   into their parsed (record) forms."
  [query-dict]
  (update-in query-dict [:query]
             (fn [inner-query]
               (assoc inner-query
                      :aggregation (parse-aggregation (:aggregation inner-query))
                      :breakout    (parse-breakout    (:breakout inner-query))
                      :filter      (parse-filter      (:filter inner-query))))))
(defn expand
  "Expand a query-dict: run `parse` (which expands the :aggregation, :breakout, and :filter clauses)
   inside `with-resolved-fields` so field references are resolved."
  [query-dict]
  (with-resolved-fields parse query-dict))
(defn expand-filter
"Expand a `filter` clause."
[filter-clause]
......@@ -203,8 +219,34 @@
"Convenience for writing a parser function, i.e. one that pattern-matches against a lone argument."
[fn-name & match-forms]
`(defn ~(vary-meta fn-name assoc :private true) [form#]
(match form#
~@match-forms)))
(when (and form#
(or (not (sequential? form#))
(seq form#)))
(match form#
~@match-forms))))
;; ## -------------------- Aggregation --------------------
;; Expanded form of an `aggregation` clause: the aggregation type keyword plus the Field being
;; aggregated (nil for field-less aggregations such as "rows" and bare "count").
(defrecord Aggregation [^Keyword aggregation-type
                        ^Field field])
;; Pattern-match the raw `aggregation` clause vector and wrap it in an `Aggregation` record.
;; `ph` (defined elsewhere in this file) presumably creates a field placeholder from a field ID -- confirm.
(defparser parse-aggregation
  ["rows"]              (->Aggregation :rows nil)
  ["count"]             (->Aggregation :count nil)
  ["avg" field-id]      (->Aggregation :avg (ph field-id))
  ["count" field-id]    (->Aggregation :count (ph field-id))
  ["distinct" field-id] (->Aggregation :distinct (ph field-id))
  ["stddev" field-id]   (->Aggregation :stddev (ph field-id))
  ["sum" field-id]      (->Aggregation :sum (ph field-id))
  ["cum_sum" field-id]  (->Aggregation :cumulative-sum (ph field-id)))
;; ## -------------------- Breakout --------------------
;; Expanded form of a `breakout` clause.
(defrecord Breakout [fields])

;; NOTE(review): returns a plain vector of field placeholders, not a `Breakout` record --
;; the record above appears unused by this parser; confirm whether that is intentional.
(defparser parse-breakout
  [& field-ids] (mapv ph field-ids))
;; ## -------------------- Filter --------------------
......@@ -237,7 +279,7 @@
(collapse-one [_]
["BETWEEN" field min-val max-val]))
(defn- collapse-filter-type [^clojure.lang.Keyword filter-type]
(defn- collapse-filter-type [^Keyword filter-type]
(-> filter-type
name
(s/replace #"-" "_")
......
......@@ -46,16 +46,21 @@
(coerce/to-long)
(java.sql.Date.)))
(def ^:private ^java.text.SimpleDateFormat simple-date-format
;; Shared formatter for the `yyyy-MM-dd` pattern.
;; NOTE(review): `java.text.SimpleDateFormat` is documented as NOT thread-safe; sharing this single
;; instance across threads can silently corrupt parse/format results. Consider a per-call instance
;; or a ThreadLocal if these utils are hit concurrently.
(def ^:private ^java.text.SimpleDateFormat yyyy-mm-dd-simple-date-format
  (java.text.SimpleDateFormat. "yyyy-MM-dd"))
(defn parse-date-yyyy-mm-dd
"Parse a date in the `yyyy-mm-dd` format and return a `java.sql.Date`."
^java.sql.Date [^String date]
(-> (.parse simple-date-format date)
(-> (.parse yyyy-mm-dd-simple-date-format date)
.getTime
java.sql.Date.))
(defn date->yyyy-mm-dd
  "Convert DATE to a `YYYY-MM-DD` string.

   A fresh `SimpleDateFormat` is created per call: `SimpleDateFormat` is not thread-safe, and
   formatting through a shared instance from concurrent query post-processing can silently
   produce corrupted output."
  ^String [^java.util.Date date]
  (.format (java.text.SimpleDateFormat. "yyyy-MM-dd") date))
(defn date-yyyy-mm-dd->unix-timestamp
"Convert a string DATE in the `YYYY-MM-DD` format to a Unix timestamp in seconds."
^Float [^String date]
......
......@@ -78,63 +78,69 @@
(defn get-or-create-database!
"Create DBMS database associated with DATABASE-DEFINITION, create corresponding Metabase `Databases`/`Tables`/`Fields`, and sync the `Database`.
DATASET-LOADER should be an object that implements `IDatasetLoader`."
[dataset-loader {:keys [database-name], :as ^DatabaseDefinition database-definition}]
(let [engine (engine dataset-loader)]
(or (metabase-instance database-definition engine)
(do
;; Create the database
(log/info (color/blue (format "Creating %s database %s..." (name engine) database-name)))
(create-physical-db! dataset-loader database-definition)
;; Load data
(log/info (color/blue "Loading data..."))
(doseq [^TableDefinition table-definition (:table-definitions database-definition)]
(log/info (color/blue (format "Loading data for table '%s'..." (:table-name table-definition))))
(load-table-data! dataset-loader database-definition table-definition)
(log/info (color/blue (format "Inserted %d rows." (count (:rows table-definition))))))
;; Add DB object to Metabase DB
(log/info (color/blue "Adding DB to Metabase..."))
(let [db (ins Database
:name database-name
:engine (name engine)
:details (database->connection-details dataset-loader database-definition))]
;; Sync the database
(log/info (color/blue "Syncing DB..."))
(driver/sync-database! db)
;; Add extra metadata like Field field-type, base-type, etc.
(log/info (color/blue "Adding schema metadata..."))
(doseq [^TableDefinition table-definition (:table-definitions database-definition)]
(let [table-name (:table-name table-definition)
table (delay (let [table (metabase-instance table-definition db)]
(assert table)
table))]
(doseq [{:keys [field-name field-type special-type], :as field-definition} (:field-definitions table-definition)]
(let [field (delay (let [field (metabase-instance field-definition @table)]
(assert field)
field))]
(when field-type
(log/info (format "SET FIELD TYPE %s.%s -> %s" table-name field-name field-type))
(upd Field (:id @field) :field_type (name field-type)))
(when special-type
(log/info (format "SET SPECIAL TYPE %s.%s -> %s" table-name field-name special-type))
(upd Field (:id @field) :special_type (name special-type)))))))
(log/info (color/blue "Finished."))
db)))))
DATASET-LOADER should be an object that implements `IDatasetLoader`; it defaults to the value returned by the method `dataset-loader` for the
current dataset (`*dataset*`), which is H2 by default."
([^DatabaseDefinition database-definition]
(get-or-create-database! (dataset-loader) database-definition))
([dataset-loader {:keys [database-name], :as ^DatabaseDefinition database-definition}]
(let [engine (engine dataset-loader)]
(or (metabase-instance database-definition engine)
(do
;; Create the database
(log/info (color/blue (format "Creating %s database %s..." (name engine) database-name)))
(create-physical-db! dataset-loader database-definition)
;; Load data
(log/info (color/blue "Loading data..."))
(doseq [^TableDefinition table-definition (:table-definitions database-definition)]
(log/info (color/blue (format "Loading data for table '%s'..." (:table-name table-definition))))
(load-table-data! dataset-loader database-definition table-definition)
(log/info (color/blue (format "Inserted %d rows." (count (:rows table-definition))))))
;; Add DB object to Metabase DB
(log/info (color/blue "Adding DB to Metabase..."))
(let [db (ins Database
:name database-name
:engine (name engine)
:details (database->connection-details dataset-loader database-definition))]
;; Sync the database
(log/info (color/blue "Syncing DB..."))
(driver/sync-database! db)
;; Add extra metadata like Field field-type, base-type, etc.
(log/info (color/blue "Adding schema metadata..."))
(doseq [^TableDefinition table-definition (:table-definitions database-definition)]
(let [table-name (:table-name table-definition)
table (delay (let [table (metabase-instance table-definition db)]
(assert table)
table))]
(doseq [{:keys [field-name field-type special-type], :as field-definition} (:field-definitions table-definition)]
(let [field (delay (let [field (metabase-instance field-definition @table)]
(assert field)
field))]
(when field-type
(log/info (format "SET FIELD TYPE %s.%s -> %s" table-name field-name field-type))
(upd Field (:id @field) :field_type (name field-type)))
(when special-type
(log/info (format "SET SPECIAL TYPE %s.%s -> %s" table-name field-name special-type))
(upd Field (:id @field) :special_type (name special-type)))))))
(log/info (color/blue "Finished."))
db))))))
(defn remove-database!
"Delete Metabase `Database`, `Fields` and `Tables` associated with DATABASE-DEFINITION, then remove the physical database from the associated DBMS.
DATASET-LOADER should be an object that implements `IDatasetLoader`."
[dataset-loader ^DatabaseDefinition database-definition]
;; Delete the Metabase Database and associated objects
(cascade-delete Database :id (:id (metabase-instance database-definition (engine dataset-loader))))
;; now delete the DBMS database
(drop-physical-db! dataset-loader database-definition))
DATASET-LOADER should be an object that implements `IDatasetLoader`; by default it is the value returned by the method `dataset-loader` for the
current dataset, bound to `*dataset*`."
([^DatabaseDefinition database-definition]
(remove-database! (dataset-loader) database-definition))
([dataset-loader ^DatabaseDefinition database-definition]
;; Delete the Metabase Database and associated objects
(cascade-delete Database :id (:id (metabase-instance database-definition (engine dataset-loader))))
;; now delete the DBMS database
(drop-physical-db! dataset-loader database-definition)))
;; ## Temporary Dataset Macros
......
(ns metabase.test.data.dataset-definitions
"Definitions of various datasets for use in tests with `with-temp-db`."
(:require [metabase.test.data.interface :refer [def-database-definition]]))
(:require [clojure.tools.reader.edn :as edn]
[metabase.test.data.interface :refer [def-database-definition]]))
;; ## Helper Functions
(defn create-unix-timestamp
"Create a Unix timestamp (in seconds).
(defn unix-timestamp-ms
"Create a Unix timestamp (in milliseconds).
(create-unix-timestamp :year 2012 :month 12 :date 27)"
(unix-timestamp-ms :year 2012 :month 12 :date 27)"
^Long [& {:keys [year month date hour minute second nano]
:or {year 0, month 1, date 1, hour 0, minute 0, second 0, nano 0}}]
(-> (java.sql.Timestamp. (- year 1900) (- month 1) date hour minute second nano)
.getTime
(/ 1000)
long)) ; coerce to long since Korma doesn't know how to insert bigints
(defn unix-timestamp
"Create a Unix timestamp, in seconds."
^Long [& args]
(apply unix-timestamp-ms args))
;; ## Datasets
(def-database-definition us-history-1607-to-1774
......@@ -24,17 +29,35 @@
{:field-name "timestamp"
:base-type :BigIntegerField
:special-type :timestamp_seconds}]
[["Jamestown Settlement Founded" (create-unix-timestamp :year 1607 :month 5 :date 14)]
["Mayflower Compact Signed" (create-unix-timestamp :year 1620 :month 11 :date 11)]
["Ben Franklin's Kite Experiment" (create-unix-timestamp :year 1752 :month 96 :date 15)]
["French and Indian War Begins" (create-unix-timestamp :year 1754 :month 5 :date 28)]
["Stamp Act Enacted" (create-unix-timestamp :year 1765 :month 3 :date 22)]
["Quartering Act Enacted" (create-unix-timestamp :year 1765 :month 3 :date 24)]
["Stamp Act Congress Meets" (create-unix-timestamp :year 1765 :month 10 :date 19)]
["Stamp Act Repealed" (create-unix-timestamp :year 1766 :month 3 :date 18)]
["Townshend Acts Passed" (create-unix-timestamp :year 1767 :month 6 :date 29)]
["Boston Massacre" (create-unix-timestamp :year 1770 :month 3 :date 5)]
["Tea Act Passed" (create-unix-timestamp :year 1773 :month 5 :date 10)]
["Boston Tea Party" (create-unix-timestamp :year 1773 :month 12 :date 16)]
["Boston Port Act Passed" (create-unix-timestamp :year 1774 :month 3 :date 31)]
["First Continental Congress Held" (create-unix-timestamp :year 1774 :month 9 :date 5)]]])
[["Jamestown Settlement Founded" (unix-timestamp :year 1607 :month 5 :date 14)]
["Mayflower Compact Signed" (unix-timestamp :year 1620 :month 11 :date 11)]
["Ben Franklin's Kite Experiment" (unix-timestamp :year 1752 :month 96 :date 15)]
["French and Indian War Begins" (unix-timestamp :year 1754 :month 5 :date 28)]
["Stamp Act Enacted" (unix-timestamp :year 1765 :month 3 :date 22)]
["Quartering Act Enacted" (unix-timestamp :year 1765 :month 3 :date 24)]
["Stamp Act Congress Meets" (unix-timestamp :year 1765 :month 10 :date 19)]
["Stamp Act Repealed" (unix-timestamp :year 1766 :month 3 :date 18)]
["Townshend Acts Passed" (unix-timestamp :year 1767 :month 6 :date 29)]
["Boston Massacre" (unix-timestamp :year 1770 :month 3 :date 5)]
["Tea Act Passed" (unix-timestamp :year 1773 :month 5 :date 10)]
["Boston Tea Party" (unix-timestamp :year 1773 :month 12 :date 16)]
["Boston Port Act Passed" (unix-timestamp :year 1774 :month 3 :date 31)]
["First Continental Congress Held" (unix-timestamp :year 1774 :month 9 :date 5)]]])
(defn random-incident
  "Generate one random incident row: a severity (0-5) and a millisecond Unix timestamp
   falling at a random time during June 2015."
  []
  (let [severity  (rand-int 6)
        timestamp (unix-timestamp-ms :year   2015
                                     :month  6
                                     :date   (inc (rand-int 28)) ; day of month, 1-28
                                     :hour   (rand-int 24)
                                     :minute (rand-int 60))]
    [severity timestamp]))
;; Directory containing EDN dataset-definition files, relative to the project root.
(def ^:const edn-definitions-dir "./test/metabase/test/data/dataset_definitions/")

;; TODO - move this to interface
;; TODO - make rows be lazily loadable for DB definitions from a file
(defmacro def-database-definition-edn
  "Define a database definition (via `def-database-definition`) whose contents are read at
   macro-expansion time from the EDN file `<edn-definitions-dir>/<dbname>.edn`."
  [dbname]
  `(def-database-definition ~dbname
     (edn/read-string (slurp ~(str edn-definitions-dir (name dbname) ".edn")))))
;; Times when the Toucan cried
(def-database-definition-edn sad-toucan-incidents)
["incidents" [{:field-name "severity"
:base-type :IntegerField}
{:field-name "timestamp"
:base-type :BigIntegerField
:special-type :timestamp_milliseconds}]
[[4 1433587200000]
[0 1433965860000]
[5 1433864520000]
[3 1435016940000]
[3 1434764700000]
[3 1433253540000]
[5 1434995940000]
[2 1433383260000]
[1 1434247980000]
[0 1434389160000]
[2 1433276880000]
[2 1433857980000]
[5 1433879640000]
[1 1435482840000]
[3 1433745540000]
[2 1434700080000]
[5 1433536440000]
[3 1434157800000]
[1 1435150440000]
[2 1434702960000]
[3 1433749020000]
[2 1435255140000]
[5 1434358080000]
[4 1433410440000]
[2 1434737820000]
[0 1433794800000]
[5 1433323500000]
[4 1434914760000]
[3 1433397480000]
[5 1435158240000]
[4 1434952620000]
[0 1434060000000]
[3 1433395440000]
[2 1435029300000]
[5 1433272620000]
[0 1433944080000]
[5 1434577620000]
[0 1434753060000]
[1 1433991600000]
[5 1433578500000]
[4 1435365600000]
[4 1433243460000]
[1 1433279820000]
[2 1433288820000]
[3 1435010460000]
[3 1435106400000]
[4 1433535060000]
[1 1433641260000]
[1 1433184900000]
[4 1434937080000]
[0 1435441740000]
[2 1434872700000]
[4 1434705600000]
[2 1435095120000]
[1 1433898300000]
[1 1434519780000]
[1 1435240020000]
[2 1434663960000]
[5 1435363560000]
[4 1434663000000]
[5 1435351860000]
[4 1434975600000]
[2 1434971400000]
[2 1433675100000]
[2 1435088280000]
[0 1433549160000]
[1 1434094740000]
[1 1434904080000]
[5 1433211180000]
[1 1433751900000]
[4 1434982440000]
[1 1433826360000]
[5 1435060020000]
[5 1434450780000]
[1 1434236700000]
[1 1433280000000]
[1 1434135600000]
[5 1434338340000]
[4 1435389960000]
[3 1434302820000]
[0 1434102900000]
[3 1435444560000]
[1 1433174760000]
[4 1434933840000]
[0 1433959800000]
[0 1433977440000]
[0 1433233200000]
[4 1434164460000]
[4 1435193040000]
[4 1435124760000]
[0 1434969660000]
[1 1434867540000]
[3 1433440560000]
[2 1433688720000]
[2 1434946500000]
[1 1433973720000]
[4 1434517080000]
[3 1434709320000]
[5 1433583780000]
[5 1433693040000]
[2 1435229280000]
[2 1435362780000]
[2 1435107540000]
[5 1435048440000]
[5 1434709800000]
[4 1433449500000]
[1 1434947760000]
[5 1433832300000]
[1 1433548500000]
[0 1434071940000]
[1 1434263820000]
[2 1433592360000]
[5 1433652720000]
[2 1435506960000]
[2 1433492460000]
[5 1433785620000]
[3 1433309820000]
[2 1433886480000]
[2 1435106220000]
[0 1434353280000]
[1 1435506780000]
[4 1434954000000]
[5 1434502080000]
[1 1433794440000]
[5 1434456660000]
[4 1434751200000]
[3 1433193540000]
[2 1435190460000]
[2 1433594280000]
[3 1433790660000]
[5 1433365620000]
[5 1433192640000]
[5 1435532520000]
[0 1434284520000]
[4 1433654760000]
[3 1433948340000]
[5 1433223420000]
[4 1435068540000]
[4 1433939580000]
[2 1434707040000]
[5 1435233180000]
[1 1433179380000]
[0 1434963540000]
[2 1433538780000]
[4 1434607980000]
[2 1433481420000]
[2 1435148820000]
[1 1433994840000]
[4 1435476420000]
[2 1435405440000]
[3 1433553960000]
[1 1433764800000]
[2 1433542920000]
[2 1435425840000]
[1 1434731340000]
[5 1433846040000]
[0 1434582480000]
[0 1435514580000]
[2 1434812580000]
[3 1434521820000]
[4 1434166320000]
[0 1435103460000]
[0 1434291000000]
[3 1433517180000]
[1 1433383980000]
[1 1435210860000]
[0 1434403920000]
[4 1433714580000]
[2 1433954940000]
[5 1435044600000]
[0 1435365360000]
[1 1434212880000]
[1 1434920580000]
[0 1433551620000]
[2 1433494440000]
[5 1434398340000]
[4 1433154660000]
[0 1435334040000]
[1 1435123680000]
[0 1433674140000]
[4 1434714240000]
[3 1435336860000]
[5 1433377980000]
[1 1434252120000]
[4 1435038120000]
[3 1434278880000]
[0 1433366220000]
[0 1434029880000]
[4 1433789280000]
[2 1435343340000]
[4 1434343500000]
[5 1433398500000]
[3 1434805860000]
[1 1435215180000]
[3 1435010160000]
[1 1434436140000]
[5 1434972240000]
[5 1434851640000]
[1 1434107400000]
[4 1435492320000]]]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment