Skip to content
Snippets Groups Projects
Commit afa50d0f authored by Cam Saül's avatar Cam Saül
Browse files

Mongo Query Processor 2.0 :scream_cat:

parent 77fd45d5
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,12 @@ machine:
version: 2.7.3
dependencies:
override:
- sudo apt-get purge mongodb-org*
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
- sudo apt-get update
- sudo apt-get install -y mongodb-org
- sudo service mongod restart
- lein deps
- pip install awscli==1.7.3
database:
......
(ns metabase.db.metadata-queries
"Predefined QP queries for getting metadata about an external database."
(:require [metabase.driver :as driver]
[metabase.driver.sync :as sync]
[metabase.util :as u]))
;; TODO - These queries have to be evaluated by the query processor and macroexpanded at runtime every time they're ran.
;; It would be more efficient if we could let the QP could macroexpand normally for predefined queries like these
(defn- field-query [field query]
(->> (driver/process-query
{:type :query
:database ((u/deref-> field :table :db) :id)
:query (assoc query
:source_table ((u/deref-> field :table) :id))})
:data
:rows))
(-> (driver/process-query
{:type :query
:database ((u/deref-> field :table :db) :id)
:query (assoc query
:source_table ((u/deref-> field :table) :id))})
:data
:rows))
(defn field-distinct-values
"Return the distinct values of FIELD."
"Return the distinct values of FIELD.
This is used to create a `FieldValues` object for `:category` Fields."
[{field-id :id :as field}]
(->> (field-query field {:aggregation ["rows"] ; should we add a limit here? In case someone is dumb and tries to get millions of distinct values?
:breakout [field-id]}) ; or should we let them do it
(map first)))
(mapv first (field-query field {:breakout [field-id]
:limit sync/low-cardinality-threshold})))
(defn field-distinct-count
"Return the distinct count of FIELD."
[{field-id :id :as field}]
(->> (field-query field {:aggregation ["distinct" field-id]})
first
first))
(-> (field-query field {:aggregation ["distinct" field-id]})
first
first))
(defn field-count
"Return the count of FIELD."
[{field-id :id :as field}]
(->> (field-query field {:aggregation ["count" field-id]})
first
first))
(-> (field-query field {:aggregation ["count" field-id]})
first
first))
......@@ -285,7 +285,7 @@
:distinct "count"
:sum "sum"})
(defn- ag-type->group-by-clause [{:keys [aggregation-type field]}]
(defn- aggregation->rvalue [{:keys [aggregation-type field]}]
(if-not field
(case aggregation-type
:count {$sum 1})
......@@ -298,24 +298,32 @@
:sum {$sum (->rvalue field)})))
(defn- handle-breakout+aggregation [{breakout-fields :breakout, {ag-type :aggregation-type, :as aggregation} :aggregation} pipeline]
(when (and ag-type
(not= ag-type :rows))
(let [ag-field-name (ag-type->field-name ag-type)]
(filter identity
[(when (seq breakout-fields)
{$project {"_id" "$_id"
"___group" (into {} (for [field breakout-fields] ; create a totally sweet made-up column called __group
{(->lvalue field) (->rvalue field)}))}}) ; to store the fields we'd like to group by
{$group {"_id" (when (seq breakout-fields)
"$___group")
ag-field-name (ag-type->group-by-clause aggregation)}}
{$sort {"_id" 1}}
{$project (merge {"_id" false
ag-field-name (if (= ag-type :distinct)
{$size "$count"} ; HACK
true)}
(into {} (for [field breakout-fields]
{(->lvalue field) (format "$_id.%s" (->lvalue field))})))}]))))
(let [aggregation? (and ag-type
(not= ag-type :rows))
breakout? (seq breakout-fields)]
(when (or aggregation? breakout?)
(let [ag-field-name (ag-type->field-name ag-type)]
(filter identity
[ ;; create a totally sweet made-up column called __group to store the fields we'd like to group by
(when breakout?
{$project {"_id" "$_id"
"___group" (into {} (for [field breakout-fields]
{(->lvalue field) (->rvalue field)}))}})
;; Now project onto the __group and the aggregation rvalue
{$group (merge {"_id" (when breakout?
"$___group")}
(when aggregation
{ag-field-name (aggregation->rvalue aggregation)}))}
;; Sort by _id (___group)
{$sort {"_id" 1}}
;; now project back to the fields we expect
{$project (merge {"_id" false}
(when aggregation?
{ag-field-name (if (= ag-type :distinct)
{$size "$count"} ; HACK
true)})
(into {} (for [field breakout-fields]
{(->lvalue field) (format "$_id.%s" (->lvalue field))})))}])))))
;;; ### order-by
......@@ -412,6 +420,6 @@
(log-monger-form generated-pipeline)
(->> (with-mongo-connection [_ database]
(mc/aggregate *mongo-connection* source-table-name generated-pipeline
:allow-disk-use (not (config/is-test?))))
:allow-disk-use true))
unescape-names
unstringify-dates))))
......@@ -113,7 +113,6 @@
(let [expected-keys (set (map :field-name fields))
_ (assert (every? keyword? expected-keys))
missing-keys (set/difference actual-keys expected-keys)]
(when (seq missing-keys)
(log/warn (u/format-color 'yellow "There are fields we weren't expecting in the results: %s\nExpected: %s\nActual: %s"
missing-keys expected-keys actual-keys)))
......@@ -206,7 +205,7 @@
:destination_id [not= nil]))))
;; Fetch the destination Fields referenced by the ForeignKeys
([fields fk-ids id->dest-id]
(when (seq (vals id->dest-id))
(when (seq id->dest-id)
(fk-field->dest-fn fields fk-ids id->dest-id (sel :many :id->fields [Field :id :name :display_name :table_id :description :base_type :special_type :preview_display]
:id [in (vals id->dest-id)]))))
;; Return a function that will return the corresponding destination Field for a given Field
......
......@@ -357,31 +357,22 @@
;; ## sync-field
(defmacro ^:private sync-field->>
"Like `->>`, but wrap each form with `try-apply`, and pass FIELD along to the next if the previous form returned `nil`."
[field & fns]
`(->> ~field
~@(->> fns
(map (fn [f]
(let [[f & args] (if (list? f) f [f])]
`((fn [field#]
(or (u/try-apply ~f ~@args field#)
field#)))))))))
(defn- sync-field!
"Sync the metadata for FIELD, marking urls, categories, etc. when applicable."
[driver field]
{:pre [driver
field]}
(sync-field->> field
(maybe-driver-specific-sync-field! driver)
set-field-display-name-if-needed!
(mark-url-field! driver)
(mark-no-preview-display-field! driver)
mark-category-field-or-update-field-values!
(mark-json-field! driver)
auto-assign-field-special-type-by-name!
(sync-field-nested-fields! driver)))
{:pre [driver field]}
(loop [field field, [f & more] [(partial maybe-driver-specific-sync-field! driver)
set-field-display-name-if-needed!
(partial mark-url-field! driver)
(partial mark-no-preview-display-field! driver)
mark-category-field-or-update-field-values!
(partial mark-json-field! driver)
auto-assign-field-special-type-by-name!
(partial sync-field-nested-fields! driver)]]
(let [field (or (u/try-apply f field)
field)]
(when (seq more)
(recur field more)))))
;; Each field-syncing function below should return FIELD with any updates that we made, or nil.
......@@ -453,7 +444,7 @@
;; ### mark-category-field-or-update-field-values!
(def ^:const ^:private low-cardinality-threshold
(def ^:const low-cardinality-threshold
"Fields with less than this many distinct values should automatically be marked with `special_type = :category`."
40)
......
......@@ -166,7 +166,7 @@
:hour (trunc-with-format "yyyy-MM-dd'T'HH:00:00")
:day (trunc-with-format "yyyy-MM-dd")
:week (let [day-of-week (date-extract :day-of-week date)
date (relative-date :day (- day-of-week) date)]
date (relative-date :day (- (dec day-of-week)) date)]
(trunc-with-format "yyyy-MM-dd" date))
:month (trunc-with-format "yyyy-MM")
:quarter (let [year (date-extract :year date)
......@@ -426,9 +426,20 @@
(pprint-to-str (filtered-stacktrace e))))))))))
(defn try-apply
"Like `apply`, but wraps F inside a `try-catch` block and logs exceptions caught."
"Like `apply`, but wraps F inside a `try-catch` block and logs exceptions caught.
(This is actaully more flexible than `apply` -- the last argument doesn't have to be
a sequence:
(try-apply vector :a :b [:c :d]) -> [:a :b :c :d]
(apply vector :a :b [:c :d]) -> [:a :b :c :d]
(try-apply vector :a :b :c :d) -> [:a :b :c :d]
(apply vector :a :b :c :d) -> Not ok - :d is not a sequence
This allows us to use `try-apply` in more situations than we'd otherwise be able to."
[^clojure.lang.IFn f & args]
(apply (wrap-try-catch f) args))
(apply (wrap-try-catch f) (concat (butlast args) (if (sequential? (last args))
(last args)
[(last args)]))))
(defn wrap-try-catch!
"Re-intern FN-SYMB as a new fn that wraps the original with a `try-catch`. Intended for debugging.
......
......@@ -430,10 +430,20 @@
breakout user_id
order user_id+))
;; ### BREAKOUT w/o AGGREGATION
;; This should act as a "distinct values" query and return ordered results
(qp-expect-with-all-datasets
{:cols [(checkins-col :user_id)]
:columns [(format-name "user_id")]
:rows [[1] [2] [3] [4] [5] [6] [7] [8] [9] [10]]}
(Q breakout user_id of checkins
limit 10))
;; ### "BREAKOUT" - MULTIPLE COLUMNS W/ IMPLICT "ORDER_BY"
;; Fields should be implicitly ordered :ASC for all the fields in `breakout` that are not specified in `order_by`
(qp-expect-with-all-datasets
{:rows [[1 1 1] [1 5 1] [1 7 1] [1 10 1] [1 13 1] [1 16 1] [1 26 1] [1 31 1] [1 35 1] [1 36 1]],
{:rows [[1 1 1] [1 5 1] [1 7 1] [1 10 1] [1 13 1] [1 16 1] [1 26 1] [1 31 1] [1 35 1] [1 36 1]]
:columns [(format-name "user_id")
(format-name "venue_id")
"count"]
......@@ -821,8 +831,7 @@
[897 "Wearing a Biggie Shirt"]
[499 "In the Expa Office"]]
(Q dataset tupac-sightings
return rows
of sightings
return rows of sightings
fields id category_id->categories.name
order timestamp-
limit 10))
......
......@@ -8,7 +8,7 @@
;; Check that setting a Field's special_type to :category will cause a corresponding FieldValues to be created asynchronously
(expect
[nil
75
40
:done]
(let [orig-special-type (sel :one :field [Field :special_type] :id (id :categories :name))
set-field-special-type (fn [special-type]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment