Skip to content
Snippets Groups Projects
Commit da7e09f5 authored by Ryan Senior's avatar Ryan Senior
Browse files

Fixed up remapping issues on MongoDB

Specifically, added code that matches projections from MongoDB with the
expected order of results coming back from the database. Also added
code to ensure order by columns are compared correctly after the split
of expand and resolve into two separate middlewares.
parent 8543e1c5
No related branches found
No related tags found
No related merge requests found
......@@ -197,11 +197,14 @@
;;; ### initial projection
(defn- add-initial-projection
  "Add an initial `$project` stage for every Field the query touches, and record
   the projected column names under `:projections` in PIPELINE-CTX.
   Returns PIPELINE-CTX unchanged when the query references no fields."
  [query pipeline-ctx]
  (let [all-fields (distinct (annotate/collect-fields query :keep-date-time-fields))]
    (if-not (seq all-fields)
      pipeline-ctx
      ;; realize the [lvalue rvalue] pairs eagerly so ->lvalue/->initial-rvalue run
      ;; while any dynamic bindings (e.g. *query*) are still in place
      (let [projections (doall (map #(vector (->lvalue %) (->initial-rvalue %)) all-fields))]
        (-> pipeline-ctx
            (assoc :projections (doall (map (comp keyword first) projections)))
            (update :query conj {$project (into (hash-map) projections)}))))))
;;; ### filter
......@@ -232,9 +235,10 @@
:not (parse-filter-subclause subclause :negate)
nil (parse-filter-subclause clause)))
(defn- handle-filter [{filter-clause :filter} pipeline]
(when filter-clause
{$match (parse-filter-clause filter-clause)}))
(defn- handle-filter
  "Append a `$match` stage for the query's `:filter` clause, if one is present;
   otherwise pass PIPELINE-CTX through unchanged."
  [{filter-clause :filter} pipeline-ctx]
  (if filter-clause
    (update pipeline-ctx :query conj {$match (parse-filter-clause filter-clause)})
    pipeline-ctx))
;;; ### aggregation
......@@ -254,69 +258,78 @@
:min {$min (->rvalue field)}
:max {$max (->rvalue field)})))
(defn- handle-breakout+aggregation
  "Append the `$project`/`$group`/`$sort`/`$project` stages implementing the query's
   `:breakout` and `:aggregation` clauses, and record the resulting column names
   under `:projections`. Returns PIPELINE-CTX unchanged when the query has neither."
  [{breakout-fields :breakout, aggregations :aggregation} pipeline-ctx]
  (let [aggregations? (seq aggregations)
        breakout?     (seq breakout-fields)]
    (if-not (or aggregations? breakout?)
      pipeline-ctx
      ;; [name rvalue] pairs for the final $project stage; also the source of :projections
      (let [projected-fields (concat (for [{ag-type :aggregation-type} aggregations]
                                       [(ag-type->field-name ag-type) (if (= ag-type :distinct)
                                                                        {$size "$count"} ; HACK
                                                                        true)])
                                     (for [field breakout-fields]
                                       [(->lvalue field) (format "$_id.%s" (->lvalue field))]))]
        (-> pipeline-ctx
            (assoc :projections (doall (map (comp keyword first) projected-fields)))
            (update :query into
                    (filter identity
                            [;; create a totally sweet made-up column called __group to store the fields we'd like to group by
                             (when breakout?
                               {$project (merge {"_id"      "$_id"
                                                 "___group" (into {} (for [field breakout-fields]
                                                                       {(->lvalue field) (->rvalue field)}))}
                                                (into {} (for [{ag-field :field} aggregations
                                                               :when ag-field]
                                                           {(->lvalue ag-field) (->rvalue ag-field)})))})
                             ;; Now project onto the __group and the aggregation rvalue
                             {$group (merge {"_id" (when breakout?
                                                     "$___group")}
                                            (into {} (for [{ag-type :aggregation-type, :as aggregation} aggregations]
                                                       {(ag-type->field-name ag-type) (aggregation->rvalue aggregation)})))}
                             ;; Sort by _id (___group)
                             {$sort {"_id" 1}}
                             ;; now project back to the fields we expect
                             {$project (merge {"_id" false}
                                              (into {} projected-fields))}])))))))
;;; ### order-by
(defn- handle-order-by [{:keys [order-by]} pipeline]
(when (seq order-by)
{$sort (into (array-map) (for [{:keys [field direction]} order-by]
{(->lvalue field) (case direction
:ascending 1
:descending -1)}))}))
(defn- handle-order-by
  "Append a `$sort` stage for the query's `:order-by` clauses, if any.
   Uses an `array-map` (insertion-ordered) rather than `hash-map`: MongoDB applies
   `$sort` keys in order, so sort priority would be scrambled by an unordered map."
  [{:keys [order-by]} pipeline-ctx]
  (if-not (seq order-by)
    pipeline-ctx
    (update pipeline-ctx :query conj
            {$sort (into (array-map)
                         (for [{:keys [field direction]} order-by]
                           [(->lvalue field) (case direction
                                               :ascending   1
                                               :descending -1)]))})))
;;; ### fields
(defn- handle-fields [{:keys [fields]} pipeline]
(when (seq fields)
;; add project _id = false to keep _id from getting automatically returned unless explicitly specified
{$project (into (array-map "_id" false)
(for [field fields]
{(->lvalue field) (->rvalue field)}))}))
(defn- handle-fields
  "Append a `$project` stage for the query's explicit `:fields` clause, if any,
   and record the projected column names under `:projections`."
  [{:keys [fields]} pipeline-ctx]
  (if-not (seq fields)
    pipeline-ctx
    ;; mapv (not a lazy map) so the pairs — and :projections below — are realized
    ;; eagerly, consistent with the doall used by add-initial-projection
    (let [new-projections (mapv #(vector (->lvalue %) (->rvalue %)) fields)]
      (-> pipeline-ctx
          (assoc :projections (mapv (comp keyword first) new-projections))
          ;; add project _id = false to keep _id from getting automatically returned unless explicitly specified
          (update :query conj {$project (merge {"_id" false}
                                               (into (hash-map) new-projections))})))))
;;; ### limit
(defn- handle-limit [{:keys [limit]} pipeline]
(when limit
{$limit limit}))
(defn- handle-limit
  "Append a `$limit` stage when the query has a `:limit` clause; otherwise
   pass PIPELINE-CTX through unchanged."
  [{:keys [limit]} pipeline-ctx]
  (if limit
    (update pipeline-ctx :query conj {$limit limit})
    pipeline-ctx))
;;; ### page
(defn- handle-page [{{page-num :page items-per-page :items, :as page-clause} :page} pipeline]
(when page-clause
[{$skip (* items-per-page (dec page-num))}
{$limit items-per-page}]))
(defn- handle-page
  "Append `$skip` + `$limit` stages implementing the query's `:page` clause, if
   one is present; otherwise pass PIPELINE-CTX through unchanged."
  [{{page-num :page items-per-page :items, :as page-clause} :page} pipeline-ctx]
  (if-not page-clause
    pipeline-ctx
    ;; skip over the rows belonging to all the preceding pages, then cap at one page
    (let [rows-to-skip (* items-per-page (dec page-num))]
      (update pipeline-ctx :query into [{$skip  rows-to-skip}
                                        {$limit items-per-page}]))))
;;; # process + run
......@@ -324,21 +337,25 @@
(defn- generate-aggregation-pipeline
  "Generate the aggregation pipeline for QUERY. Returns a map with `:projections`,
   the column names the pipeline will return, and `:query`, a vector of maps
   representing each aggregation pipeline stage."
  [query]
  ;; thread the {:projections … :query …} context through each clause handler in
  ;; pipeline-stage order; handlers that don't apply return the context unchanged
  (reduce (fn [pipeline-ctx f]
            (f query pipeline-ctx))
          {:projections [], :query []}
          [add-initial-projection
           handle-filter
           handle-breakout+aggregation
           handle-order-by
           handle-fields
           handle-limit
           handle-page]))
(defn- create-unescaping-rename-map
  "Build a map of escaped key -> unescaped keyword for the keys in ORIGINAL-KEYS,
   e.g. {:source___username :source.username}. Keys that need no unescaping
   (`___` -> `.`, trailing `~~~suffix` stripped) are omitted from the result."
  [original-keys]
  (reduce (fn [acc k]
            (let [escaped   (name k)
                  unescaped (-> escaped
                                (s/replace #"___" ".")
                                (s/replace #"~~~(.+)$" ""))]
              (if (= escaped unescaped)
                acc
                (assoc acc k (keyword unescaped)))))
          {}
          original-keys))
(defn- unescape-names
"Restore the original, unescaped nested Field names in the keys of RESULTS.
......@@ -346,13 +363,7 @@
[results]
;; Build a map of escaped key -> unescaped key by looking at the keys in the first result
;; e.g. {:source___username :source.username}
(let [replacements (into {} (for [k (keys (first results))]
(let [k-str (name k)
unescaped (-> k-str
(s/replace #"___" ".")
(s/replace #"~~~(.+)$" ""))]
(when-not (= k-str unescaped)
{k (keyword unescaped)}))))]
(let [replacements (create-unescaping-rename-map (keys (first results)))]
;; If the map is non-empty then map set/rename-keys over the results with it
(if-not (seq replacements)
results
......@@ -449,15 +460,16 @@
{:pre [(map? database)
(string? source-table-name)]}
(binding [*query* query]
(let [generated-pipeline (generate-aggregation-pipeline (:query query))]
(let [{proj :projections, generated-pipeline :query} (generate-aggregation-pipeline (:query query))]
(log-monger-form generated-pipeline)
{:query generated-pipeline
{:projections proj
:query generated-pipeline
:collection source-table-name
:mbql? true})))
(defn execute-query
"Process and run a native MongoDB query."
[{{:keys [collection query mbql?]} :native, database :database}]
[{{:keys [collection query mbql? projections]} :native, database :database}]
{:pre [query
(string? collection)
(map? database)]}
......@@ -470,11 +482,20 @@
results
[results])
;; if we formed the query using MBQL then we apply a couple post processing functions
results (if-not mbql? results
(-> results
unescape-names
unstringify-dates))
columns (vec (keys (first results)))]
results (if-not mbql?
results
(-> results
unescape-names
unstringify-dates))
rename-map (create-unescaping-rename-map projections)
columns (if-not mbql?
(vec (keys (first results)))
(map (fn [proj]
(if (contains? rename-map proj)
(get rename-map proj)
proj))
projections))]
{:columns columns
:rows (for [row results]
(mapv row columns))
......
......@@ -15,7 +15,8 @@
[humanization :as humanization]]
[metabase.query-processor
[interface :as i]
[sort :as sort]]
[sort :as sort]
[util :as qputil]]
[toucan.db :as db])
(:import [metabase.query_processor.interface Expression ExpressionRef]))
......
......@@ -28,8 +28,8 @@
(defn- row-map-fn
  "Return a function of one row that appends, for each {:col-index … :xform-fn …}
   map in DIM-SEQ, the result of applying that xform-fn to the row value at
   col-index. The original row values are left in place; new values are appended."
  [dim-seq]
  (fn [row]
    (concat row (map (fn [{:keys [col-index xform-fn]}]
                       (xform-fn (nth row col-index)))
                     dim-seq))))
(defn- transform-values-for-col
"Converts `VALUES` to a type compatible with the base_type found for
......
......@@ -76,7 +76,6 @@
query
(update query :query add-implicit-clauses-to-inner-query)))
(defn add-implicit-clauses
"Add an implicit `fields` clause to queries with no `:aggregation`, `breakout`, or explicit `:fields` clauses.
Add implicit `:order-by` clauses for fields specified in a `:breakout`."
......
......@@ -9,6 +9,9 @@
[util :as tu]]
[metabase.test.data.datasets :refer [expect-with-engine]]))
(def col-defaults
  "Remapping-related column keys, all nil, merged into expected `:cols` maps in tests."
  {:remapped_from nil
   :remapped_to   nil})
;; Test native queries
(expect-with-engine :bigquery
[[100]
......@@ -21,10 +24,12 @@
;; make sure that BigQuery native queries maintain the column ordering specified in the SQL -- post-processing ordering shouldn't apply (Issue #2821)
(expect-with-engine :bigquery
{:cols [{:name "venue_id", :display_name "Venue ID", :base_type :type/Integer}
{:name "user_id", :display_name "User ID" :base_type :type/Integer}
{:name "checkins_id", :display_name "Checkins ID" :base_type :type/Integer}],
:columns ["venue_id" "user_id" "checkins_id"]}
{:columns ["venue_id" "user_id" "checkins_id"],
:cols (mapv #(merge col-defaults %)
[{:name "venue_id", :display_name "Venue ID", :base_type :type/Integer}
{:name "user_id", :display_name "User ID", :base_type :type/Integer}
{:name "checkins_id", :display_name "Checkins ID", :base_type :type/Integer}])}
(select-keys (:data (qp/process-query {:native {:query "SELECT [test_data.checkins.venue_id] AS [venue_id], [test_data.checkins.user_id] AS [user_id], [test_data.checkins.id] AS [checkins_id]
FROM [test_data.checkins]
LIMIT 2"}
......
......@@ -78,7 +78,9 @@
:row_count 1
:data {:rows [[1]]
:columns ["count"]
:cols [{:name "count", :display_name "Count", :base_type :type/Integer}]
:cols [{:name "count", :display_name "Count", :base_type :type/Integer
:remapped_to nil, :remapped_from nil}]
:native_form {:collection "venues"
:query native-query}}}
(-> (qp/process-query {:native {:query native-query
......
......@@ -3,6 +3,7 @@
(:require [metabase.query-processor.middleware.expand :as ql]
[metabase.query-processor-test :refer :all]
[metabase.test.data :as data]
[metabase.test.data.datasets :as datasets]
[metabase.test.util :as tu]))
(tu/resolve-private-vars metabase.query-processor.middleware.add-dimension-projections create-remapped-col)
......@@ -11,8 +12,7 @@
{:rows [["20th Century Cafe" 12 "Café Sweets"]
["25°" 11 "Café"]
["33 Taps" 7 "Beer Garden"]
["800 Degrees Neapolitan Pizzeria" 58 "Ramen"]
["BCD Tofu House" 44 "Landmark"]]
["800 Degrees Neapolitan Pizzeria" 58 "Ramen"]]
:columns [(data/format-name "name")
(data/format-name "category_id")
"Foo"]
......@@ -25,7 +25,7 @@
(->> (data/run-query venues
(ql/fields $name $category_id)
(ql/order-by (ql/asc $name))
(ql/limit 5))
(ql/limit 4))
booleanize-native-form
(format-rows-by [str int str]))))
......@@ -51,12 +51,11 @@
(fn [rows]
(map #(mapv % col-indexes) rows))))))
(qp-expect-with-all-engines
(datasets/expect-with-engines (engines-that-support :foreign-keys)
{:rows [["20th Century Cafe" 2 "Café"]
["25°" 2 "Burger"]
["33 Taps" 2 "Bar"]
["800 Degrees Neapolitan Pizzeria" 2 "Pizza"]
["BCD Tofu House" 2 "Korean"]]
["800 Degrees Neapolitan Pizzeria" 2 "Pizza"]]
:columns [(:name (venues-col :name))
(:name (venues-col :price))
(data/format-name "name_2")]
......@@ -73,7 +72,8 @@
(data/create-venue-category-fk-remapping "Foo")
(->> (data/run-query venues
(ql/order-by (ql/asc $name))
(ql/limit 5))
(ql/limit 4))
booleanize-native-form
(format-rows-by [int str int double double int str])
(select-columns (set (map data/format-name ["name" "price" "name_2"]))))))
(select-columns (set (map data/format-name ["name" "price" "name_2"])))
:data)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment