From 8cc8c6ae89d1a1f3f931b32eb55253050fbc1131 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cam=20Sau=CC=88l?= <cammsaul@gmail.com>
Date: Thu, 9 Jul 2015 14:15:07 -0700
Subject: [PATCH] Fix open connection during Mongo sync

---
 src/metabase/driver/generic_sql.clj           |  4 ++
 src/metabase/driver/interface.clj             |  8 ++-
 src/metabase/driver/mongo.clj                 |  5 ++
 src/metabase/driver/mongo/query_processor.clj | 25 +++++-----
 src/metabase/driver/query_processor.clj       | 49 ++++++++++---------
 .../driver/query_processor/expand.clj         | 12 ++---
 6 files changed, 59 insertions(+), 44 deletions(-)

diff --git a/src/metabase/driver/generic_sql.clj b/src/metabase/driver/generic_sql.clj
index 505686f5724..f281f15ed2d 100644
--- a/src/metabase/driver/generic_sql.clj
+++ b/src/metabase/driver/generic_sql.clj
@@ -45,6 +45,10 @@
                first))))
 
   ;; Query Processing
+  (wrap-process-query-middleware [_ qp]
+    (fn [query]
+      (qp query))) ; Nothing to do here
+
   (process-query [_ query]
     (qp/process-and-run query))
 
diff --git a/src/metabase/driver/interface.clj b/src/metabase/driver/interface.clj
index 73d45ac648c..928df060f7b 100644
--- a/src/metabase/driver/interface.clj
+++ b/src/metabase/driver/interface.clj
@@ -53,7 +53,13 @@
   (process-query [this query]
     "Process a native or structured query.
      (Don't use this directly; instead, use `metabase.driver/process-query`,
-     which does things like preprocessing before calling the appropriate implementation.)"))
+     which does things like preprocessing before calling the appropriate implementation.)")
+  (wrap-process-query-middleware [this qp-fn]
+    "Custom QP middleware for this driver.
+     Like `sync-in-context`, but for running queries rather than syncing. This is basically around-advice for the QP pre and post-processing stages.
+     This should be used to do things like open DB connections that need to remain open for the duration of post-processing.
+     This middleware is injected into the QP middleware stack immediately after the Query Expander; in other words, it will receive the expanded query.
+     See the Mongo driver for and example of how this is intended to be used."))
 
 
 ;; ## ISyncDriverTableFKs Protocol (Optional)
diff --git a/src/metabase/driver/mongo.clj b/src/metabase/driver/mongo.clj
index 2865a3ef7e0..52c4421a158 100644
--- a/src/metabase/driver/mongo.clj
+++ b/src/metabase/driver/mongo.clj
@@ -71,6 +71,11 @@
     (can-connect? this {:details details}))
 
 ;;; ### QP
+  (wrap-process-query-middleware [_ qp]
+    (fn [query]
+      (with-mongo-connection [^com.mongodb.DBApiLayer conn (:database query)]
+        (qp query))))
+
   (process-query [_ query]
     (qp/process-and-run query))
 
diff --git a/src/metabase/driver/mongo/query_processor.clj b/src/metabase/driver/mongo/query_processor.clj
index 3d14ffb503e..76b42e2694b 100644
--- a/src/metabase/driver/mongo/query_processor.clj
+++ b/src/metabase/driver/mongo/query_processor.clj
@@ -36,20 +36,19 @@
 
 (defn process-and-run
   "Process and run a MongoDB QUERY."
-  [{query-type :type, database :database, :as query}]
+  [{query-type :type, :as query}]
   (binding [*query* query]
-    (with-mongo-connection [_ database]
-      (case (keyword query-type)
-        :query (let [generated-query (process-structured (:query query))]
-                 (when-not qp/*disable-qp-logging*
-                   (log/debug (u/format-color 'green "\nMONGER FORM:\n%s\n"
-                                              (->> generated-query
-                                                   (walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
-                                                   u/pprint-to-str) "\n")))                              ; so it's easier to read
-                 {:results (eval generated-query)})
-        :native (let [results (eval-raw-command (:query (:native query)))]
-                  {:results (if (sequential? results) results
-                                [results])})))))
+    (case (keyword query-type)
+      :query (let [generated-query (process-structured (:query query))]
+               (when-not qp/*disable-qp-logging*
+                 (log/debug (u/format-color 'green "\nMONGER FORM:\n%s\n"
+                                            (->> generated-query
+                                                 (walk/postwalk #(if (symbol? %) (symbol (name %)) %)) ; strip namespace qualifiers from Monger form
+                                                 u/pprint-to-str) "\n"))) ; so it's easier to read
+                {:results (eval generated-query)})
+      :native (let [results (eval-raw-command (:query (:native query)))]
+                {:results (if (sequential? results) results
+                              [results])}))))
 
 
 ;; # NATIVE QUERY PROCESSOR
diff --git a/src/metabase/driver/query_processor.clj b/src/metabase/driver/query_processor.clj
index f49e400b835..42d36b04c1b 100644
--- a/src/metabase/driver/query_processor.clj
+++ b/src/metabase/driver/query_processor.clj
@@ -252,12 +252,12 @@
                                           name]))
                               (map :id)) ; Return the sorted IDs
 
-        ;; Concat the Fields clause IDs + the sequence of all Fields ID for the Table.
-        ;; Then filter out ones that appear in breakout clause and remove duplicates
-        ;; which effectively gives us parts #3 and #4 from above.
-        non-breakout-ids (->> (concat fields-ids all-field-ids)
-                              (filter (complement (partial contains? (set breakout-ids))))
-                              distinct)
+        ;; Get the aggregate column if any
+        ag-kws           (when (and ag-type
+                                    (not= ag-type :rows))
+                           (let [ag (if (= ag-type :distinct) :count
+                                        ag-type)]
+                             [ag]))
 
         ;; Make a helper function that will take a sequence of Field IDs and convert them to corresponding column name keywords.
         ;; Don't include names that aren't part of RESULT-KWS: we fetch *all* the Fields for a Table regardless of the Query, so
@@ -268,15 +268,16 @@
                                     (map keyword)
                                     (filter valid-kw?)))
 
-        ;; Use fn above to get the keyword column names of other non-aggregation fields [#3 and #4]
-        non-breakout-kws (ids->kws non-breakout-ids)
+        ;; Concat the Fields clause IDs + the sequence of all Fields ID for the Table.
+        ;; Then filter out ones that appear in breakout clause and remove duplicates
+        ;; which effectively gives us parts #3 and #4 from above.
+        non-breakout-ids (->> (concat fields-ids all-field-ids)
+                              (filter (complement (partial contains? (set breakout-ids))))
+                              distinct)
 
-        ;; Get the aggregate column if any
-        ag-kws           (when (and ag-type
-                                    (not= ag-type :rows))
-                           (let [ag (if (= ag-type :distinct) :count
-                                        ag-type)]
-                             [ag]))
+        ;; Use fn above to get the keyword column names of other non-aggregation fields [#3 and #4]
+        non-breakout-kws (->> (ids->kws non-breakout-ids)
+                              (filter (complement (partial contains? (set ag-kws)))))
 
         ;; Collect all other Fields
         other-kws        (->> result-kws
@@ -290,9 +291,12 @@
     (when-not *disable-qp-logging*
       (log/debug (u/format-color 'magenta "Using this ordering: breakout: %s, ag: %s, non-breakout: %s, other: %s"
                                  (vec breakout-kws) (vec ag-kws) (vec non-breakout-kws) (vec other-kws))))
+
     (let [ordered-kws (concat breakout-kws ag-kws non-breakout-kws other-kws)]
-      (assert (= (set ordered-kws) result-kws)
-        (format "Order-cols returned invalid results: expected %s, got %s" result-kws (set ordered-kws)))
+      (assert (and (= (set ordered-kws) result-kws)
+                   (= (count ordered-kws) (count result-kws)))
+        (format "Order-cols returned invalid results: expected %s, got %s\nbreakout: %s, ag: %s, non-breakout: %s, other: %s" result-kws (vec ordered-kws)
+                (vec breakout-kws) (vec ag-kws) (vec non-breakout-kws) (vec other-kws)))
       ordered-kws)))
 
 (defn- add-fields-extra-info
@@ -394,11 +398,6 @@
           fields                         (field/unflatten-nested-fields (sel :many :fields [Field :id :table_id :name :description :base_type :special_type :parent_id], :table_id source-table-id, :active true))
           ordered-col-kws                (order-cols query results fields)]
 
-      (assert (= (count (keys (first results))) (count ordered-col-kws))
-              (format "Order-cols returned an invalid number of keys.\nExpected: %d %s\nGot: %d %s"
-                      (count (keys (first results))) (vec (keys (first results)))
-                      (count ordered-col-kws)        (vec ordered-col-kws)))
-
       {:rows    (for [row results]
                   (mapv row ordered-col-kws))                                                      ; might as well return each row and col info as vecs because we're not worried about making
        :columns (mapv name ordered-col-kws)                                                        ; making them lazy, and results are easier to play with in the REPL / paste into unit tests
@@ -446,9 +445,11 @@
       (qp query))))
 
 (defn- process-structured [{:keys [driver], :as query}]
-  (let [driver-process-query (partial i/process-query driver)]
+  (let [driver-process-query      (partial i/process-query driver)
+        driver-wrap-process-query (partial i/wrap-process-query-middleware driver)]
     ((<<- wrap-catch-exceptions
           pre-expand
+          driver-wrap-process-query
           post-add-row-count-and-status
           pre-add-implicit-fields
           pre-add-implicit-breakout-order-by
@@ -461,8 +462,10 @@
           driver-process-query) query)))
 
 (defn- process-native [{:keys [driver], :as query}]
-  (let [driver-process-query (partial i/process-query driver)]
+  (let [driver-process-query      (partial i/process-query driver)
+        driver-wrap-process-query (partial i/wrap-process-query-middleware driver)]
     ((<<- wrap-catch-exceptions
+          driver-wrap-process-query
           post-add-row-count-and-status
           post-convert-unix-timestamps-to-dates
           limit
diff --git a/src/metabase/driver/query_processor/expand.clj b/src/metabase/driver/query_processor/expand.clj
index 61ecf748bae..8e376ec89be 100644
--- a/src/metabase/driver/query_processor/expand.clj
+++ b/src/metabase/driver/query_processor/expand.clj
@@ -341,7 +341,8 @@
   `(defn ~(vary-meta fn-name assoc :private true) [form#]
      (when (non-empty-clause? form#)
        (match form#
-         ~@match-forms))))
+         ~@match-forms
+         form# (throw (Exception. (format ~(format "%s failed: invalid clause: %%s" fn-name) form#)))))))
 
 ;; ## -------------------- Aggregation --------------------
 
@@ -412,13 +413,13 @@
                                      :min   (ph lon-field lon-min)
                                      :max   (ph lon-field lon-max)}})
 
-  ["BETWEEN" (field-id :guard Field?) min max]
+  ["BETWEEN" (field-id :guard Field?) (min :guard identity) (max :guard identity)]
   (map->Filter:Between {:filter-type :between
                         :field       (ph field-id)
                         :min-val     (ph field-id min)
                         :max-val     (ph field-id max)})
 
-  [(filter-type :guard (partial contains? #{"=" "!=" "<" ">" "<=" ">="})) (field-id :guard Field?) val]
+  [(filter-type :guard (partial contains? #{"=" "!=" "<" ">" "<=" ">="})) (field-id :guard Field?) (val :guard identity)]
   (map->Filter:Field+Value {:filter-type (keyword filter-type)
                             :field       (ph field-id)
                             :value       (ph field-id val)})
@@ -427,10 +428,7 @@
   (map->Filter:Field {:filter-type (case filter-type
                                      "NOT_NULL" :not-null
                                      "IS_NULL"  :is-null)
-                      :field       (ph field-id)})
-
-  clause
-  (throw (Exception. (format "Invalid filter clause: %s" clause))))
+                      :field       (ph field-id)}))
 
 (defparser parse-filter
   ["AND" & subclauses] (map->Filter {:compound-type :and
-- 
GitLab