Skip to content
Snippets Groups Projects
Commit bf9794e6 authored by Cam Saul's avatar Cam Saul
Browse files

add new metabase.drive.rmongo.query-processor and

metabase.driver.mongo.util namespaces
parent b94e86c5
No related branches found
No related tags found
No related merge requests found
......@@ -72,7 +72,7 @@
[lein-instant-cheatsheet "2.1.1"] ; use awesome instant cheatsheet created by yours truly w/ 'lein instant-cheatsheet'
[lein-marginalia "0.8.0"] ; generate documentation with 'lein marg'
[refactor-nrepl "1.0.1"]] ; support for advanced refactoring in Emacs/LightTable
:warn-on-reflection true ; Emit warnings on all reflection calls
:global-vars {*warn-on-reflection* true} ; Emit warnings on all reflection calls
:jvm-opts ["-Dlogfile.path=target/log"
"-Xms1024m" ; give JVM a decent heap size to start with
"-Xmx2048m" ; hard limit of 2GB so we stop hitting the 4GB container limit on CircleCI
......
(ns metabase.driver.mongo.query-processor
(:refer-clojure :exclude [find sort])
(:require [clojure.core.match :refer [match]]
[colorize.core :as color]
(monger [collection :as mc]
[core :as mg]
[db :as mdb]
[operators :refer :all]
[query :refer :all])
[metabase.db :refer :all]
[metabase.driver :as driver]
[metabase.driver.query-processor :refer [*query*]]
[metabase.driver.mongo.util :refer [with-mongo-connection *mongo-connection*]]
(metabase.models [database :refer [Database]]
[field :refer [Field]]
[table :refer [Table]]))
(:import (com.mongodb CommandResult
DBApiLayer)
(clojure.lang PersistentArrayMap)))
(declare apply-clause
annotate-native-results
annotate-results
eval-raw-command
field-id->kw
process-structured
process-and-run-structured)
;; # DRIVER QP INTERFACE
(defn process-and-run [{query-type :type database-id :database :as query}]
(with-mongo-connection [_ (sel :one :fields [Database :details] :id database-id)]
(case (keyword query-type)
:query (let [generated-query (process-structured (:query query))]
;; ; TODO - log/debug
(println (color/magenta "\n******************** Generated Monger Query: ********************\n"
(with-out-str (clojure.pprint/pprint generated-query))
"*****************************************************************\n"))
(->> (eval generated-query)
(annotate-results (:query query))))
:native (->> (eval-raw-command (:query (:native query)))
annotate-native-results))))
;; # NATIVE QUERY PROCESSOR
(defn eval-raw-command
"Evaluate raw MongoDB javascript code. This must be ran insided the body of a `with-mongo-connection`.
(with-mongo-connection [_ \"mongodb://localhost/test\"]
(eval-raw-command \"db.zips.findOne()\"))
-> {\"_id\" \"01001\", \"city\" \"AGAWAM\", ...}"
[^String command]
(assert *mongo-connection* "eval-raw-command must be ran inside the body of with-mongo-connection.")
(let [^CommandResult result (.doEval ^DBApiLayer *mongo-connection* command nil)]
(when-not (.ok result)
(throw (.getException result)))
(let [{result "retval"} (PersistentArrayMap/create (.toMap result))]
result)))
(defn annotate-native-results
"Package up the results in the way the frontend expects."
[results]
(if-not (sequential? results) (annotate-native-results [results])
{:status :completed
:row_count (count results)
:data {:rows results
:columns (keys (first results))}}))
;; # STRUCTURED QUERY PROCESSOR
;; ## AGGREGATION IMPLEMENTATIONS
(def ^:private aggregations (atom '()))
(def ^:dynamic *collection-name* nil)
(def ^:dynamic *constraints* nil)
(defmacro defaggregation [match-binding & body]
`(swap! aggregations concat
(quote [~match-binding (try
~@body
(catch Throwable e#
(println (color/red ~(format "Failed to apply aggregation %s: " match-binding)
e#))))])))
(defn aggregate [& forms]
`(mc/aggregate *mongo-connection* ~*collection-name* [~@(when *constraints*
[{$match *constraints*}])
~@forms]))
(defn field-id->$string [field-id]
(format "$%s" (name (field-id->kw field-id))))
(defaggregation ["rows"]
`(doall (with-collection *mongo-connection* ~*collection-name*
~@(when *constraints* [`(find ~*constraints*)])
~@(mapcat apply-clause *query*))))
(defaggregation ["count"]
`[{:count (mc/count *mongo-connection* ~*collection-name*
~*constraints*)}])
(defaggregation ["avg" field-id]
(aggregate {$group {"_id" nil
"avg" {$avg (field-id->$string field-id)}}}))
(defaggregation ["count" field-id]
(aggregate {$match {(field-id->kw field-id) {$exists true}}}
{$group {"_id" nil
"count" {$sum 1}}}
{$project {"_id" false, "count" true}}))
(defaggregation ["distinct" field-id]
(aggregate {$group {"_id" (field-id->$string field-id)}}
{$group {"_id" nil
"count" {$sum 1}}}
{$project {"_id" false, "count" true}}))
(defaggregation ["stddev" field-id]
nil) ; TODO
(defaggregation ["sum" field-id]
(aggregate {$group {"_id" nil ; TODO - I don't think this works for _id
"sum" {$sum (field-id->$string field-id)}}}
{$project {"_id" false, "sum" true}}))
(defaggregation ["cum_sum" field-id]
nil) ; TODO
(defmacro match-aggregation [aggregation]
`(match ~aggregation
~@@aggregations
~'_ nil))
(defn process-structured [{:keys [source_table aggregation] :as query}]
(binding [*collection-name* (sel :one :field [Table :name] :id source_table)
*constraints* (when-let [filter-clause (:filter query)]
(apply-clause [:filter filter-clause]))
*query* (dissoc query :filter)]
(match-aggregation aggregation)))
;; ## ANNOTATION
(defn annotate-results [{:keys [source_table] :as query} results]
{:pre [(integer? source_table)]}
(let [field-name->id (sel :many :field->id [Field :name] :table_id source_table)
column-names (keys (first results))]
{:row_count (count results)
:status :completed
:data {:columns column-names
:cols (map (fn [column-name]
{:name column-name
:id (field-name->id (name column-name))
:table_id source_table
:description nil
:base_type :UnknownField
:special_type nil
:extra_info {}})
column-names)
:rows (map #(map % column-names)
results)}}))
;; ## CLAUSE APPLICATION 2.0
(def field-id->kw
(memoize
(fn [field-id]
(keyword (sel :one :field [Field :name] :id field-id)))))
(def clauses (atom '()))
(defmacro defclause [clause match-binding & body]
`(swap! clauses concat '[[~clause ~match-binding] (try
~@body
(catch Throwable e#
(println (color/red ~(format "Failed to process clause [%s %s]: " clause match-binding)
(.getMessage e#)))))]))
;; ### CLAUSE DEFINITIONS
;; ### breakout (TODO)
(defclause :breakout field-ids
nil)
;; TODO - this still returns _id, even if we don't ask for it :/
(defclause :fields field-ids
`[(fields ~(mapv field-id->kw field-ids))])
;; ### filter
;; !!! SPECIAL CASE - since this is used in a different way by the different aggregation options
;; we just return a "constraints" map
(defclause :filter ["INSIDE" lat-field-id lon-field-id lat-max lon-min lat-min lon-max]
(let [lat-field (field-id->kw lat-field-id)
lon-field (field-id->kw lon-field-id)]
{$and [{lat-field {$gte lat-min, $lte lat-max}}
{lon-field {$gte lon-min, $lte lon-max}}]}))
(defclause :filter ["IS_NULL" field-id]
{(field-id->kw field-id) {$exists false}})
(defclause :filter ["NOT_NULL" field-id]
{(field-id->kw field-id) {$exists true}})
(defclause :filter ["BETWEEN" field-id min max] ; is this supposed to be inclusive, or not ?
{(field-id->kw field-id) {$gt min
$lt max}})
(defclause :filter ["=" field-id value]
{(field-id->kw field-id) value})
(defclause :filter ["!=" field-id value]
{(field-id->kw field-id) {$ne value}})
(defclause :filter ["<" field-id value]
{(field-id->kw field-id) {$lt value}})
(defclause :filter [">" field-id value]
{(field-id->kw field-id) {$gt value}})
(defclause :filter ["<=" field-id value]
{(field-id->kw field-id) {$lte value}})
(defclause :filter [">=" field-id value]
{(field-id->kw field-id) {$gte value}})
(defclause :filter ["AND" & subclauses]
{$and (mapv #(apply-clause [:filter %]) subclauses)})
(defclause :filter ["OR" & subclauses]
{$or (mapv #(apply-clause [:filter %]) subclauses)})
;; ### limit
(defclause :limit value
`[(limit ~value)])
;; ### order_by
(defclause :order_by field-dir-pairs
(let [sort-options (mapcat (fn [[field-id direction]]
[(field-id->kw field-id) (case (keyword direction)
:ascending 1
:descending -1)])
field-dir-pairs)]
(when (seq sort-options)
`[(sort (array-map ~@sort-options))])))
;; ### page
(defclause :page page-clause
(let [{page-num :page items-per-page :items} page-clause
num-to-skip (* (dec page-num) items-per-page)]
`[(skip ~num-to-skip)
(limit ~items-per-page)]))
;; ### APPLY-CLAUSE
(defmacro match-clause [clause]
`(match ~clause
~@@clauses
~'_ nil))
(defn apply-clause [clause]
(match-clause clause))
(ns metabase.driver.mongo.util
"`*mongo-connection*`, `with-mongo-connection`, and other functions shared between several Mongo driver namespaces."
(:require [clojure.tools.logging :as log]
[colorize.core :as color]
[monger.core :as mg]))
(def ^:dynamic *mongo-connection*
"Bound by top-level `with-mongo-connection` so it may be reused within its body."
nil)
(defn -with-mongo-connection
"Run F with a new connection (bound to `*mongo-connection*`) to DATABASE.
Don't use this directly; use `with-mongo-connection`."
[f database]
(let [connection-string (if (map? database) (-> database :details :conn_str)
database)
_ (assert (string? connection-string) (str "with-mongo-connection failed: connection string is must be a string, got: " connection-string))
{conn :conn mongo-connection :db} (mg/connect-via-uri connection-string)]
(log/debug (color/cyan "<< OPENED NEW MONGODB CONNECTION >>"))
(try
(binding [*mongo-connection* mongo-connection]
(f *mongo-connection*))
(finally
(mg/disconnect conn)))))
(defmacro with-mongo-connection
"Open a new MongoDB connection to DATABASE-OR-CONNECTION-STRING, bind connection to BINDING, execute BODY, and close the connection.
The DB connection is re-used by subsequent calls to `with-mongo-connection` within BODY.
(We're smart about it: DATABASE isn't even evaluated if `*mongo-connection*` is already bound.)
(with-mongo-connection [conn @(:db (sel :one Table ...))] ; delay isn't derefed if *mongo-connection* is already bound
...)
(with-mongo-connection [conn \"mongodb://127.0.0.1:27017/test\"] ; use a string instead of a DB
...)"
[[binding database] & body]
`(let [f# (fn [~binding]
~@body)]
(if *mongo-connection* (f# *mongo-connection*)
(-with-mongo-connection f# ~database))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment