Skip to content
Snippets Groups Projects
Commit ba2b7233 authored by Cam Saül's avatar Cam Saül
Browse files

fix duplicate query processing

parent d7730279
No related branches found
No related tags found
No related merge requests found
......@@ -161,3 +161,6 @@
;; 4. API
;; 4A. API Tweaks as Needed
;; 5. Cleanup + Tests
;; 5A. Cleanup / Dox
;; 5B. Tests
;; 5C. $ notation doesn't handle nested Fields (yet) (or id ? )
......@@ -13,6 +13,7 @@
[metabase.driver :as driver]
(metabase.driver [interface :as i]
[query-processor :as qp])
[metabase.driver.query-processor.expand :as expand]
[metabase.driver.mongo.util :refer [with-mongo-connection *mongo-connection* values->base-type]]
[metabase.models.field :refer [Field]]
[metabase.util :as u])
......@@ -39,7 +40,7 @@
(case (keyword query-type)
:query (let [generated-query (process-structured (:query query))]
(when-not qp/*disable-qp-logging*
(log/debug (u/format-color 'green "\nMONGER FORM:\n\n%s\n"
(log/debug (u/format-color 'green "\nMONGER FORM:\n%s\n"
(->> generated-query
(walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
u/pprint-to-str) "\n"))) ; so it's easier to read
......@@ -85,14 +86,14 @@
~@(filter identity forms)]))
(defn- field->name
[{:keys [field-name subfield]}]
(if subfield (format "%s.%s" field-name subfield)
field-name))
"Return qualified string name of FIELD, e.g. `venue` or `venue.address`."
^String [field]
(apply str (interpose "." (expand/qualified-name-components field))))
(defn- field->$str
"Given a FIELD, return a `$`-qualified field name for use in a Mongo aggregate query, e.g. `\"$user_id\"`."
[field]
(format "$%s" (name (field->name field))))
(format "$%s" (field->name field)))
(defn- aggregation:rows []
`(doall (with-collection ^DBApiLayer *mongo-connection* ~*collection-name*
......
......@@ -50,7 +50,8 @@
(defn- pre-expand [qp]
(fn [query]
(qp (expand/expand *driver* query))))
(qp (assoc (expand/expand *driver* query)
:query-id (str (java.util.UUID/randomUUID))))))
(defn- post-add-row-count-and-status
......@@ -171,8 +172,7 @@
(defn- cumulative-sum [qp]
(fn [query]
(let [[cumulative-sum-field query] (pre-cumulative-sum query)
results (qp query)]
(let [[cumulative-sum-field query] (pre-cumulative-sum query)]
(cond->> (qp query)
cumulative-sum-field (post-cumulative-sum cumulative-sum-field)))))
......@@ -400,7 +400,25 @@
;;
;; Pre-processing then happens in order from bottom-to-top; i.e. POST-ANNOTATE gets to modify the results, then LIMIT, then CUMULATIVE-SUM, etc.
(defn- wrap-guard-multiple-calls
"Throw an exception if a QP function accidentally calls (QP QUERY) more than once."
[qp]
(let [called? (atom false)]
(fn [query]
(assert (not @called?) "(QP QUERY) IS BEING CALLED MORE THAN ONCE!")
(reset! called? true)
(qp query))))
(defn- post-log-results [qp]
(fn [query]
(let [results (qp query)]
(when-not *disable-qp-logging*
(log/debug "\nRESULTS:\n" (u/pprint-to-str 'cyan results)))
results)))
(defn- process-structured [driver query]
(when-not *disable-qp-logging*
(println "PROCESS STRUCTURED!"))
(let [driver-process-query (partial i/process-query driver)]
((<<- wrap-catch-exceptions
pre-expand
......@@ -412,6 +430,8 @@
limit
post-annotate
pre-log-query
post-log-results
wrap-guard-multiple-calls
driver-process-query) query)))
(defn- process-native [driver query]
......@@ -420,11 +440,14 @@
post-add-row-count-and-status
post-convert-unix-timestamps-to-dates
limit
wrap-guard-multiple-calls
driver-process-query) query)))
(defn process
"Process a QUERY and return the results."
[driver query]
(when-not *disable-qp-logging*
(log/info (u/format-color 'blue "\nQUERY:\n%s" (u/pprint-to-str query))))
(binding [*driver* driver]
((case (keyword (:type query))
:native process-native
......
......@@ -54,7 +54,8 @@
parse-breakout
parse-fields
parse-filter
parse-order-by)
parse-order-by
ph)
;; ## -------------------- Protocols --------------------
......@@ -91,7 +92,7 @@
(and clause
(or (not (sequential? clause))
(and (seq clause)
(every? identity clause)))))
(not (every? nil? clause))))))
(defn- parse [query-dict]
(update-in query-dict [:query] #(-<> (assoc %
......@@ -123,18 +124,28 @@
:name :field-name
:special_type :special-type
:base_type :base-type
:table_id :table-id}))
:table_id :table-id
:parent_id :parent-id}))
(defn- resolve-fields
"Resolve the `Fields` in an EXPANDED-QUERY-DICT."
[expanded-query-dict field-ids]
(if-not (seq field-ids) expanded-query-dict ; No need to do a DB call or walk expanded-query-dict if we didn't see any Field IDs
(let [fields (->> (sel :many :id->fields [field/Field :name :base_type :special_type :table_id] :id [in field-ids])
(m/map-vals rename-mb-field-keys))]
(reset! *table-ids* (set (map :table-id (vals fields))))
;; This is performed depth-first so we don't end up walking the newly-created Field/Value objects
;; they may have nil values; this was we don't have to write an implementation of resolve-field for nil
(walk/postwalk #(resolve-field % fields) expanded-query-dict))))
(if-not (seq field-ids)
;; Base case: if there's no field-ids to expand we're done
expanded-query-dict
;; Re-bind *field-ids* in case we need to do recursive Field resolution
(binding [*field-ids* (atom #{})]
(let [fields (->> (sel :many :id->fields [field/Field :name :base_type :special_type :table_id :parent_id] :id [in field-ids])
(m/map-vals rename-mb-field-keys)
(m/map-vals #(assoc % :parent (when (:parent-id %)
(ph (:parent-id %))))))]
(swap! *table-ids* set/union (set (map :table-id (vals fields))))
;; Recurse in case any new [nested] Field placeholders were emitted and we need to do recursive Field resolution
;; We can't use recur here because binding wraps body in try/catch
(resolve-fields (walk/postwalk #(resolve-field % fields) expanded-query-dict)
@*field-ids*)))))
(defn- resolve-database
"Resolve the `Database` in question for an EXPANDED-QUERY-DICT."
......@@ -206,17 +217,40 @@
;; ## -------------------- Field + Value --------------------
(defprotocol IField
"Methods specific to the Query Expander `Field` record type."
(qualified-name-components [this]
"Return a vector of name components of the form `[table-name parent-names... field-name]`"))
;; Field is the expansion of a Field ID in the standard QL
(defrecord Field [^Integer field-id
^String field-name
^Keyword base-type
^Keyword special-type
^Integer table-id
^String table-name]
^String table-name
^Integer parent-id
parent] ; Field once its resolved; FieldPlaceholder before that
IResolve
(resolve-field [this field-id->fields]
(cond
parent (if (= (type parent) Field)
this
(resolve-field parent field-id->fields))
parent-id (assoc this :parent (or (field-id->fields parent-id)
(ph parent-id)))
:else this))
(resolve-table [this table-id->table]
(assoc this :table-name (:name (or (table-id->table table-id)
(throw (Exception. (format "Query expansion failed: could not find table %d." table-id))))))))
(throw (Exception. (format "Query expansion failed: could not find table %d." table-id)))))))
IField
(qualified-name-components [this]
(conj (if parent
(qualified-name-components parent)
[table-name])
field-name)))
(defn- Field?
"Is this a valid value for a `Field` ID in an unexpanded query? (i.e. an integer or `fk->` form)."
......@@ -243,9 +277,13 @@
(defrecord FieldPlaceholder [^Integer field-id]
IResolve
(resolve-field [this field-id->fields]
(->> (field-id->fields field-id)
(merge this)
map->Field)))
(or
;; try to resolve the Field with the ones available in field-id->fields
(some->> (field-id->fields field-id)
(merge this)
map->Field)
;; If that fails just return ourselves as-is
this)))
(defn- parse-value
"Convert the `value` of a `Value` to a date or timestamp if needed.
......@@ -286,12 +324,6 @@
(swap! *fk-field-ids* conj fk-field-id)
(->FieldPlaceholder dest-field-id))
["." (id :guard integer?) subfield]
(do (assert-driver-supports :nested-fields)
(swap! *field-ids* conj id)
(map->FieldPlaceholder {:field-id id
:subfield subfield}))
_ (throw (Exception. (str "Invalid field: " field-id)))))
([field-id value]
(->ValuePlaceholder (:field-id (ph field-id)) value)))
......@@ -367,7 +399,7 @@
;; ### Parsers
(defparser parse-filter-subclause
["INSIDE" lat-field lon-field lat-max lon-min lat-min lon-max]
["INSIDE" (lat-field :guard integer?) (lon-field :guard integer?) (lat-max :guard number?) (lon-min :guard number?) (lat-min :guard number?) (lon-max :guard number?)]
(map->Filter:Inside {:filter-type :inside
:lat {:field (ph lat-field)
:min (ph lat-field lat-min)
......@@ -376,22 +408,25 @@
:min (ph lon-field lon-min)
:max (ph lon-field lon-max)}})
["BETWEEN" field-id min max]
["BETWEEN" (field-id :guard integer?) (min :guard number?) (max :guard number?)]
(map->Filter:Between {:filter-type :between
:field (ph field-id)
:min-val (ph field-id min)
:max-val (ph field-id max)})
[filter-type field-id val]
[(filter-type :guard (partial contains? #{"=" "!=" "<" ">" "<=" ">="})) (field-id :guard integer?) val]
(map->Filter:Field+Value {:filter-type (keyword filter-type)
:field (ph field-id)
:value (ph field-id val)})
[filter-type field-id]
[(filter-type :guard string?) (field-id :guard integer?)]
(map->Filter:Field {:filter-type (case filter-type
"NOT_NULL" :not-null
"IS_NULL" :is-null)
:field (ph field-id)}))
:field (ph field-id)})
clause
(throw (Exception. (format "Invalid filter clause: %s" clause))))
(defparser parse-filter
["AND" & subclauses] (map->Filter {:compound-type :and
......
......@@ -151,17 +151,20 @@
(defn- table-id->field-name->field
"Return a map of lowercased `Field` names -> fields for `Table` with TABLE-ID."
[table-id]
(->> (sel :many :field->obj [Field :name], :table_id table-id, :parent_id nil)
{:pre [(integer? table-id)]}
(->> (binding [*sel-disable-logging* true]
(sel :many :field->obj [Field :name], :table_id table-id, :parent_id nil))
(m/map-keys s/lower-case)))
(defn- db-id->table-name->table
"Return a map of lowercased `Table` names -> Tables for `Database` with DATABASE-ID.
Add a delay `:field-name->field` to each Table that calls `table-id->field-name->field` for that Table."
[database-id]
(->> (sel :many :field->obj [Table :name] :db_id database-id)
{:pre [(integer? database-id)]}
(->> (binding [*sel-disable-logging* true]
(sel :many :field->obj [Table :name] :db_id database-id))
(m/map-keys s/lower-case)
(m/map-vals (fn [table]
(assoc table :field-name->field (delay (table-id->field-name->field (:id table))))))))
(m/map-vals #(assoc % :field-name->field (delay (table-id->field-name->field (:id %)))))))
(defn -temp-db-add-getter-delay
"Add a delay `:table-name->table` to DB that calls `db-id->table-name->table`."
......@@ -174,14 +177,20 @@
With three args, fetch `Field` with FIELD-NAME by recursively fetching `Table` and using its `:field-name->field` delay."
([temp-db table-name]
{:pre [(map? temp-db)
(string? table-name)]}
(string? table-name)]
:post [(map? %)]}
(@(:table-name->table temp-db) table-name))
([temp-db table-name field-name]
{:pre [(string? field-name)]}
{:pre [(string? field-name)]
:post [(map? %)]}
(@(:field-name->field (-temp-get temp-db table-name)) field-name))
([temp-db table-name parent-field-name & nested-field-names]
{:pre [(string? (last nested-field-names))]}
(sel :one :id Field, :name (last nested-field-names), :parent_id (:id (apply -temp-get temp-db table-name parent-field-name (butlast nested-field-names))))))
{:pre [(every? string? nested-field-names)]
:post [(map? %)]}
(binding [*sel-disable-logging* true]
(sel :one Field, :name (last nested-field-names), :parent_id (:id (apply -temp-get temp-db table-name parent-field-name (butlast nested-field-names)))))))
(defn- walk-expand-&
"Walk BODY looking for symbols like `&table` or `&table.field` and expand them to appropriate `-temp-get` forms.
......@@ -201,17 +210,20 @@
form))
body))
(defn with-temp-db* [loader ^DatabaseDefinition dbdef f]
(defn -with-temp-db [loader ^DatabaseDefinition dbdef f]
(let [dbdef (map->DatabaseDefinition (assoc dbdef :short-lived? true))]
(try
(remove-database! loader dbdef)
(let [db (-> (get-or-create-database! loader dbdef)
-temp-db-add-getter-delay)]
(assert db)
(assert (exists? Database :id (:id db)))
(f db))
(binding [*sel-disable-logging* true]
(remove-database! loader dbdef)
(let [db (-> (get-or-create-database! loader dbdef)
-temp-db-add-getter-delay)]
(assert db)
(assert (exists? Database :id (:id db)))
(binding [*sel-disable-logging* false]
(f db))))
(finally
(remove-database! loader dbdef)))))
(binding [*sel-disable-logging* true]
(remove-database! loader dbdef))))))
(defmacro with-temp-db
"Load and sync DATABASE-DEFINITION with DATASET-LOADER and execute BODY with
......@@ -232,6 +244,6 @@
:aggregation [\"count\"]
:filter [\"<\" (:id &events.timestamp) \"1765-01-01\"]}}))"
[[db-binding dataset-loader ^DatabaseDefinition database-definition] & body]
`(with-temp-db* ~dataset-loader ~database-definition
`(-with-temp-db ~dataset-loader ~database-definition
(fn [~db-binding]
~@(walk-expand-& db-binding body))))
......@@ -11,7 +11,8 @@
(defn- partition-tokens [keywords tokens]
(->> (loop [all [], current-split nil, [token & more] tokens]
(cond
(not token) (conj all current-split)
(and (not token)
(not (seq more))) (conj all current-split)
(contains? keywords token) (recur (or (when (seq current-split)
(conj all current-split))
all)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment