Skip to content
Snippets Groups Projects
Unverified Commit 75d26ad7 authored by Simon Belak's avatar Simon Belak Committed by GitHub
Browse files

Merge pull request #9765 from metabase/druid-code-cleanup

Druid code cleanup
parents 049fa818 51765b09
Branches
Tags
No related merge requests found
......@@ -50,28 +50,33 @@
(`:settings` is merged in from the outer query as well so we can access timezone info)."
nil)
(defn- get-timezone-id [] (or (get-in *query* [:settings :report-timezone]) "UTC"))
(defn- get-timezone-id
[]
(or (get-in *query* [:settings :report-timezone]) "UTC"))
(defn- query-type-dispatch-fn [query-type & _] query-type)
(defn- query-type-dispatch-fn
[query-type & _]
query-type)
(defmulti ^:private
->rvalue
(defmulti ^:private ->rvalue
"Convert something to an 'rvalue`, i.e. a value that could be used in the right-hand side of an assignment expression.
(let [x 100] ...) ; x is the lvalue; 100 is the rvalue"
{:arglists '([x])}
mbql.u/dispatch-by-clause-name-or-class)
(defmethod ->rvalue nil [_]
(defmethod ->rvalue nil
[_]
nil)
(defmethod ->rvalue Object [this]
(defmethod ->rvalue Object
[this]
this)
(defmethod ->rvalue :aggregation [[_ index]]
(defmethod ->rvalue :aggregation
[[_ index]]
(let [[ag-type :as ag] (nth (:aggregation *query*) index)]
(cond
(= [:count] ag)
:count
......@@ -84,33 +89,39 @@
:else
(throw (Exception. "Unknown aggregation type!")))))
(defmethod ->rvalue :field-id [[_ field-id]]
(defmethod ->rvalue :field-id
[[_ field-id]]
(:name (qp.store/field field-id)))
(defmethod ->rvalue :datetime-field [[_ field]]
(defmethod ->rvalue :datetime-field
[[_ field]]
(->rvalue field))
(defmethod ->rvalue :absolute-datetime [[_ timestamp unit]]
(defmethod ->rvalue :absolute-datetime
[[_ timestamp unit]]
(du/date->iso-8601 (du/date-trunc unit timestamp (get-timezone-id))))
;; TODO - not 100% sure how to handle times here, just treating it exactly like a date will have to do for now
(defmethod ->rvalue :time [[_ time unit]]
(defmethod ->rvalue :time
[[_ time unit]]
(du/date->iso-8601 (du/date-trunc unit time (get-timezone-id))))
(defmethod ->rvalue :relative-datetime [[_ amount unit]]
(defmethod ->rvalue :relative-datetime
[[_ amount unit]]
(du/date->iso-8601 (du/date-trunc unit (du/relative-date unit amount) (get-timezone-id))))
(defmethod ->rvalue :value [[_ value]]
(defmethod ->rvalue :value
[[_ value]]
(->rvalue value))
(defmulti ^:private
dimension-or-metric?
(defmulti ^:private dimension-or-metric?
"Is this field clause a `:dimension` or `:metric`?"
{:arglists '([field-clause])}
mbql.u/dispatch-by-clause-name-or-class)
(defmethod dimension-or-metric? :field-id [[_ field-id]]
(defmethod dimension-or-metric? :field-id
[[_ field-id]]
(let [{base-type :base_type} (qp.store/field field-id)]
(cond
(isa? base-type :type/Text) :dimension
......@@ -118,7 +129,8 @@
(isa? base-type :type/Integer) :metric
(isa? base-type :type/DruidHyperUnique) :metric)))
(defmethod dimension-or-metric? :datetime-field [[_ field]]
(defmethod dimension-or-metric? :datetime-field
[[_ field]]
(dimension-or-metric? field))
......@@ -140,7 +152,8 @@
;;; ---------------------------------------------- handle-source-table -----------------------------------------------
(defn- handle-source-table [_ {source-table-id :source-table} updated-query]
(defn- handle-source-table
[_ {source-table-id :source-table} updated-query]
(let [{source-table-name :name} (qp.store/table source-table-id)]
(assoc-in updated-query [:query :dataSource] source-table-name)))
......@@ -212,17 +225,15 @@
:lowerStrict (not inclusive?)
:upperStrict (not inclusive?)})
(defn- filter-fields-are-dimensions? [fields]
(reduce
#(and %1 %2)
true
(for [field fields]
(or
(not= (dimension-or-metric? field) :metric)
(log/warn
(u/format-color 'red
(tru "WARNING: Filtering only works on dimensions! ''{0}'' is a metric. Ignoring filter."
(->rvalue field))))))))
(defn- filter-fields-are-dimensions?
[fields]
(every? true? (for [field fields]
(or
(not= (dimension-or-metric? field) :metric)
(log/warn
(u/format-color 'red
(tru "WARNING: Filtering only works on dimensions! ''{0}'' is a metric. Ignoring filter."
(->rvalue field))))))))
(defmulti ^:private parse-filter
{:arglists '([filter-clause])}
......@@ -238,53 +249,68 @@
(not-any? (partial mbql.u/is-clause? :datetime-field) fields))
clause-name))))
(defmethod parse-filter nil [_] nil)
(defmethod parse-filter nil
[_]
nil)
(defmethod parse-filter :between [[_ field min-val max-val]]
(defmethod parse-filter :between
[[_ field min-val max-val]]
(filter:bound field, :lower min-val, :upper max-val))
(defmethod parse-filter :contains [[_ field string-or-field options]]
(defmethod parse-filter :contains
[[_ field string-or-field options]]
{:type :search
:dimension (->rvalue field)
:query {:type :contains
:value (->rvalue string-or-field)
:caseSensitive (get options :case-sensitive true)}})
(defmethod parse-filter :starts-with [[_ field string-or-field options]]
(defmethod parse-filter :starts-with
[[_ field string-or-field options]]
(filter:like field
(str (escape-like-filter-pattern (->rvalue string-or-field)) \%)
(get options :case-sensitive true)))
(defmethod parse-filter :ends-with [[_ field string-or-field options]]
(defmethod parse-filter :ends-with
[[_ field string-or-field options]]
(filter:like field
(str \% (escape-like-filter-pattern (->rvalue string-or-field)))
(get options :case-sensitive true)))
(defmethod parse-filter := [[_ field value-or-field]]
(defmethod parse-filter :=
[[_ field value-or-field]]
(filter:= field value-or-field))
(defmethod parse-filter :!= [[_ field value-or-field]]
(defmethod parse-filter :!=
[[_ field value-or-field]]
(filter:not (filter:= field value-or-field)))
(defmethod parse-filter :< [[_ field value-or-field]]
(defmethod parse-filter :<
[[_ field value-or-field]]
(filter:bound field, :upper value-or-field, :inclusive? false))
(defmethod parse-filter :> [[_ field value-or-field]]
(defmethod parse-filter :>
[[_ field value-or-field]]
(filter:bound field, :lower value-or-field, :inclusive? false))
(defmethod parse-filter :<= [[_ field value-or-field]]
(defmethod parse-filter :<=
[[_ field value-or-field]]
(filter:bound field, :upper value-or-field))
(defmethod parse-filter :>= [[_ field value-or-field]]
(defmethod parse-filter :>=
[[_ field value-or-field]]
(filter:bound field, :lower value-or-field))
(defmethod parse-filter :and [[_ & args]]
(defmethod parse-filter :and
[[_ & args]]
{:type :and, :fields (filterv identity (map parse-filter args))})
(defmethod parse-filter :or [[_ & args]]
(defmethod parse-filter :or
[[_ & args]]
{:type :or, :fields (filterv identity (map parse-filter args))})
(defmethod parse-filter :not [[_ subclause]]
(defmethod parse-filter :not
[[_ subclause]]
(when-let [subclause (parse-filter subclause)]
(filter:not subclause)))
......@@ -299,7 +325,8 @@
(when (seq more)
(apply make-intervals more)))))
(defn- parse-filter-subclause:intervals [[filter-type field value maybe-max-value]]
(defn- parse-filter-subclause:intervals
[[filter-type field value maybe-max-value]]
(when (mbql.u/is-clause? :datetime-field field)
(case filter-type
;; BETWEEN "2015-12-09", "2015-12-11" -> ["2015-12-09/2015-12-12"], because BETWEEN is inclusive
......@@ -317,7 +344,8 @@
;; <= "2015-12-11" -> ["-5000/2015-12-12"]
:<= (make-intervals nil (mbql.u/add-datetime-units value 1)))))
(defn- parse-filter-clause:intervals [[compound-type & subclauses, :as clause]]
(defn- parse-filter-clause:intervals
[[compound-type & subclauses, :as clause]]
(if-not (#{:and :or :not} compound-type)
(parse-filter-subclause:intervals clause)
(let [subclauses (filterv identity (mapcat parse-filter-clause:intervals subclauses))]
......@@ -325,14 +353,15 @@
(case compound-type
;; A date can't be in more than one interval, so ANDing them together doesn't really make sense. In this
;; situation, just ignore all intervals after the first
:and (do (when (> (count subclauses) 1)
(log/warn
(u/format-color 'red
(str
(tru "WARNING: A date can't belong to multiple discrete intervals, so ANDing them together doesn't make sense.")
"\n"
(tru "Ignoring these intervals: {0}" (rest subclauses))) )))
[(first subclauses)])
:and (do
(when (> (count subclauses) 1)
(log/warn
(u/format-color 'red
(str
(tru "WARNING: A date can't belong to multiple discrete intervals, so ANDing them together doesn't make sense.")
"\n"
(tru "Ignoring these intervals: {0}" (rest subclauses))) )))
[(first subclauses)])
;; Ok to specify multiple intervals for OR
:or subclauses
;; We should never get to this point since the all non-string negations should get automatically rewritten
......@@ -340,7 +369,8 @@
:not (log/warn (u/format-color 'red (tru "WARNING: Don't know how to negate: {0}" clause))))))))
(defn- handle-filter [_ {filter-clause :filter} updated-query]
(defn- handle-filter
[_ {filter-clause :filter} updated-query]
(if-not filter-clause
updated-query
(let [filter (parse-filter filter-clause)
......@@ -353,7 +383,8 @@
;;; ----------------------------------------------- handle-aggregation -----------------------------------------------
(defn- expression->field-names [[_ & args]]
(defn- expression->field-names
[[_ & args]]
{:post [(every? (some-fn keyword? string?) %)]}
(flatten (for [arg args
:when (or (field? arg)
......@@ -362,13 +393,15 @@
(mbql.u/is-clause? #{:+ :- :/ :*} arg) (expression->field-names arg)
(field? arg) (->rvalue arg)))))
(defn- expression-arg->js [arg default-value]
(defn- expression-arg->js
[arg default-value]
(if-not (field? arg)
arg
(js/or (js/parse-float (->rvalue arg))
default-value)))
(defn- expression->js [[operator & args] default-value]
(defn- expression->js
[[operator & args] default-value]
(apply (case operator
:+ js/+
:- js/-
......@@ -377,7 +410,8 @@
(for [arg args]
(expression-arg->js arg default-value))))
(defn- ag:doubleSum:expression [[operator :as expression] output-name]
(defn- ag:doubleSum:expression
[[operator :as expression] output-name]
(let [field-names (expression->field-names expression)]
{:type :javascript
:name output-name
......@@ -389,7 +423,8 @@
:fnCombine (js/function [:x :y]
(js/return (js/+ :x :y)))}))
(defn- ag:doubleSum [field-clause output-name]
(defn- ag:doubleSum
[field-clause output-name]
(if (mbql.u/is-clause? #{:+ :- :/ :*} field-clause)
(ag:doubleSum:expression field-clause output-name)
;; metrics can use the built-in :doubleSum aggregator, but for dimensions we have to roll something that does the
......@@ -405,7 +440,8 @@
:fnAggregate "function(current, x) { return current + (parseFloat(x) || 0); }"
:fnCombine "function(x, y) { return x + y; }"})))
(defn- ag:doubleMin:expression [expression output-name]
(defn- ag:doubleMin:expression
[expression output-name]
(let [field-names (expression->field-names expression)]
{:type :javascript
:name output-name
......@@ -417,7 +453,8 @@
:fnCombine (js/function [:x :y]
(js/return (js/fn-call :Math.min :x :y)))}))
(defn- ag:doubleMin [field-clause output-name]
(defn- ag:doubleMin
[field-clause output-name]
(if (mbql.u/is-clause? #{:+ :- :/ :*} field-clause)
(ag:doubleMin:expression field-clause output-name)
(case (dimension-or-metric? field-clause)
......@@ -431,7 +468,8 @@
:fnAggregate "function(current, x) { return Math.min(current, (parseFloat(x) || Number.MAX_VALUE)); }"
:fnCombine "function(x, y) { return Math.min(x, y); }"})))
(defn- ag:doubleMax:expression [expression output-name]
(defn- ag:doubleMax:expression
[expression output-name]
(let [field-names (expression->field-names expression)]
{:type :javascript
:name output-name
......@@ -443,7 +481,8 @@
:fnCombine (js/function [:x :y]
(js/return (js/fn-call :Math.max :x :y)))}))
(defn- ag:doubleMax [field output-name]
(defn- ag:doubleMax
[field output-name]
(if (mbql.u/is-clause? #{:+ :- :/ :*} field)
(ag:doubleMax:expression field output-name)
(case (dimension-or-metric? field)
......@@ -457,7 +496,9 @@
:fnAggregate "function(current, x) { return Math.max(current, (parseFloat(x) || Number.MIN_VALUE)); }"
:fnCombine "function(x, y) { return Math.max(x, y); }"})))
(defn- ag:filtered [filtr aggregator] {:type :filtered, :filter filtr, :aggregator aggregator})
(defn- ag:filtered
[filtr aggregator]
{:type :filtered, :filter filtr, :aggregator aggregator})
(defn- ag:distinct
[field output-name]
......@@ -470,17 +511,21 @@
:fieldNames [(->rvalue field)]}))
(defn- ag:count
([output-name] {:type :count, :name output-name})
([field output-name] (ag:filtered (filter:not (filter:nil? field))
(ag:count output-name))))
([output-name]
{:type :count, :name output-name})
([field output-name]
(ag:filtered (filter:not (filter:nil? field)) (ag:count output-name))))
(defn- ag:countWhere
([pred output-name] (ag:filtered (parse-filter pred)
(ag:count output-name))))
[pred output-name]
(ag:filtered (parse-filter pred) (ag:count output-name)))
(defn- ag:sumWhere
([field pred output-name] (ag:filtered (parse-filter pred)
(ag:doubleSum field output-name))))
[field pred output-name]
(ag:filtered (parse-filter pred) (ag:doubleSum field output-name)))
(def ^:private ^{:arglists '([prefix])} genname
(comp name gensym))
(defn- create-aggregation-clause
[output-name ag-type ag-field args]
......@@ -494,8 +539,8 @@
[:count _] [[(or output-name-kwd :count)] {:aggregations [(ag:count ag-field (or (name output-name) :count))]}]
[:avg _] (let [count-name (name (gensym "___count_"))
sum-name (name (gensym "___sum_"))]
[:avg _] (let [count-name (genname "___count_")
sum-name (genname "___sum_")]
[[(keyword count-name) (keyword sum-name) (or output-name-kwd :avg)]
{:aggregations [(ag:count ag-field count-name)
(ag:doubleSum ag-field sum-name)]
......@@ -512,8 +557,8 @@
[:count-where _] [[(or output-name-kwd :count-where)]
{:aggregations [(ag:countWhere ag-field output-name-kwd)]}]
[:share _] (let [total-count-name (name (gensym "___total_count_"))
true-count-name (name (gensym "___true_count_"))]
[:share _] (let [total-count-name (genname "___total_count_")
true-count-name (genname "___true_count_")]
[[(keyword total-count-name) (keyword true-count-name) (or output-name-kwd :share)]
{:aggregations [(ag:count total-count-name)
(ag:countWhere ag-field true-count-name)]
......@@ -532,11 +577,12 @@
[:max _] [[(or output-name-kwd :max)]
{:aggregations [(ag:doubleMax ag-field (or output-name :max))]}])))
(defn- handle-aggregation [query-type ag-clause updated-query]
(let [output-name (annotate/aggregation-name ag-clause)
(defn- handle-aggregation
[query-type ag-clause updated-query]
(let [output-name (annotate/aggregation-name ag-clause)
[ag-type ag-field & args] (mbql.u/match-one ag-clause
[:named ag _] (recur ag)
[_ _ & _] &match)]
[:named ag _] (recur ag)
[_ _ & _] &match)]
(if-not (isa? query-type ::ag-query)
updated-query
(let [[projections ag-clauses] (create-aggregation-clause output-name ag-type ag-field args)]
......@@ -544,27 +590,27 @@
(update :projections #(vec (concat % projections)))
(update :query #(merge-with concat % ag-clauses)))))))
(defn- add-expression-aggregation-output-names [[operator & args :as expression]]
(defn- add-expression-aggregation-output-names
[[operator & args :as expression]]
(if (mbql.u/is-clause? :named expression)
[:named (add-expression-aggregation-output-names (second expression)) (last expression)]
(apply
vector
operator
(for [arg args]
(cond
(number? arg)
arg
(into [operator]
(for [arg args]
(cond
(number? arg)
arg
(mbql.u/is-clause? :named arg)
arg
(mbql.u/is-clause? :named arg)
arg
(mbql.u/is-clause? #{:count :avg :distinct :stddev :sum :min :max} arg)
[:named arg (name (gensym (str "___" (name (first arg)) "_")))]
(mbql.u/is-clause? #{:count :avg :distinct :stddev :sum :min :max} arg)
[:named arg (name (gensym (str "___" (name (first arg)) "_")))]
(mbql.u/is-clause? #{:+ :- :/ :*} arg)
(add-expression-aggregation-output-names arg))))))
(mbql.u/is-clause? #{:+ :- :/ :*} arg)
(add-expression-aggregation-output-names arg))))))
(defn- expression-post-aggregation [[operator & args, :as expression]]
(defn- expression-post-aggregation
[[operator & args, :as expression]]
(if (mbql.u/is-clause? :named expression)
;; If it's a named expression, we want to preserve the included name, so recurse, but merge in the name
(merge (expression-post-aggregation (second expression))
......@@ -594,12 +640,14 @@
(expression->actual-ags arg)
[arg]))))
(defn- unwrap-name [x]
(defn- unwrap-name
[x]
(if (mbql.u/is-clause? :named x)
(second x)
x))
(defn- handle-expression-aggregation [query-type [operator & args, :as expression] updated-query]
(defn- handle-expression-aggregation
[query-type [operator & args, :as expression] updated-query]
;; filter out constants from the args list
(let [expression (add-expression-aggregation-output-names expression)
;; The QP will automatically add a generated name to the expression, if it's there, unwrap it before looking
......@@ -611,7 +659,8 @@
(update :projections conj (keyword (:name post-agg)))
(update :query #(merge-with concat % {:postAggregations [post-agg]})))))
(defn- handle-aggregations [query-type {aggregations :aggregation} updated-query]
(defn- handle-aggregations
[query-type {aggregations :aggregation} updated-query]
(let [aggregations (mbql.u/pre-alias-and-uniquify-aggregations annotate/aggregation-name aggregations)]
(loop [[ag & more] aggregations, query updated-query]
(cond
......@@ -631,8 +680,7 @@
;;; ------------------------------------------------ handle-breakout -------------------------------------------------
(defmulti ^:private
->dimension-rvalue
(defmulti ^:private ->dimension-rvalue
"Format `Field` for use in a `:dimension` or `:dimensions` clause."
{:arglists '([field-clause])}
mbql.u/dispatch-by-clause-name-or-class)
......@@ -693,23 +741,24 @@
"}")
:year (extract:timeFormat "yyyy")))
(defn- unit->granularity [unit]
(conj {:type "period"
:period (case unit
:minute "PT1M"
:hour "PT1H"
:day "P1D"
:week "P1W"
:month "P1M"
:quarter "P3M"
:year "P1Y")
:timeZone (get-timezone-id)}
;; Druid uses Monday for the start of its weekly calculations. Metabase uses Sundays. When grouping by week,
;; the origin keypair will use the date specified as it's start of the week. The below date is the first
;; Sunday after Epoch. The date itself isn't significant, it just uses it to figure out what day it should
;; start from.
(when (= :week unit)
{:origin "1970-01-04T00:00:00Z"})))
(defn- unit->granularity
[unit]
(merge {:type "period"
:period (case unit
:minute "PT1M"
:hour "PT1H"
:day "P1D"
:week "P1W"
:month "P1M"
:quarter "P3M"
:year "P1Y")
:timeZone (get-timezone-id)}
;; Druid uses Monday for the start of its weekly calculations. Metabase uses Sundays. When grouping by week,
;; the origin keypair will use the date specified as it's start of the week. The below date is the first
;; Sunday after Epoch. The date itself isn't significant, it just uses it to figure out what day it should
;; start from.
(when (= :week unit)
{:origin "1970-01-04T00:00:00Z"})))
(def ^:private units-that-need-post-processing-int-parsing
"`extract:timeFormat` always returns a string; there are cases where we'd like to return an integer instead, such as
......@@ -727,13 +776,20 @@
:quarter-of-year
:year})
(defmethod ->dimension-rvalue nil [_] (->rvalue nil))
(defmethod ->dimension-rvalue nil
[_]
(->rvalue nil))
(defmethod ->dimension-rvalue Object [this] (->rvalue this))
(defmethod ->dimension-rvalue Object
[this]
(->rvalue this))
(defmethod ->dimension-rvalue :field-id [this] (->rvalue this))
(defmethod ->dimension-rvalue :field-id
[this]
(->rvalue this))
(defmethod ->dimension-rvalue :datetime-field [[_ _ unit]]
(defmethod ->dimension-rvalue :datetime-field
[[_ _ unit]]
{:type :extraction
:dimension :__time
;; :timestamp is a special case, and we need to do an 'extraction' against the secret special value :__time to get
......@@ -749,20 +805,24 @@
query-type-dispatch-fn)
;; only topN , grouped-timeseries & groupBy handle breakouts
(defmethod handle-breakout ::query [_ _ updated-query]
(defmethod handle-breakout ::query
[_ _ updated-query]
updated-query)
(defmethod handle-breakout ::grouped-timeseries [_ {[breakout-field] :breakout} updated-query]
(defmethod handle-breakout ::grouped-timeseries
[_ {[breakout-field] :breakout} updated-query]
(assoc-in updated-query [:query :granularity] (unit->granularity (:unit breakout-field))))
(defn- field-clause->name [field-clause]
(defn- field-clause->name
[field-clause]
(when field-clause
(let [id (mbql.u/field-clause->id-or-literal field-clause)]
(if (integer? id)
(:name (qp.store/field id))
id))))
(defmethod handle-breakout ::topN [_ {[breakout-field] :breakout} updated-query]
(defmethod handle-breakout ::topN
[_ {[breakout-field] :breakout} updated-query]
(let [dim-rvalue (->dimension-rvalue breakout-field)]
(-> updated-query
(update :projections conj (keyword (if (and (map? dim-rvalue)
......@@ -771,7 +831,8 @@
(field-clause->name breakout-field))))
(assoc-in [:query :dimension] dim-rvalue))))
(defmethod handle-breakout ::groupBy [_ {breakout-fields :breakout} updated-query]
(defmethod handle-breakout ::groupBy
[_ {breakout-fields :breakout} updated-query]
(-> updated-query
(update :projections into (for [breakout-field breakout-fields]
(let [dim-rvalue (->dimension-rvalue breakout-field)]
......@@ -789,7 +850,8 @@
{:arglists '([query-type original-query updated-query])}
query-type-dispatch-fn)
(defmethod handle-order-by ::query [_ _ updated-query]
(defmethod handle-order-by ::query
[_ _ updated-query]
(log/warn
(u/format-color 'red
(tru "Sorting with Druid is only allowed in queries that have one or more breakout columns. Ignoring :order-by clause.")))
......@@ -797,9 +859,7 @@
(defmethod handle-order-by ::topN
[_
{[[ag-type]] :aggregation, [breakout-field] :breakout, [[direction field]] :order-by}
updated-query]
[_ {[[ag-type]] :aggregation, [breakout-field] :breakout, [[direction field]] :order-by} updated-query]
(let [field (->rvalue field)
breakout-field (->rvalue breakout-field)
sort-by-breakout? (= field breakout-field)
......@@ -810,7 +870,8 @@
[false :asc] {:type :inverted, :metric ag-field}
[false :desc] ag-field))))
(defmethod handle-order-by ::groupBy [_ {:keys [order-by]} updated-query]
(defmethod handle-order-by ::groupBy
[_ {:keys [order-by]} updated-query]
(assoc-in updated-query [:query :limitSpec :columns] (vec (for [[direction field] order-by]
{:dimension (->rvalue field)
:direction (case direction
......@@ -825,14 +886,17 @@
(mbql.u/datetime-field? (qp.store/field (second field))))))
;; Handle order by timstamp field
(defn- handle-order-by-timestamp [field direction updated-query]
(defn- handle-order-by-timestamp
[field direction updated-query]
(assoc-in updated-query [:query :descending] (and (datetime-field? field)
(= direction :desc))))
(defmethod handle-order-by ::grouped-timeseries [_ {[[direction field]] :order-by} updated-query]
(defmethod handle-order-by ::grouped-timeseries
[_ {[[direction field]] :order-by} updated-query]
(handle-order-by-timestamp field direction updated-query))
(defmethod handle-order-by ::select [_ {[[direction field]] :order-by} updated-query]
(defmethod handle-order-by ::select
[_ {[[direction field]] :order-by} updated-query]
(handle-order-by-timestamp field direction updated-query))
......@@ -842,7 +906,8 @@
{:arglists '([query-type original-query updated-query])}
query-type-dispatch-fn)
(defmethod handle-fields ::query [_ {fields :fields} updated-query]
(defmethod handle-fields ::query
[_ {fields :fields} updated-query]
(when fields
(log/warn
(u/format-color 'red
......@@ -850,16 +915,20 @@
(tru "WARNING: It only makes sense to specify :fields for a query with no aggregation. Ignoring the clause."))))
updated-query)
(defmethod handle-fields ::select [_ {fields :fields} updated-query]
(defmethod handle-fields ::select
[_ {fields :fields} updated-query]
(if-not (seq fields)
updated-query
(loop [dimensions [], metrics [], projections (:projections updated-query), [field & more] fields]
(loop [dimensions []
metrics []
projections (:projections updated-query)
[field & more] fields]
(cond
;; If you specify nil or empty `:dimensions` or `:metrics` Druid will just return all of the ones available.
;; In cases where we don't want anything to be returned in one or the other, we'll ask for a `:___dummy`
;; column instead. Druid happily returns `nil` for the column in every row, and it will get auto-filtered out
;; of the results so the User will never see it.
(not field)
(nil? field)
(-> updated-query
(assoc :projections (conj projections :timestamp))
(assoc-in [:query :dimensions] (or (seq dimensions) [:___dummy]))
......@@ -884,24 +953,28 @@
{:arglists '([query-type original-query updated-query])}
query-type-dispatch-fn)
(defmethod handle-limit ::select [_ {limit :limit} updated-query]
(defmethod handle-limit ::select
[_ {limit :limit} updated-query]
(if-not limit
updated-query
(assoc-in updated-query [:query :pagingSpec :threshold] limit)))
(defmethod handle-limit ::timeseries [_ {limit :limit} updated-query]
(defmethod handle-limit ::timeseries
[_ {limit :limit} updated-query]
(when limit
(log/warn
(u/format-color 'red
(tru "WARNING: Druid does not allow limitSpec in time series queries. Ignoring the LIMIT clause."))))
updated-query)
(defmethod handle-limit ::topN [_ {limit :limit} updated-query]
(defmethod handle-limit ::topN
[_ {limit :limit} updated-query]
(if-not limit
updated-query
(assoc-in updated-query [:query :threshold] limit)))
(defmethod handle-limit ::groupBy [_ {limit :limit} updated-query]
(defmethod handle-limit ::groupBy
[_ {limit :limit} updated-query]
(if-not limit
updated-query
(-> updated-query
......@@ -917,7 +990,8 @@
{:arglists '([query-type original-query updated-query])}
query-type-dispatch-fn)
(defmethod handle-page ::query [_ {page-clause :page} updated-query]
(defmethod handle-page ::query
[_ {page-clause :page} updated-query]
(when page-clause
(log/warn (u/format-color 'red "WARNING: 'page' is not yet implemented.")))
updated-query)
......@@ -927,8 +1001,7 @@
;;; | Build + Log + Process Query |
;;; +----------------------------------------------------------------------------------------------------------------+
(def ^:private ^:const timeseries-units
#{:minute :hour :day :week :month :quarter :year})
(def ^:private ^:const timeseries-units #{:minute :hour :day :week :month :quarter :year})
(defn- druid-query-type
"What type of Druid query type should we perform?"
......@@ -949,7 +1022,8 @@
[:many _ _] ::groupBy)))
(defn- build-druid-query [original-query]
(defn- build-druid-query
[original-query]
{:pre [(map? original-query)]}
(let [query-type (druid-query-type original-query)]
(reduce (fn [updated-query f]
......@@ -972,23 +1046,25 @@
{:arglists '([query-type projections timezone-and-middleware-settings results])}
query-type-dispatch-fn)
(defn- post-process-map [projections results]
(defn- post-process-map
[projections results]
{:projections projections
:results results})
(def ^:private druid-ts-format (tformat/formatters :date-time))
(defn- parse-timestamp
[timestamp]
(->> timestamp (tformat/parse druid-ts-format) tcoerce/to-date))
(def ^:private ^{:arglists '([timestamp])} parse-timestamp
(comp tcoerce/to-date (partial tformat/parse druid-ts-format)))
(defn- reformat-timestamp [timestamp target-formatter]
(defn- reformat-timestamp
[timestamp target-formatter]
(->> timestamp
(tformat/parse druid-ts-format)
(tformat/unparse target-formatter)))
(defmethod post-process ::select [_ projections {:keys [timezone middleware]} results]
(let [target-formater (and timezone (tformat/with-zone druid-ts-format timezone))
(defmethod post-process ::select
[_ projections {:keys [timezone middleware]} results]
(let [target-formater (some->> timezone (tformat/with-zone druid-ts-format))
update-ts-fn (cond
(not (:format-rows? middleware true))
#(update % :timestamp parse-timestamp)
......@@ -1005,17 +1081,20 @@
(map (comp update-ts-fn :event))
(post-process-map projections))))
(defmethod post-process ::total [_ projections _ results]
(defmethod post-process ::total
[_ projections _ results]
(post-process-map projections (map :result results)))
(defmethod post-process ::topN [_ projections {:keys [middleware]} results]
(defmethod post-process ::topN
[_ projections {:keys [middleware]} results]
(post-process-map projections
(let [results (-> results first :result)]
(if (:format-rows? middleware true)
results
(map #(u/update-when % :timestamp parse-timestamp) results)))))
(defmethod post-process ::groupBy [_ projections {:keys [middleware]} results]
(defmethod post-process ::groupBy
[_ projections {:keys [middleware]} results]
(post-process-map projections
(if (:format-rows? middleware true)
(map :event results)
......@@ -1023,13 +1102,14 @@
:event)
results))))
(defmethod post-process ::timeseries [_ projections {:keys [middleware]} results]
(defmethod post-process ::timeseries
[_ projections {:keys [middleware]} results]
(post-process-map (conj projections :timestamp)
(let [ts-getter (if (:format-rows? middleware true)
:timestamp
(comp parse-timestamp :timestamp))]
(for [event results]
(conj {:timestamp (ts-getter event)} (:result event))))))
(merge {:timestamp (ts-getter event)} (:result event))))))
(defn- remove-bonus-keys
"Remove keys that start with `___` from the results -- they were temporary, and we don't want to return them."
......@@ -1060,21 +1140,19 @@
(into
(ordered-map/ordered-map)
(for [k columns]
[k
(case k
:distinct___count (comp math/round k)
:timestamp___int (comp (fn [^String s]
(when (seq s)
(Integer/parseInt s)))
k)
k)])))
[k (case k
:distinct___count (comp math/round k)
:timestamp___int (comp (fn [^String s]
(when (some? s)
(Integer/parseInt s)))
k)
k)])))
(defn- utc?
"There are several timezone ids that mean UTC. This will create a TimeZone object from `TIMEZONE` and check to see if
it's a UTC timezone"
[^DateTimeZone timezone]
(.hasSameRules (TimeZone/getTimeZone "UTC")
(.toTimeZone timezone)))
(.hasSameRules (TimeZone/getTimeZone "UTC") (.toTimeZone timezone)))
(defn- resolve-timezone
"Returns the timezone object (either report-timezone or JVM timezone). Returns nil if the timezone is UTC as the
......@@ -1086,17 +1164,17 @@
(defn execute-query
"Execute a query for a Druid DB."
[do-query
{database-id :database
{:keys [query query-type mbql? projections]} :native
middleware :middleware
:as mbql-query}]
[do-query {database-id :database
{:keys [query query-type mbql? projections]} :native
middleware :middleware
:as mbql-query}]
{:pre [query]}
(let [details (:details (qp.store/database))
query (if (string? query)
(json/parse-string query keyword)
query)
query-type (or query-type (keyword "metabase.driver.druid.query-processor" (name (:queryType query))))
query-type (or query-type
(keyword "metabase.driver.druid.query-processor" (name (:queryType query))))
post-proc-map (->> query
(do-query details)
(post-process query-type projections
......@@ -1111,18 +1189,17 @@
column->getter (columns->getter-fns columns)]
;; Leave `:rows` as a sequence of maps and the `annotate` middleware will take care of converting them to vectors
;; in the correct column order
{:rows
(for [row (:results post-proc-map)]
;; use ordered-map to preseve the column ordering because for native queries results are returned in whatever
;; order the keys come out when calling `keys`
(into
(ordered-map/ordered-map)
(for [[column getter] column->getter]
;; rename any occurances of `:timestamp___int` to `:timestamp` in the results so the user doesn't know about
;; our behind-the-scenes conversion and apply any other post-processing on the value such as parsing some
;; units to int and rounding up approximate cardinality values.
[(case column
:timestamp___int :timestamp
:distinct___count :count
column)
(getter row)])))}))
{:rows (for [row (:results post-proc-map)]
;; use ordered-map to preseve the column ordering because for native queries results are returned in whatever
;; order the keys come out when calling `keys`
(into
(ordered-map/ordered-map)
(for [[column getter] column->getter]
;; rename any occurances of `:timestamp___int` to `:timestamp` in the results so the user doesn't know about
;; our behind-the-scenes conversion and apply any other post-processing on the value such as parsing some
;; units to int and rounding up approximate cardinality values.
[(case column
:timestamp___int :timestamp
:distinct___count :count
column)
(getter row)])))}))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment