Skip to content
Snippets Groups Projects
Commit 8cc8c6ae authored by Cam Saül's avatar Cam Saül
Browse files

Fix open connection during Mongo sync

parent 9e1827bb
No related branches found
No related tags found
No related merge requests found
......@@ -45,6 +45,10 @@
first))))
;; Query Processing
(wrap-process-query-middleware [_ qp]
(fn [query]
(qp query))) ; Nothing to do here
(process-query [_ query]
(qp/process-and-run query))
......
......@@ -53,7 +53,13 @@
(process-query [this query]
"Process a native or structured query.
(Don't use this directly; instead, use `metabase.driver/process-query`,
which does things like preprocessing before calling the appropriate implementation.)"))
which does things like preprocessing before calling the appropriate implementation.)")
(wrap-process-query-middleware [this qp-fn]
"Custom QP middleware for this driver.
Like `sync-in-context`, but for running queries rather than syncing. This is basically around-advice for the QP pre and post-processing stages.
This should be used to do things like open DB connections that need to remain open for the duration of post-processing.
This middleware is injected into the QP middleware stack immediately after the Query Expander; in other words, it will receive the expanded query.
See the Mongo driver for and example of how this is intended to be used."))
;; ## ISyncDriverTableFKs Protocol (Optional)
......
......@@ -71,6 +71,11 @@
(can-connect? this {:details details}))
;;; ### QP
(wrap-process-query-middleware [_ qp]
(fn [query]
(with-mongo-connection [^com.mongodb.DBApiLayer conn (:database query)]
(qp query))))
(process-query [_ query]
(qp/process-and-run query))
......
......@@ -36,20 +36,19 @@
(defn process-and-run
"Process and run a MongoDB QUERY."
[{query-type :type, database :database, :as query}]
[{query-type :type, :as query}]
(binding [*query* query]
(with-mongo-connection [_ database]
(case (keyword query-type)
:query (let [generated-query (process-structured (:query query))]
(when-not qp/*disable-qp-logging*
(log/debug (u/format-color 'green "\nMONGER FORM:\n%s\n"
(->> generated-query
(walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
u/pprint-to-str) "\n"))) ; so it's easier to read
{:results (eval generated-query)})
:native (let [results (eval-raw-command (:query (:native query)))]
{:results (if (sequential? results) results
[results])})))))
(case (keyword query-type)
:query (let [generated-query (process-structured (:query query))]
(when-not qp/*disable-qp-logging*
(log/debug (u/format-color 'green "\nMONGER FORM:\n%s\n"
(->> generated-query
(walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
u/pprint-to-str) "\n"))) ; so it's easier to read
{:results (eval generated-query)})
:native (let [results (eval-raw-command (:query (:native query)))]
{:results (if (sequential? results) results
[results])}))))
;; # NATIVE QUERY PROCESSOR
......
......@@ -252,12 +252,12 @@
name]))
(map :id)) ; Return the sorted IDs
;; Concat the Fields clause IDs + the sequence of all Fields ID for the Table.
;; Then filter out ones that appear in breakout clause and remove duplicates
;; which effectively gives us parts #3 and #4 from above.
non-breakout-ids (->> (concat fields-ids all-field-ids)
(filter (complement (partial contains? (set breakout-ids))))
distinct)
;; Get the aggregate column if any
ag-kws (when (and ag-type
(not= ag-type :rows))
(let [ag (if (= ag-type :distinct) :count
ag-type)]
[ag]))
;; Make a helper function that will take a sequence of Field IDs and convert them to corresponding column name keywords.
;; Don't include names that aren't part of RESULT-KWS: we fetch *all* the Fields for a Table regardless of the Query, so
......@@ -268,15 +268,16 @@
(map keyword)
(filter valid-kw?)))
;; Use fn above to get the keyword column names of other non-aggregation fields [#3 and #4]
non-breakout-kws (ids->kws non-breakout-ids)
;; Concat the Fields clause IDs + the sequence of all Fields ID for the Table.
;; Then filter out ones that appear in breakout clause and remove duplicates
;; which effectively gives us parts #3 and #4 from above.
non-breakout-ids (->> (concat fields-ids all-field-ids)
(filter (complement (partial contains? (set breakout-ids))))
distinct)
;; Get the aggregate column if any
ag-kws (when (and ag-type
(not= ag-type :rows))
(let [ag (if (= ag-type :distinct) :count
ag-type)]
[ag]))
;; Use fn above to get the keyword column names of other non-aggregation fields [#3 and #4]
non-breakout-kws (->> (ids->kws non-breakout-ids)
(filter (complement (partial contains? (set ag-kws)))))
;; Collect all other Fields
other-kws (->> result-kws
......@@ -290,9 +291,12 @@
(when-not *disable-qp-logging*
(log/debug (u/format-color 'magenta "Using this ordering: breakout: %s, ag: %s, non-breakout: %s, other: %s"
(vec breakout-kws) (vec ag-kws) (vec non-breakout-kws) (vec other-kws))))
(let [ordered-kws (concat breakout-kws ag-kws non-breakout-kws other-kws)]
(assert (= (set ordered-kws) result-kws)
(format "Order-cols returned invalid results: expected %s, got %s" result-kws (set ordered-kws)))
(assert (and (= (set ordered-kws) result-kws)
(= (count ordered-kws) (count result-kws)))
(format "Order-cols returned invalid results: expected %s, got %s\nbreakout: %s, ag: %s, non-breakout: %s, other: %s" result-kws (vec ordered-kws)
(vec breakout-kws) (vec ag-kws) (vec non-breakout-kws) (vec other-kws)))
ordered-kws)))
(defn- add-fields-extra-info
......@@ -394,11 +398,6 @@
fields (field/unflatten-nested-fields (sel :many :fields [Field :id :table_id :name :description :base_type :special_type :parent_id], :table_id source-table-id, :active true))
ordered-col-kws (order-cols query results fields)]
(assert (= (count (keys (first results))) (count ordered-col-kws))
(format "Order-cols returned an invalid number of keys.\nExpected: %d %s\nGot: %d %s"
(count (keys (first results))) (vec (keys (first results)))
(count ordered-col-kws) (vec ordered-col-kws)))
{:rows (for [row results]
(mapv row ordered-col-kws)) ; might as well return each row and col info as vecs because we're not worried about making
:columns (mapv name ordered-col-kws) ; making them lazy, and results are easier to play with in the REPL / paste into unit tests
......@@ -446,9 +445,11 @@
(qp query))))
(defn- process-structured [{:keys [driver], :as query}]
(let [driver-process-query (partial i/process-query driver)]
(let [driver-process-query (partial i/process-query driver)
driver-wrap-process-query (partial i/wrap-process-query-middleware driver)]
((<<- wrap-catch-exceptions
pre-expand
driver-wrap-process-query
post-add-row-count-and-status
pre-add-implicit-fields
pre-add-implicit-breakout-order-by
......@@ -461,8 +462,10 @@
driver-process-query) query)))
(defn- process-native [{:keys [driver], :as query}]
(let [driver-process-query (partial i/process-query driver)]
(let [driver-process-query (partial i/process-query driver)
driver-wrap-process-query (partial i/wrap-process-query-middleware driver)]
((<<- wrap-catch-exceptions
driver-wrap-process-query
post-add-row-count-and-status
post-convert-unix-timestamps-to-dates
limit
......
......@@ -341,7 +341,8 @@
`(defn ~(vary-meta fn-name assoc :private true) [form#]
(when (non-empty-clause? form#)
(match form#
~@match-forms))))
~@match-forms
form# (throw (Exception. (format ~(format "%s failed: invalid clause: %%s" fn-name) form#)))))))
;; ## -------------------- Aggregation --------------------
......@@ -412,13 +413,13 @@
:min (ph lon-field lon-min)
:max (ph lon-field lon-max)}})
["BETWEEN" (field-id :guard Field?) min max]
["BETWEEN" (field-id :guard Field?) (min :guard identity) (max :guard identity)]
(map->Filter:Between {:filter-type :between
:field (ph field-id)
:min-val (ph field-id min)
:max-val (ph field-id max)})
[(filter-type :guard (partial contains? #{"=" "!=" "<" ">" "<=" ">="})) (field-id :guard Field?) val]
[(filter-type :guard (partial contains? #{"=" "!=" "<" ">" "<=" ">="})) (field-id :guard Field?) (val :guard identity)]
(map->Filter:Field+Value {:filter-type (keyword filter-type)
:field (ph field-id)
:value (ph field-id val)})
......@@ -427,10 +428,7 @@
(map->Filter:Field {:filter-type (case filter-type
"NOT_NULL" :not-null
"IS_NULL" :is-null)
:field (ph field-id)})
clause
(throw (Exception. (format "Invalid filter clause: %s" clause))))
:field (ph field-id)}))
(defparser parse-filter
["AND" & subclauses] (map->Filter {:compound-type :and
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment