Commit 456372f0 authored by Cam Saül

Merge pull request #371 from metabase/mongo_driver_redux

MongoDB Driver: Syncing + QP
parents 1014828b c50c1e8d
Showing 953 additions and 233 deletions
......@@ -19,6 +19,8 @@
(expect-eval-actual-first 1)
(expect-expansion 0)
(expect-let 1)
(expect-when-testing-against-dataset 1)
(expect-with-all-drivers 1)
(ins 1)
(let-400 1)
(let-404 1)
......@@ -28,7 +30,9 @@
(macrolet 1)
(org-perms-case 1)
(pdoseq 1)
(qp-expect-with-all-drivers 1)
(symbol-macrolet 1)
(sync-in-context 2)
(upd 2)
(when-testing-dataset 1)
(with-credentials 1)))))))
......@@ -47,6 +47,13 @@ Run unit tests with
lein test
By default, the tests only run against the `generic-sql` dataset (an H2 test database).
You can specify which datasets/drivers to run tests against with the env var `MB_TEST_DATASETS`:
MB_TEST_DATASETS=generic-sql,mongo lein test
At the time of this writing, the valid datasets are `generic-sql` and `mongo`.
## Documentation
......
......@@ -4,5 +4,10 @@ machine:
oraclejdk8
test:
override:
- case $CIRCLE_NODE_INDEX in 0) lein test ;; 1) MB_DB_TYPE=postgres MB_DB_DBNAME=circle_test MB_DB_PORT=5432 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 2) lein eastwood ;; 3) ./lint_js.sh && lein bikeshed --max-line-length 240 ;; 4) lein uberjar ;; esac:
# 0) runs unit tests w/ H2 local DB. Runs against both Mongo + H2 test datasets
# 1) runs unit tests w/ Postgres local DB. Only runs against H2 test dataset so we can be sure tests work in either scenario
# 2) runs Eastwood linter
# 3) runs JS linter + Bikeshed linter
# 4) Runs lein uberjar
- case $CIRCLE_NODE_INDEX in 0) MB_TEST_DATASETS=generic-sql,mongo lein test ;; 1) MB_DB_TYPE=postgres MB_DB_DBNAME=circle_test MB_DB_PORT=5432 MB_DB_USER=ubuntu MB_DB_HOST=localhost lein test ;; 2) lein eastwood ;; 3) ./lint_js.sh && lein bikeshed --max-line-length 240 ;; 4) lein uberjar ;; esac:
parallel: true
......@@ -73,6 +73,7 @@
[lein-instant-cheatsheet "2.1.1"] ; use awesome instant cheatsheet created by yours truly w/ 'lein instant-cheatsheet'
[lein-marginalia "0.8.0"] ; generate documentation with 'lein marg'
[refactor-nrepl "1.0.1"]] ; support for advanced refactoring in Emacs/LightTable
:global-vars {*warn-on-reflection* true} ; Emit warnings on all reflection calls
:jvm-opts ["-Dlogfile.path=target/log"
"-Xms1024m" ; give JVM a decent heap size to start with
"-Xmx2048m" ; hard limit of 2GB so we stop hitting the 4GB container limit on CircleCI
......
......@@ -74,12 +74,6 @@
(config/config-str :mb-db-user)
(config/config-str :mb-db-pass))))
(defn test-db-conn
"Simple test of a JDBC connection."
[jdbc-db]
(let [result (first (jdbc/query jdbc-db ["select 7 as num"] :row-fn :num))]
(assert (= 7 result) "JDBC Connection Test FAILED")))
;; ## MIGRATE
......@@ -98,16 +92,22 @@
(def ^:private setup-db-has-been-called?
(atom false))
(def ^:private db-can-connect? (u/runtime-resolved-fn 'metabase.driver 'can-connect?))
(defn setup-db
"Do general perparation of database by validating that we can connect.
Caller can specify if we should run any pending database migrations."
[& {:keys [auto-migrate]
:or {auto-migrate true}}]
(reset! setup-db-has-been-called? true)
(log/info "Setting up DB specs...")
(let [jdbc-db (setup-jdbc-db)
korma-db (setup-korma-db)]
;; Test DB connection and throw exception if we have any troubles connecting
(test-db-conn jdbc-db)
(log/info "Verifying Database Connection ...")
(assert (db-can-connect? {:engine (config/config-kw :mb-db-type)
:details {:conn_str (metabase-db-conn-str)}})
"Unable to connect to Metabase DB.")
(log/info "Verify Database Connection ... CHECK")
;; Run through our DB migration process and make sure DB is fully prepared
(if auto-migrate
......@@ -277,7 +277,7 @@
(sel :one User :id 1) -> returns the User (or nil) whose id is 1
(sel :many OrgPerm :user_id 1) -> returns sequence of OrgPerms whose user_id is 1
OPTION, if specified, is one of `:field`, `:fields`, `:id`, `:id->field`, `:field->id`, or `:id->fields`.
OPTION, if specified, is one of `:field`, `:fields`, `:id`, `:id->field`, `:field->id`, `:field->obj`, or `:id->fields`.
;; Only return IDs of objects.
(sel :one :id User :email \"cam@metabase.com\") -> 120
......@@ -297,6 +297,11 @@
(sel :many :field->id [User :first_name])
-> {\"Cam\" 1, \"Sameer\" 2}
;; Return a map of field value -> *entire* object. Duplicates will be discarded!
(sel :many :field->obj [Table :name] :db_id 1)
-> {\"venues\" {:id 1, :name \"venues\", ...}
\"users\" {:id 2, :name \"users\", ...}}
;; Return a map of ID -> specified fields
(sel :many :id->fields [User :first_name :last_name])
-> {1 {:first_name \"Cam\", :last_name \"Saul\"},
......@@ -345,6 +350,11 @@
(map (fn [{id# :id field-val# field#}]
{field-val# id#}))
(into {})))
:field->obj `(let [[entity# field#] ~entity]
(->> (sel :many entity# ~@forms)
(map (fn [obj#]
{(field# obj#) obj#}))
(into {})))
:fields `(let [[~'_ & fields# :as entity#] ~entity]
(map #(select-keys % fields#)
(sel :many entity# ~@forms)))
......
......@@ -41,6 +41,7 @@
java.math.BigInteger :BigIntegerField
java.sql.Date :DateField
java.sql.Timestamp :DateTimeField
java.util.Date :DateField
org.postgresql.util.PGobject :UnknownField}) ; this mapping included here since Native QP uses class->base-type directly. TODO - perhaps make *class-base->type* driver specific?
;; ## Driver Lookup
......@@ -84,33 +85,56 @@
(defn can-connect?
"Check whether we can connect to DATABASE and perform a basic query (such as `SELECT 1`)."
[database]
(i/can-connect? (engine->driver (:engine database)) database))
{:pre [(map? database)]}
(try
(i/can-connect? (engine->driver (:engine database)) database)
(catch Throwable e
(log/error "Failed to connect to database:" (.getMessage e))
false)))
(defn can-connect-with-details?
"Check whether we can connect to a database with ENGINE and DETAILS-MAP and perform a basic query.
(can-connect-with-details? :postgres {:host \"localhost\", :port 5432, ...})"
[engine details-map]
(i/can-connect-with-details? (engine->driver engine) details-map))
{:pre [(keyword? engine)
(contains? (set (keys available-drivers)) engine)
(map? details-map)]}
(try
(i/can-connect-with-details? (engine->driver engine) details-map)
(catch Throwable e
(log/error "Failed to connect to database:" (.getMessage e))
false)))
(def ^{:arglists '([database])} sync-database!
"Sync a `Database`, its `Tables`, and `Fields`."
(let [-sync-database! (u/runtime-resolved-fn 'metabase.driver.sync 'sync-database!)] ; these need to be resolved at runtime to avoid circular deps
(fn [database]
{:pre [(map? database)]}
(time (-sync-database! (engine->driver (:engine database)) database)))))
(def ^{:arglists '([table])} sync-table!
"Sync a `Table` and its `Fields`."
(let [-sync-table! (u/runtime-resolved-fn 'metabase.driver.sync 'sync-table!)]
(fn [table]
{:pre [(map? table)]}
(-sync-table! (database-id->driver (:db_id table)) table))))
(defn process-query
"Process a structured or native query, and return the result."
[query]
(binding [qp/*query* query]
(i/process-query (database-id->driver (:database query))
(qp/preprocess query))))
{:pre [(map? query)]}
(try
(binding [qp/*query* query]
(let [driver (database-id->driver (:database query))
query (qp/preprocess query)
results (binding [qp/*query* query]
(i/process-query driver (dissoc-in query [:query :cum_sum])))] ; strip out things that individual impls don't need to know about / deal with
(qp/post-process driver query results)))
(catch Throwable e
(.printStackTrace e)
{:status :failed
:error (.getMessage e)})))
;; ## Query Execution Stuff
......@@ -187,6 +211,7 @@
(query-complete query-execution query-result (:cache_result options)))
(catch Exception ex
(log/warn ex)
(.printStackTrace ex)
(query-fail query-execution (.getMessage ex))))))
......
......@@ -10,18 +10,15 @@
[util :refer :all])))
(defrecord SqlDriver [column->base-type
connection-details->connection-spec
database->connection-details
sql-string-length-fn]
connection-details->connection-spec
database->connection-details
sql-string-length-fn]
IDriver
;; Connection
(can-connect? [_ database]
(try (connection/test-connection (-> database
database->connection-details
connection-details->connection-spec))
(catch Throwable e
(log/error "Failed to connect to database:" (.getMessage e))
false)))
(connection/test-connection (-> database
database->connection-details
connection-details->connection-spec)))
(can-connect-with-details? [_ details]
(connection/test-connection (connection-details->connection-spec details)))
......@@ -88,7 +85,8 @@
(aggregate (count :*) :count)
(where {(keyword (:name field)) [not= nil]})) first :count)]
(if (= total-non-null-count 0) 0.0
(let [url-count (-> (select korma-table
(aggregate (count :*) :count)
(where {(keyword (:name field)) [like "http%://_%.__%"]})) first :count)]
(let [url-count (or (-> (select korma-table
(aggregate (count :*) :count)
(where {(keyword (:name field)) [like "http%://_%.__%"]})) first :count)
0)]
(float (/ url-count total-non-null-count)))))))
......@@ -51,21 +51,15 @@
(log/debug "Setting timezone to:" timezone)
(jdbc/db-do-prepared conn set-timezone-sql)))
(jdbc/query conn sql :as-arrays? true))]
{:status :completed
:row_count (count rows)
:data {:rows rows
:columns columns
:cols (map (fn [column first-value]
{:name column
:base_type (value->base-type first-value)})
columns first-row)}})
{:rows rows
:columns columns
:cols (map (fn [column first-value]
{:name column
:base_type (value->base-type first-value)})
columns first-row)})
(catch java.sql.SQLException e
{:status :failed
:error (or (->> (.getMessage e) ; error message comes back like 'Column "ZID" not found; SQL statement: ... [error-code]' sometimes
(re-find #"^(.*);") ; the user already knows the SQL, and error code is meaningless
second) ; so just return the part of the exception that is relevant
(.getMessage e))})))
(def db (delay (-> (sel :one Database :id 1)
db->korma-db
korma.db/get-connection)))
(let [^String message (or (->> (.getMessage e) ; error message comes back like 'Column "ZID" not found; SQL statement: ... [error-code]' sometimes
(re-find #"^(.*);") ; the user already knows the SQL, and error code is meaningless
second) ; so just return the part of the exception that is relevant
(.getMessage e))]
(throw (Exception. message))))))
......@@ -5,19 +5,17 @@
[korma.core :refer :all]
[metabase.config :as config]
[metabase.db :refer :all]
[metabase.driver.generic-sql.native :as native]
[metabase.driver.query-processor :as qp]
(metabase.driver.generic-sql [native :as native]
[util :refer :all])
[metabase.driver.generic-sql.query-processor.annotate :as annotate]
[metabase.driver.generic-sql.util :refer :all]
(metabase.models [database :refer [Database]]
[field :refer [Field]]
[table :refer [Table]])))
(declare apply-form
log-query
post-process
query-is-cumulative-sum?
apply-cumulative-sum)
log-query)
;; # INTERFACE
......@@ -45,14 +43,13 @@
(try
(->> (process query)
eval
(post-process query)
(annotate/annotate query))
(catch java.sql.SQLException e
{:status :failed
:error (or (->> (.getMessage e) ; error message comes back like "Error message ... [status-code]" sometimes
(re-find #"(?s)(^.*)\s+\[[\d-]+\]$") ; status code isn't useful and makes unit tests hard to write so strip it off
second)
(.getMessage e))}))) ; (?s) = Pattern.DOTALL - tell regex `.` to match newline characters as well
(let [^String message (or (->> (.getMessage e) ; error message comes back like "Error message ... [status-code]" sometimes
(re-find #"(?s)(^.*)\s+\[[\d-]+\]$") ; status code isn't useful and makes unit tests hard to write so strip it off
second) ; (?s) = Pattern.DOTALL - tell regex `.` to match newline characters as well
(.getMessage e))]
(throw (Exception. message))))))
(defn process-and-run
......@@ -94,18 +91,22 @@
"count" `(aggregate (~'count ~field) :count)
"distinct" `(aggregate (~'count (sqlfn :DISTINCT ~field)) :count)
"stddev" `(fields [(sqlfn :stddev ~field) :stddev])
"sum" `(aggregate (~'sum ~field) :sum)
"cum_sum" `[(fields ~field) ; just make sure this field is returned + included in GROUP BY
(group ~field)])))) ; cumulative sum happens in post-processing (see below)
"sum" `(aggregate (~'sum ~field) :sum))))) ; cumulative sum happens in post-processing (see below)
;; ### `:breakout`
;; ex.
;;
;; [1412 1413]
(defmethod apply-form :breakout [[_ field-ids]]
(let [field-names (map field-id->kw field-ids)]
(let [ ;; Group by all the breakout fields
field-names (map field-id->kw field-ids)
;; Add fields form only for fields that weren't specified in :fields clause -- we don't want to include it twice, or korma will barf
fields-not-in-fields-clause-names (->> field-ids
(filter (partial (complement contains?) (set (:fields (:query qp/*query*)))))
(map field-id->kw))]
`[(group ~@field-names)
(fields ~@field-names)]))
(fields ~@fields-not-in-fields-clause-names)]))
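;; For illustration (hypothetical field IDs): with :breakout [10 20] and :fields [20 30],
;; both 10 and 20 end up in the (group ...) form, but only 10 gets an extra (fields ...)
;; entry, since 20 is already emitted by the :fields clause.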
;; ### `:fields`
;; ex.
......@@ -197,44 +198,12 @@
nil)
;; ## Post Processing
(defn- post-process
"Post-processing stage for query results."
[{query :query} results]
(cond
(query-is-cumulative-sum? query) (apply-cumulative-sum query results)
:else (do results)))
;; ### Cumulative sum
;; Cumulative sum is a special case. We can't do it in the DB because it's not a SQL function; thus we do it as a post-processing step.
(defn- query-is-cumulative-sum?
"Is this a cumulative sum query?"
[query]
(some->> (:aggregation query)
first
(= "cum_sum")))
(defn- apply-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
{:arglists '([query results])}
[{[_ field-id] :aggregation} results]
(let [field (field-id->kw field-id)
values (->> results ; make a sequence of cumulative sum values for each row
(map field)
(reductions +))]
(map (fn [row value] ; replace the value for each row with the cumulative sum value
(assoc row field value))
results values)))
;; ## Debugging Functions (Internal)
(defn- log-query
"Log QUERY Dictionary and the korma form and SQL that the Query Processor translates it to."
[{:keys [source_table] :as query} forms]
(when-not *jdbc-metadata* ; HACK. If *jdbc-metadata* is bound we're probably doing a DB sync. Don't log its hundreds of QP calls, which make it hard to debug.
(when-not qp/*disable-qp-logging*
(log/debug
"\n********************"
"\nSOURCE TABLE: " source_table
......
(ns metabase.driver.generic-sql.query-processor.annotate
"Functions related to annotating results returned by the Query Processor."
(:require [metabase.db :refer :all]
[metabase.driver.query-processor :as qp]
[metabase.driver.generic-sql.util :as gsu]
[metabase.models.field :refer [Field field->fk-table]]))
(declare get-column-names
get-column-info
get-special-column-info
uncastify)
(defn annotate
......@@ -16,55 +17,18 @@
* `:columns` ordered sequence of column names
* `:cols` ordered sequence of information about each column, such as `:base_type` and `:special_type`"
[query results]
(let [column-names (get-column-names query results)]
{:status :completed
:row_count (count results)
:data {:rows (->> results
(map #(map % ; pull out the values in each result in the same order we got from get-column-names
(map keyword column-names))))
:columns column-names
:cols (get-column-info query column-names)}}))
(defn- -order-columns
"Don't use this directly; use `order-columns`.
This is broken out for testability -- it doesn't depend on data from the DB.
[fields breakout-field-ids castified-field-names]
;; Basically we want to convert both BREAKOUT-FIELD-IDS and CASTIFIED-FIELD-NAMES to maps like:
;; {:name "updated_at"
;; :id 224
;; :castified (keyword "CAST(updated_at AS DATE)")
;; :position 21}
;; Then we can order things appropriately and return the castified names.
(let [uncastified->castified (zipmap (map #(uncastify (name %)) castified-field-names) castified-field-names)
fields (map #(assoc % :castified (uncastified->castified (:name %)))
fields)
id->field (zipmap (map :id fields) fields)
castified->field (zipmap (map :castified fields) fields)
breakout-fields (->> breakout-field-ids
(map id->field))
other-fields (->> castified-field-names
(map (fn [castified-name]
(or (castified->field castified-name)
{:castified castified-name ; for aggregate fields like 'count' create a fake map
:position 0}))) ; with position 0 so it is returned ahead of the other fields
(filter #(not (contains? (set breakout-field-ids)
(:id %))))
(sort-by :position))]
(->> (concat breakout-fields other-fields)
(map :castified))))
(let [column-names (get-column-names query results)
column-name-kws (map keyword column-names)]
{:rows (->> results
(map (fn [row]
(map row column-name-kws))))
:columns (map uncastify column-names)
:cols (get-column-info query column-names)}))
(defn- order-columns
"Return CASTIFIED-FIELD-NAMES in the order we'd like to display them in the output.
They should be ordered as follows:
1. All breakout fields, in the same order as BREAKOUT-FIELD-IDS
2. Any aggregate fields like `count`
3. All other columns in the same order as `Field.position`."
[{{source-table :source_table breakout-field-ids :breakout} :query} castified-field-names]
(-order-columns (sel :many :fields [Field :id :name :position] :table_id source-table)
(filter identity breakout-field-ids) ; handle empty breakout clauses like [nil]
castified-field-names))
[query castified-field-names]
(binding [qp/*uncastify-fn* uncastify]
(qp/order-columns query castified-field-names)))
(defn- get-column-names
"Get an ordered seqences of column names for the results.
......@@ -73,10 +37,10 @@
(let [field-ids (-> query :query :fields)
fields-clause-fields (when-not (or (empty? field-ids)
(= field-ids [nil]))
(let [field-id->name (->> (sel :many [Field :id :name]
:id [in field-ids]) ; Fetch names of fields from `fields` clause
(map (fn [{:keys [id name]}] ; build map of field-id -> field-name
{id (keyword name)}))
(let [field-id->name (->> (sel :many [Field :id :name :base_type]
:id [in field-ids]) ; Fetch names of fields from `fields` clause
(map (fn [{:keys [id name base_type]}] ; build map of field-id -> field-name
{id (gsu/field-name+base-type->castified-key name base_type)}))
(into {}))]
(map field-id->name field-ids))) ; now get names in same order as the IDs
other-fields (->> (first results)
......@@ -84,6 +48,7 @@
(filter #(not (contains? (set fields-clause-fields) %)))
(order-columns query))]
(->> (concat fields-clause-fields other-fields) ; Return a combined vector. Convert them to strs, otherwise korma
(filter identity) ; remove any nils -- don't want a NullPointerException
(map name)))) ; will qualify them like `"METABASE_FIELD"."FOLLOWERS_COUNT"
(defn- uncastify
......@@ -95,36 +60,7 @@
(or (second (re-find #"CAST\(([^\s]+) AS [\w]+\)" column-name))
column-name))
(defn- get-column-info
"Get extra information about result columns. This is done by looking up matching `Fields` for the `Table` in QUERY or looking up
information about special columns such as `count` via `get-special-column-info`."
(defn get-column-info
"Wrapper for `metabase.driver.query-processor/get-column-info` that calls `uncastify` on column names."
[query column-names]
(let [table-id (get-in query [:query :source_table])
column-names (map uncastify column-names)
columns (->> (sel :many [Field :id :table_id :name :description :base_type :special_type] ; lookup columns with matching names for this Table
:table_id table-id :name [in (set column-names)])
(map (fn [{:keys [name] :as column}] ; build map of column-name -> column
{name (-> (select-keys column [:id :table_id :name :description :base_type :special_type])
(assoc :extra_info (if-let [fk-table (field->fk-table column)]
{:target_table_id (:id fk-table)}
{})))}))
(into {}))]
(->> column-names
(map (fn [column-name]
(or (columns column-name) ; try to get matching column from the map we built earlier
(get-special-column-info query column-name))))))) ; if it's not there then it's a special column like `count`
(defn- get-special-column-info
"Get info like `:base_type` and `:special_type` for a special aggregation column like `count` or `sum`."
[query column-name]
(merge {:name column-name
:id nil
:table_id nil
:description nil}
(let [aggregation-type (keyword column-name) ; For aggregations of a specific Field (e.g. `sum`)
field-aggregation? (contains? #{:avg :stddev :sum} aggregation-type)] ; lookup the field we're aggregating and return its
(if field-aggregation? (sel :one :fields [Field :base_type :special_type] ; type info. (The type info of the aggregate result
:id (-> query :query :aggregation second)) ; will be the same.)
(case aggregation-type ; Otherwise for general aggregations such as `count`
:count {:base_type :IntegerField ; just return hardcoded type info
:special_type :number})))))
(qp/get-column-info query (map uncastify column-names)))
......@@ -90,7 +90,6 @@
(sel :one Table :id table-id)
(throw (Exception. (format "Table with ID %d doesn't exist!" table-id))))))
(defn castify-field
"Wrap Field in a SQL `CAST` statement if needed (i.e., it's a `:DateTimeField`).
......@@ -103,6 +102,15 @@
(if (contains? #{:DateField :DateTimeField} field-base-type) `(korma/raw ~(format "CAST(\"%s\" AS DATE)" field-name))
(keyword field-name)))
(defn field-name+base-type->castified-key
"Like `castify-field`, but returns a keyword that should match the one returned in results."
[field-name field-base-type]
{:pre [(string? field-name)
(keyword? field-base-type)]
:post [(keyword? %)]}
(if (contains? #{:DateField :DateTimeField} field-base-type) (keyword (format "CAST(%s AS DATE)" field-name))
(keyword field-name)))
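;; A sketch of what this returns (hypothetical REPL calls):
;; (field-name+base-type->castified-key "created_at" :DateTimeField) ;=> (keyword "CAST(created_at AS DATE)")
;; (field-name+base-type->castified-key "name" :TextField) ;=> :name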
(def field-id->kw
"Given a metabase `Field` ID, return a keyword for use in the Korma form (or a casted raw string for date fields)."
(memoize ; This can be memoized since the names and base_types of Fields never change
......
(ns metabase.driver.mongo
"MongoDB Driver."
(:require [clojure.core.reducers :as r]
[clojure.set :as set]
[clojure.tools.logging :as log]
[colorize.core :as color]
(monger [collection :as mc]
[command :as cmd]
[conversion :as conv]
[core :as mg]
[db :as mdb]
[query :as mq])
[metabase.driver :as driver]
[metabase.driver.interface :refer :all]
(metabase.driver.mongo [query-processor :as qp]
[util :refer [*mongo-connection* with-mongo-connection values->base-type]])))
(declare driver)
;;; ### Driver Helper Fns
(defn- table->column-names
"Return a set of the column names for TABLE."
[table]
(with-mongo-connection [^com.mongodb.DBApiLayer conn @(:db table)]
(->> (mc/find-maps conn (:name table))
(r/map keys)
(r/map set)
(r/reduce set/union))))
(defn- field->base-type
"Determine the base type of FIELD in the most ghetto way possible, via `values->base-type`."
[field]
{:pre [(map? field)]
:post [(keyword? %)]}
(with-mongo-connection [_ @(:db @(:table field))]
(values->base-type (field-values-lazy-seq driver field))))
;;; ## MongoDriver
(deftype MongoDriver []
IDriver
;;; ### Connection
(can-connect? [_ database]
(with-mongo-connection [^com.mongodb.DBApiLayer conn database]
(= (-> (cmd/db-stats conn)
(conv/from-db-object :keywordize)
:ok)
1.0)))
(can-connect-with-details? [this {:keys [user password host port dbname]}]
(assert (and host
dbname))
(can-connect? this (str "mongodb://"
user
(when password
(assert user "Can't have a password without a user!")
(str ":" password))
(when user "@")
host
(when port
(str ":" port))
"/"
dbname)))
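;; For illustration (hypothetical credentials): the details map
;; {:user "cam", :password "pw", :host "localhost", :port 27017, :dbname "test"}
;; yields the connection string "mongodb://cam:pw@localhost:27017/test"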
;;; ### QP
(process-query [_ query]
(qp/process-and-run query))
;;; ### Syncing
(sync-in-context [_ database do-sync-fn]
(with-mongo-connection [_ database]
(do-sync-fn)))
(active-table-names [_ database]
(with-mongo-connection [^com.mongodb.DBApiLayer conn database]
(-> (mdb/get-collection-names conn)
(set/difference #{"system.indexes"}))))
(active-column-names->type [_ table]
(with-mongo-connection [_ @(:db table)]
(->> (table->column-names table)
(map (fn [column-name]
{(name column-name)
(field->base-type {:name (name column-name)
:table (delay table)})}))
(into {}))))
(table-pks [_ _]
#{"_id"})
ISyncDriverFieldValues
(field-values-lazy-seq [_ field]
(lazy-seq
(let [table @(:table field)]
(map (keyword (:name field))
(with-mongo-connection [^com.mongodb.DBApiLayer conn @(:db table)]
(mq/with-collection conn (:name table)
(mq/fields [(:name field)]))))))))
(def ^:const driver
"Concrete instance of the MongoDB driver."
(MongoDriver.))
(ns metabase.driver.mongo.query-processor
(:refer-clojure :exclude [find sort])
(:require [clojure.core.match :refer [match]]
[clojure.pprint] ; needed for clojure.pprint/pprint used when logging generated queries
[clojure.tools.logging :as log]
[colorize.core :as color]
(monger [collection :as mc]
[core :as mg]
[db :as mdb]
[operators :refer :all]
[query :refer :all])
[metabase.db :refer :all]
[metabase.driver :as driver]
[metabase.driver.query-processor :as qp :refer [*query*]]
[metabase.driver.mongo.util :refer [with-mongo-connection *mongo-connection* values->base-type]]
(metabase.models [database :refer [Database]]
[field :refer [Field]]
[table :refer [Table]]))
(:import (com.mongodb CommandResult
DBApiLayer)
(clojure.lang PersistentArrayMap)))
(declare apply-clause
annotate-native-results
annotate-results
eval-raw-command
field-id->kw
process-structured
process-and-run-structured)
;; # DRIVER QP INTERFACE
(defn process-and-run
"Process and run a MongoDB QUERY."
[{query-type :type database-id :database :as query}]
{:pre [(contains? #{:native :query} (keyword query-type))
(integer? database-id)]}
(with-mongo-connection [_ (sel :one :fields [Database :details] :id database-id)]
(case (keyword query-type)
:query (if (zero? (:source_table (:query query))) qp/empty-response
(let [generated-query (process-structured (:query query))]
(when-not qp/*disable-qp-logging*
(log/debug (color/magenta "\n******************** Generated Monger Query: ********************\n"
(with-out-str (clojure.pprint/pprint generated-query))
"*****************************************************************\n")))
(->> (eval generated-query)
(annotate-results (:query query)))))
:native (->> (eval-raw-command (:query (:native query)))
annotate-native-results))))
;; # NATIVE QUERY PROCESSOR
(defn eval-raw-command
"Evaluate raw MongoDB javascript code. This must be ran insided the body of a `with-mongo-connection`.
(with-mongo-connection [_ \"mongodb://localhost/test\"]
(eval-raw-command \"db.zips.findOne()\"))
-> {\"_id\" \"01001\", \"city\" \"AGAWAM\", ...}"
[^String command]
(assert *mongo-connection* "eval-raw-command must be run inside the body of with-mongo-connection.")
(let [^CommandResult result (.doEval ^DBApiLayer *mongo-connection* command nil)]
(when-not (.ok result)
(throw (.getException result)))
(let [{result "retval"} (PersistentArrayMap/create (.toMap result))]
result)))
(defn annotate-native-results
"Package up the results in the way the frontend expects."
[results]
(if-not (sequential? results) (annotate-native-results [results])
{:status :completed
:row_count (count results)
:data {:rows results
:columns (keys (first results))}}))
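;; For illustration, a hypothetical single-document result {"_id" "01001", "city" "AGAWAM"}
;; comes back wrapped as:
;; {:status :completed
;;  :row_count 1
;;  :data {:rows [{"_id" "01001", "city" "AGAWAM"}]
;;         :columns ("_id" "city")}}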
;; # STRUCTURED QUERY PROCESSOR
;; ## AGGREGATION IMPLEMENTATIONS
(def ^:private aggregations
"Used internally by `defaggregation` to store the different aggregation patterns to match against."
(atom '()))
(def ^:dynamic *collection-name*
"String name of the collection (i.e., `Table`) that we're currently querying against."
nil)
(def ^:dynamic *constraints*
"Monger clauses generated from query dict `filter` clauses; bound dynamically so we can insert these as appropriate for various types of aggregations."
nil)
(defmacro defaggregation
"Define a new function that will be called when the `aggregation` clause in a structured query matches MATCH-BINDING.
(All functions defined with `defaggregation` are combined into a massive `match` statement inside `match-aggregation`).
These should emit a form that can be `eval`ed to get the query results; the `aggregate` function takes care of some of the
boilerplate for this form."
[match-binding & body]
`(swap! aggregations concat
(quote [~match-binding (try
~@body
(catch Throwable e#
(log/error (color/red ~(format "Failed to apply aggregation %s: " match-binding)
e#))))])))
(defn aggregate
"Generate a Monger `aggregate` form."
[& forms]
`(mc/aggregate ^DBApiLayer *mongo-connection* ~*collection-name* [~@(when *constraints*
[{$match *constraints*}])
~@(filter identity forms)]))
(defn field-id->$string
"Given a FIELD-ID, return a `$`-qualified field name for use in a Mongo aggregate query, e.g. `\"$user_id\"`."
[field-id]
(format "$%s" (name (field-id->kw field-id))))
(defaggregation ["rows"]
`(doall (with-collection ^DBApiLayer *mongo-connection* ~*collection-name*
~@(when *constraints* [`(find ~*constraints*)])
~@(mapcat apply-clause *query*))))
(defaggregation ["count"]
`[{:count (mc/count ^DBApiLayer *mongo-connection* ~*collection-name*
~*constraints*)}])
(defaggregation ["avg" field-id]
(aggregate {$group {"_id" nil
"avg" {$avg (field-id->$string field-id)}}}
{$project {"_id" false, "avg" true}}))
(defaggregation ["count" field-id]
(aggregate {$match {(field-id->kw field-id) {$exists true}}}
{$group {"_id" nil
"count" {$sum 1}}}
{$project {"_id" false, "count" true}}))
(defaggregation ["distinct" field-id]
(aggregate {$group {"_id" (field-id->$string field-id)}}
{$group {"_id" nil
"count" {$sum 1}}}
{$project {"_id" false, "count" true}}))
(defaggregation ["stddev" field-id]
nil) ; TODO
(defaggregation ["sum" field-id]
(aggregate {$group {"_id" nil ; TODO - I don't think this works for _id
"sum" {$sum (field-id->$string field-id)}}}
{$project {"_id" false, "sum" true}}))
(defmacro match-aggregation
"Match structured query `aggregation` clause against the clauses defined by `defaggregation`."
[aggregation]
`(match ~aggregation
~@@aggregations
~'_ nil))
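;; Note: the unusual ~@@aggregations both derefs the `aggregations` atom (at macroexpansion
;; time) and splices the accumulated [pattern body] pairs into the match form.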
;; ## BREAKOUT
;; This is similar to the aggregation stuff but has to be implemented separately since Mongo doesn't really have
;; GROUP BY functionality the same way SQL does.
;; This is annoying, since it effectively duplicates logic we have in the aggregation definitions above and the
;; clause definitions below, but the query we need to generate is different enough that I haven't found a cleaner
;; way of doing this yet.
;;
(defn breakout-aggregation->field-name+expression
"Match AGGREGATION clause of a structured query *that contains a `breakout` clause*, and return
a pair containing `[field-name aggregation-expression]`, which are used to generate the Mongo aggregate query."
[aggregation]
;; AFAIK these are the only aggregation types that make sense in combination with a breakout clause
;; or are we missing something?
;; At any rate these seem to be the most common use cases, so we can add more here if and when they're needed.
(match aggregation
["rows"] nil
["count"] ["count" {$sum 1}]
["avg" field-id] ["avg" {$avg (field-id->$string field-id)}]
["sum" field-id] ["sum" {$sum (field-id->$string field-id)}]))
(defn do-breakout
"Generate a Monger query from a structured QUERY dictionary that contains a `breakout` clause.
Since the Monger query we generate looks very different from ones we generate when no `breakout` clause
is present, this is essentially a separate implementation :/
[{aggregation :aggregation, field-ids :breakout, order-by :order_by, limit :limit, :as query}]
{:pre [(sequential? field-ids)
(every? integer? field-ids)]}
(let [[ag-field ag-clause] (breakout-aggregation->field-name+expression aggregation)
fields (->> (map field-id->kw field-ids)
(map name))
$fields (map field-id->$string field-ids)
fields->$fields (zipmap fields $fields)]
(aggregate {$group (merge {"_id" (if (= (count fields) 1) (first $fields)
fields->$fields)}
(when (and ag-field ag-clause)
{ag-field ag-clause})
(->> fields->$fields
(map (fn [[field $field]]
(when-not (= field "_id")
{field {$first $field}})))
(into {})))}
{$sort (->> order-by
(mapcat (fn [[field-id asc-or-desc]]
[(name (field-id->kw field-id)) (case asc-or-desc
"ascending" 1
"descending" -1)]))
(apply sorted-map))}
{$project (merge {"_id" false}
(when ag-field
{ag-field true})
(zipmap fields (repeat true)))}
(when limit
{$limit limit}))))
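;; A sketch (hypothetical collection/field names and IDs) of the pipeline generated for
;; {:aggregation ["count"], :breakout [category-id], :order_by [[category-id "ascending"]], :limit 10}:
;; (mc/aggregate conn "venues" [{$group {"_id" "$category", "count" {$sum 1}, "category" {$first "$category"}}}
;;                              {$sort {"category" 1}}
;;                              {$project {"_id" false, "count" true, "category" true}}
;;                              {$limit 10}])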
;; ## PROCESS-STRUCTURED
(defn process-structured
"Process a structured MongoDB QUERY.
This establishes some bindings, then:
* queries that contain `breakout` clauses are handled by `do-breakout`
* other queries are handled by `match-aggregation`, which hands off to the
appropriate fn defined by a `defaggregation`."
[{:keys [source_table aggregation breakout] :as query}]
(binding [*collection-name* (sel :one :field [Table :name] :id source_table)
*constraints* (when-let [filter-clause (:filter query)]
(apply-clause [:filter filter-clause]))
*query* (dissoc query :filter)]
;;
(if-not (empty? breakout) (do-breakout query)
(match-aggregation aggregation))))
;; ## ANNOTATION
;; TODO - This is similar to the implementation in generic-sql; can we combine them and move it into metabase.driver.query-processor?
(defn annotate-results
"Add column information, `row_count`, etc. to the results of a Mongo QP query."
[{:keys [source_table] :as query} results]
{:pre [(integer? source_table)]}
(let [field-name->field (sel :many :field->obj [Field :name] :table_id source_table)
column-keys (qp/order-columns {:query query} (keys (first results)))
column-names (map name column-keys)]
{:columns column-names
:cols (qp/get-column-info {:query query} column-names)
:rows (map #(map % column-keys)
results)}))
;; ## CLAUSE APPLICATION 2.0
(def ^{:arglists '([field-id])} field-id->kw
"Return the keyword name of a `Field` with ID FIELD-ID. Memoized."
(memoize
(fn [field-id]
{:pre [(integer? field-id)]
:post [(keyword? %)]}
(keyword (sel :one :field [Field :name] :id field-id)))))
(def ^:private clauses
"Used by `defclause` to store the clause definitions generated by it."
(atom '()))
(defmacro defclause
"Generate a new clause definition that will be called inside of a `match` statement
whenever CLAUSE matches MATCH-BINDING.
In general, these should emit a vector of forms to be included in the generated Monger query;
however, `filter` is handled a little differently (see below)."
[clause match-binding & body]
`(swap! clauses concat '[[~clause ~match-binding] (try
~@body
(catch Throwable e#
(log/error (color/red ~(format "Failed to process clause [%s %s]: " clause match-binding)
(.getMessage e#)))))]))
;; ### CLAUSE DEFINITIONS
;; ### fields
(defclause :fields field-ids
`[(fields ~(mapv field-id->kw field-ids))])
;; ### filter
;; !!! SPECIAL CASE - the results of this clause are bound to *constraints*, which is used differently
;; by the various defaggregation definitions or by do-breakout. Here, we just return a "constraints" map instead.
(defclause :filter ["INSIDE" lat-field-id lon-field-id lat-max lon-min lat-min lon-max]
(let [lat-field (field-id->kw lat-field-id)
lon-field (field-id->kw lon-field-id)]
{$and [{lat-field {$gte lat-min, $lte lat-max}}
{lon-field {$gte lon-min, $lte lon-max}}]}))
(defclause :filter ["IS_NULL" field-id]
{(field-id->kw field-id) {$exists false}})
(defclause :filter ["NOT_NULL" field-id]
{(field-id->kw field-id) {$exists true}})
(defclause :filter ["BETWEEN" field-id min max]
{(field-id->kw field-id) {$gte min
$lte max}})
(defclause :filter ["=" field-id value]
{(field-id->kw field-id) value})
(defclause :filter ["!=" field-id value]
{(field-id->kw field-id) {$ne value}})
(defclause :filter ["<" field-id value]
{(field-id->kw field-id) {$lt value}})
(defclause :filter [">" field-id value]
{(field-id->kw field-id) {$gt value}})
(defclause :filter ["<=" field-id value]
{(field-id->kw field-id) {$lte value}})
(defclause :filter [">=" field-id value]
{(field-id->kw field-id) {$gte value}})
(defclause :filter ["AND" & subclauses]
{$and (mapv #(apply-clause [:filter %]) subclauses)})
(defclause :filter ["OR" & subclauses]
{$or (mapv #(apply-clause [:filter %]) subclauses)})
;; ### limit
(defclause :limit value
`[(limit ~value)])
;; ### order_by
(defclause :order_by field-dir-pairs
(let [sort-options (mapcat (fn [[field-id direction]]
[(field-id->kw field-id) (case (keyword direction)
:ascending 1
:descending -1)])
field-dir-pairs)]
(when (seq sort-options)
`[(sort (array-map ~@sort-options))])))
;; ### page
(defclause :page page-clause
(let [{page-num :page items-per-page :items} page-clause
num-to-skip (* (dec page-num) items-per-page)]
`[(skip ~num-to-skip)
(limit ~items-per-page)]))
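;; e.g. a hypothetical page clause {:page 3, :items 25} skips (* 2 25) = 50 rows,
;; expanding to [(skip 50) (limit 25)]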
;; ### APPLY-CLAUSE
(defmacro match-clause
"Generate a `match` form against all the clauses defined by `defclause`."
[clause]
`(match ~clause
~@@clauses
~'_ nil))
(defn apply-clause
"Match CLAUSE against a clause defined by `defclause`."
[clause]
(match-clause clause))
(ns metabase.driver.mongo.util
"`*mongo-connection*`, `with-mongo-connection`, and other functions shared between several Mongo driver namespaces."
(:require [clojure.tools.logging :as log]
[colorize.core :as color]
[monger.core :as mg]
[metabase.driver :as driver]))
(def ^:dynamic *mongo-connection*
"Connection to a Mongo database.
Bound by top-level `with-mongo-connection` so it may be reused within its body."
nil)
(defn -with-mongo-connection
"Run F with a new connection (bound to `*mongo-connection*`) to DATABASE.
Don't use this directly; use `with-mongo-connection`."
[f database]
(let [connection-string (if (map? database) (-> database :details :conn_str)
database)
_ (assert (string? connection-string) (str "with-mongo-connection failed: connection string must be a string, got: " connection-string))
{conn :conn mongo-connection :db} (mg/connect-via-uri connection-string)]
(log/debug (color/cyan "<< OPENED NEW MONGODB CONNECTION >>"))
(try
(binding [*mongo-connection* mongo-connection]
(f *mongo-connection*))
(finally
(mg/disconnect conn)))))
(defmacro with-mongo-connection
"Open a new MongoDB connection to DATABASE-OR-CONNECTION-STRING, bind connection to BINDING, execute BODY, and close the connection.
The DB connection is re-used by subsequent calls to `with-mongo-connection` within BODY.
(We're smart about it: DATABASE isn't even evaluated if `*mongo-connection*` is already bound.)
;; delay isn't derefed if *mongo-connection* is already bound
(with-mongo-connection [^com.mongodb.DBApiLayer conn @(:db (sel :one Table ...))]
...)
;; You can use a string instead of a Database
(with-mongo-connection [^com.mongodb.DBApiLayer conn \"mongodb://127.0.0.1:27017/test\"]
...)"
[[binding database] & body]
`(let [f# (fn [~binding]
~@body)]
(if *mongo-connection* (f# *mongo-connection*)
(-with-mongo-connection f# ~database))))
;; TODO - this is actually more sophisticated than the one used for annotation in the GenericSQL driver, which just takes the
;; types of the values in the first row.
;; We should move this somewhere where it can be shared amongst the drivers and rewrite GenericSQL to use it instead.
(defn values->base-type
"Given a sequence of values, return `Field` `base_type` in the most ghetto way possible.
This just gets counts the types of *every* value and returns the `base_type` for class whose count was highest."
[values-seq]
{:pre [(sequential? values-seq)]}
(or (->> values-seq
(filter identity)
(group-by type)
(map (fn [[klass vs]]
[klass (count vs)]))
(sort-by second)
last ; highest count wins, per the docstring -- (sort-by second) sorts ascending
first
driver/class->base-type)
:UnknownField))
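;; For illustration (hypothetical values) -- three Dates outnumber one String, and
;; java.util.Date maps to :DateField in driver/class->base-type (see above):
;; (values->base-type [(java.util.Date.) "x" (java.util.Date.) (java.util.Date.)]) ;=> :DateField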
(ns metabase.driver.query-processor
"Preprocessor that does simple transformations to all incoming queries, simplifing the driver-specific implementations.")
"Preprocessor that does simple transformations to all incoming queries, simplifing the driver-specific implementations."
(:require [clojure.core.match :refer [match]]
[clojure.pprint] ; used via clojure.pprint/pprint in preprocess-structured
[clojure.tools.logging :as log]
[colorize.core] ; used via colorize.core/cyan in preprocess-structured
[metabase.db :refer :all]
[metabase.driver.interface :as i]
[metabase.models.field :refer [Field field->fk-table]]))
(declare add-implicit-breakout-order-by
get-special-column-info
preprocess-cumulative-sum
preprocess-structured
remove-empty-clauses)
(def ^:dynamic *query* "The structured query we're currently processing, before any preprocessing occurs (i.e. the `:query` part of the API call body)"
;; # CONSTANTS
(def ^:const empty-response
"An empty response dictionary to return when there's no query to run."
{:rows [], :columns [], :cols []})
;; # DYNAMIC VARS
(def ^:dynamic *query*
"The query we're currently processing (i.e., the body of the query API call)."
nil)
(defn preprocess [{query-type :type :as query}]
(def ^:dynamic *disable-qp-logging*
"Should we disable logging for the QP? (e.g., during sync we probably want to turn it off to keep logs less cluttered)."
false)
;; # PREPROCESSOR
(defn preprocess
"Preprocess QUERY dict, applying various driver-independent transformations to it before it is passed to specific driver query processor implementations."
[{query-type :type :as query}]
(case (keyword query-type)
:query (preprocess-structured query)
:native query))
(defn preprocess-structured [query]
(update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by)))
(defn preprocess-structured
"Preprocess a strucuted QUERY dict."
[query]
(let [preprocessed-query (update-in query [:query] #(->> %
remove-empty-clauses
add-implicit-breakout-order-by
preprocess-cumulative-sum))]
(when-not *disable-qp-logging*
(log/debug (colorize.core/cyan "\n******************** PREPROCESSED: ********************\n"
(with-out-str (clojure.pprint/pprint preprocessed-query)) "\n"
"*******************************************************\n")))
preprocessed-query))
;; ## PREPROCESSOR FNS
......@@ -57,3 +91,197 @@
[field-id "ascending"]))
(apply conj (or order-by-subclauses []))
(assoc query :order_by)))))
;; ### PREPROCESS-CUMULATIVE-SUM
(defn preprocess-cumulative-sum
"Rewrite queries containing a cumulative sum (`cum_sum`) aggregation to simply fetch the values of the aggregate field instead.
(Cumulative sum is a special case; it is implemented in post-processing)."
[{[ag-type ag-field :as aggregation] :aggregation, breakout-fields :breakout, order-by :order_by, :as query}]
(let [cum-sum? (= ag-type "cum_sum")
cum-sum-with-breakout? (and cum-sum?
(not (empty? breakout-fields)))
cum-sum-with-same-breakout? (and cum-sum-with-breakout?
(= (count breakout-fields) 1)
(= (first breakout-fields) ag-field))]
;; Cumulative sum is only applicable if it has breakout fields
;; For these, store the cumulative sum field under the key :cum_sum so we know which one to sum later
;; Cumulative summing happens in post-processing
(cond
;; If there's only one breakout field that is the same as the cum_sum field, re-write this as a "rows" aggregation
;; to just fetch all the values of the field in question.
cum-sum-with-same-breakout? (-> query
(dissoc :breakout)
(assoc :cum_sum ag-field
:aggregation ["rows"]
:fields [ag-field]))
;; Otherwise if we're breaking out on different fields, rewrite the query as a "sum" aggregation
cum-sum-with-breakout? (assoc query
:cum_sum ag-field
:aggregation ["sum" ag-field])
;; Cumulative sum without any breakout fields should just be treated the same way as "sum". Rewrite query as such
cum-sum? (assoc query
:aggregation ["sum" ag-field])
;; Otherwise if this isn't a cum_sum query return it as-is
:else query)))
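;; A sketch of the rewrites, with a hypothetical field ID 10:
;; {:aggregation ["cum_sum" 10], :breakout [10]} ; same single breakout field
;;   => {:aggregation ["rows"], :fields [10], :cum_sum 10}
;; {:aggregation ["cum_sum" 10], :breakout [20]} ; different breakout field
;;   => {:aggregation ["sum" 10], :breakout [20], :cum_sum 10}
;; {:aggregation ["cum_sum" 10], :breakout []} ; no breakout
;;   => {:aggregation ["sum" 10], :breakout []}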
;; # POSTPROCESSOR
;; ### POST-PROCESS-CUMULATIVE-SUM
(defn post-process-cumulative-sum
"Cumulative sum the values of the aggregate `Field` in RESULTS."
{:arglists '([query results])}
[{cum-sum-field :cum_sum, :as query} {rows :rows, cols :cols, :as results}]
(if-not cum-sum-field results
(let [ ;; Determine the index of the field we need to cumulative sum
cum-sum-field-index (->> cols
(map-indexed (fn [i {field-name :name, field-id :id}]
(when (or (= field-name "sum")
(= field-id cum-sum-field))
i)))
(filter identity)
first)
_ (assert (integer? cum-sum-field-index))
;; Now make a sequence of cumulative sum values for each row
values (->> rows
(map #(nth % cum-sum-field-index))
(reductions +))
;; Update the values in each row
rows (map (fn [row value]
(assoc (vec row) cum-sum-field-index value))
rows values)]
(assoc results :rows rows))))
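;; e.g. (hypothetical rows, summed column at index 0):
;; (reductions + [1 2 3 4]) ;=> (1 3 6 10)
;; so rows [[1] [2] [3] [4]] become [[1] [3] [6] [10]]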
;; ### ADD-ROW-COUNT-AND-STATUS
(defn add-row-count-and-status
"Wrap the results of a successfully processed query in the format expected by the frontend (add `row_count` and `status`)."
[results]
{:pre [(map? results)
(sequential? (:columns results))
(sequential? (:cols results))
(sequential? (:rows results))]}
{:row_count (count (:rows results))
:status :completed
:data results})
;; ### POST-PROCESS
(defn post-process
"Apply post-processing steps to the RESULTS of a QUERY, such as applying cumulative sum."
[driver query results]
(->> (case (keyword (:type query))
:native results
:query (let [query (:query query)]
(->> results
(post-process-cumulative-sum query))))
add-row-count-and-status))
;; # COMMON ANNOTATION FNS
(defn get-column-info
"Get extra information about result columns. This is done by looking up matching `Fields` for the `Table` in QUERY or looking up
information about special columns such as `count` via `get-special-column-info`."
[{{table-id :source_table} :query, :as query} column-names]
{:pre [(integer? table-id)
(every? string? column-names)]}
(let [columns (->> (sel :many [Field :id :table_id :name :description :base_type :special_type] ; lookup columns with matching names for this Table
:table_id table-id :name [in (set column-names)])
(map (fn [{:keys [name] :as column}] ; build map of column-name -> column
{name (-> (select-keys column [:id :table_id :name :description :base_type :special_type])
(assoc :extra_info (if-let [fk-table (field->fk-table column)]
{:target_table_id (:id fk-table)}
{})))}))
(into {}))]
(->> column-names
(map (fn [column-name]
(or (columns column-name) ; try to get matching column from the map we built earlier
(get-special-column-info query column-name))))))) ; if it's not there then it's a special column like `count`
(defn get-special-column-info
"Get info like `:base_type` and `:special_type` for a special aggregation column like `count` or `sum`."
[query column-name]
{:pre [(:query query)]}
(merge {:name column-name
:id nil
:table_id nil
:description nil}
(let [aggregation-type (keyword column-name) ; For aggregations of a specific Field (e.g. `sum`)
field-aggregation? (contains? #{:avg :stddev :sum} aggregation-type)] ; lookup the field we're aggregating and return its
(if field-aggregation? (sel :one :fields [Field :base_type :special_type] ; type info. (The type info of the aggregate result
:id (-> query :query :aggregation second)) ; will be the same.)
(case aggregation-type ; Otherwise for general aggregations such as `count`
:count {:base_type :IntegerField ; just return hardcoded type info
:special_type :number})))))
(def ^:dynamic *uncastify-fn*
"Function that should be called to transform a column name from the set of results to one that matches a `Field` in the DB.
The default implementation returns the column name as is; others, such as `generic-sql`, provide implementations that
remove casting statements and the like."
identity)
;; TODO - since this was moved over from generic SQL some of its functionality should be reworked. And dox updated.
;; (Since castification is basically SQL-specific it would make sense to handle castification / decastification separately)
;; Fix this when I'm not burnt out on driver code
(defn -order-columns
"Don't use this directly; use `order-columns`.
This is broken out for testability -- it doesn't depend on data from the DB.
[fields breakout-field-ids field-field-ids castified-field-names]
;; Basically we want to convert both BREAKOUT-FIELD-IDS and CASTIFIED-FIELD-NAMES to maps like:
;; {:name "updated_at"
;; :id 224
;; :castified (keyword "CAST(updated_at AS DATE)")
;; :position 21}
;; Then we can order things appropriately and return the castified names.
(let [uncastified->castified (zipmap (map #(*uncastify-fn* (name %)) castified-field-names) castified-field-names)
fields (map #(assoc % :castified (uncastified->castified (:name %)))
fields)
id->field (zipmap (map :id fields) fields)
castified->field (zipmap (map :castified fields) fields)
breakout-fields (->> breakout-field-ids
(map id->field))
field-fields (->> field-field-ids
(map id->field))
other-fields (->> castified-field-names
(map (fn [castified-name]
(or (castified->field castified-name)
{:castified castified-name ; for aggregate fields like 'count' create a fake map
:position 0}))) ; with position 0 so it is returned ahead of the other fields
(filter #(not (or (contains? (set breakout-field-ids)
(:id %))
(contains? (set field-field-ids)
(:id %)))))
(sort-by :position))]
(->> (concat breakout-fields field-fields other-fields)
(map :castified)
(filter identity))))
(defn order-columns
"Return CASTIFIED-FIELD-NAMES in the order we'd like to display them in the output.
They should be ordered as follows:
1. All breakout fields, in the same order as BREAKOUT-FIELD-IDS
2. Any aggregate fields like `count`
3. Fields included in the `fields` clause
4. All other columns in the same order as `Field.position`."
[{{source-table :source_table, breakout-field-ids :breakout, field-field-ids :fields} :query} castified-field-names]
{:post [(every? keyword? %)]}
(try
(-order-columns (sel :many :fields [Field :id :name :position] :table_id source-table)
breakout-field-ids
field-field-ids
castified-field-names)
(catch Exception e
(.printStackTrace e)
(log/error (.getMessage e)))))
......@@ -5,7 +5,8 @@
[colorize.core :as color]
[korma.core :as k]
[metabase.db :refer :all]
[metabase.driver.interface :refer :all]
(metabase.driver [interface :refer :all]
[query-processor :as qp])
[metabase.driver.sync.queries :as queries]
(metabase.models [field :refer [Field] :as field]
[foreign-key :refer [ForeignKey]]
......@@ -27,52 +28,54 @@
(defn sync-database!
"Sync DATABASE and all its Tables and Fields."
[driver database]
(sync-in-context driver database
(fn []
(log/info (color/blue (format "Syncing database %s..." (:name database))))
(let [active-table-names (active-table-names driver database)
table-name->id (sel :many :field->id [Table :name] :db_id (:id database) :active true)]
(assert (set? active-table-names) "active-table-names should return a set.")
(assert (every? string? active-table-names) "active-table-names should return the names of Tables as *strings*.")
;; First, let's mark any Tables that are no longer active as such.
;; These are ones that exist in table-name->id but not in active-table-names.
(log/debug "Marking inactive tables...")
(doseq [[table-name table-id] table-name->id]
(when-not (contains? active-table-names table-name)
(upd Table table-id :active false)
(log/info (format "Marked table %s.%s as inactive." (:name database) table-name))
;; We need to mark driver Table's Fields as inactive so we don't expose them in UI such as FK selector (etc.) This can happen in the background
(future (k/update Field
(k/where {:table_id table-id})
(k/set-fields {:active false})))))
;; Next, we'll create new Tables (ones that came back in active-table-names but *not* in table-name->id)
(log/debug "Creating new tables...")
(let [existing-table-names (set (keys table-name->id))]
(doseq [active-table-name active-table-names]
(when-not (contains? existing-table-names active-table-name)
(ins Table :db_id (:id database), :active true, :name active-table-name)
(log/info (format "Found new table: %s.%s" (:name database) active-table-name))))))
;; Now sync the active tables
(log/debug "Syncing active tables...")
(->> (sel :many Table :db_id (:id database) :active true)
(map #(assoc % :db (delay database))) ; replace default delays with ones that reuse database (and don't require a DB call)
(sync-database-active-tables! driver))
(log/info (color/blue (format "Finished syncing database %s." (:name database)))))))
(binding [qp/*disable-qp-logging* true]
(sync-in-context driver database
(fn []
(log/info (color/blue (format "Syncing database %s..." (:name database))))
(let [active-table-names (active-table-names driver database)
table-name->id (sel :many :field->id [Table :name] :db_id (:id database) :active true)]
(assert (set? active-table-names) "active-table-names should return a set.")
(assert (every? string? active-table-names) "active-table-names should return the names of Tables as *strings*.")
;; First, let's mark any Tables that are no longer active as such.
;; These are ones that exist in table-name->id but not in active-table-names.
(log/debug "Marking inactive tables...")
(doseq [[table-name table-id] table-name->id]
(when-not (contains? active-table-names table-name)
(upd Table table-id :active false)
(log/info (format "Marked table %s.%s as inactive." (:name database) table-name))
;; We need to mark driver Table's Fields as inactive so we don't expose them in UI such as FK selector (etc.) This can happen in the background
(future (k/update Field
(k/where {:table_id table-id})
(k/set-fields {:active false})))))
;; Next, we'll create new Tables (ones that came back in active-table-names but *not* in table-name->id)
(log/debug "Creating new tables...")
(let [existing-table-names (set (keys table-name->id))]
(doseq [active-table-name active-table-names]
(when-not (contains? existing-table-names active-table-name)
(ins Table :db_id (:id database), :active true, :name active-table-name)
(log/info (format "Found new table: %s.%s" (:name database) active-table-name))))))
;; Now sync the active tables
(log/debug "Syncing active tables...")
(->> (sel :many Table :db_id (:id database) :active true)
(map #(assoc % :db (delay database))) ; replace default delays with ones that reuse database (and don't require a DB call)
(sync-database-active-tables! driver))
(log/info (color/blue (format "Finished syncing database %s." (:name database))))))))
(defn sync-table!
"Sync a *single* TABLE by running all the sync steps for it.
This is used *instead* of `sync-database!` when syncing just one Table is desirable."
[driver table]
(let [database @(:db table)]
(sync-in-context driver database
(fn []
(sync-database-active-tables! driver [table])))))
(binding [qp/*disable-qp-logging* true]
(sync-in-context driver database
(fn []
(sync-database-active-tables! driver [table]))))))
;; ### sync-database-active-tables! -- runs the sync-table steps over sequence of Tables
......@@ -264,7 +267,6 @@
(extend-protocol ISyncDriverFieldPercentUrls ; Default implementation
Object
(field-percent-urls [this field]
(log/warn (color/red (format "Using default (read: slow) implementation of field-percent-urls for driver %s." (.getName (class this)))))
(assert (extends? ISyncDriverFieldValues (class this))
"A sync driver implementation that doesn't implement ISyncDriverFieldPercentURLs must implement ISyncDriverFieldValues.")
(let [field-values (field-values-lazy-seq this field)]
......@@ -312,7 +314,6 @@
(extend-protocol ISyncDriverFieldAvgLength ; Default implementation
Object
(field-avg-length [this field]
(log/warn (color/red (format "Using default (read: slow) implementation of field-avg-length for driver %s." (.getName (class this)))))
(assert (extends? ISyncDriverFieldValues (class this))
"A sync driver implementation that doesn't implement ISyncDriverFieldAvgLength must implement ISyncDriverFieldValues.")
(let [field-values (field-values-lazy-seq this field)
......
......@@ -50,7 +50,7 @@
{:from (email-from-address)
:to recipients
:subject subject
:body (condp = message-type
:body (case message-type
:text message
:html [{:type "text/html; charset=utf-8"
:content message}])})]
......
......@@ -18,7 +18,7 @@
:can_read (delay (org-can-read organization_id))
:can_write (delay (org-can-write organization_id))))
(defmethod pre-cascade-delete Database [_ {:keys [id]}]
(defmethod pre-cascade-delete Database [_ {:keys [id] :as database}]
(cascade-delete 'metabase.models.table/Table :db_id id)
(cascade-delete 'metabase.models.query/Query :database_id id))
......
......@@ -2,7 +2,6 @@
(:require [clojure.tools.logging :as log]
[korma.core :refer :all]
[metabase.db :refer :all]
[metabase.db.metadata-queries :as metadata]
[metabase.util :as u]))
;; ## Entity + DB Multimethods
......@@ -36,6 +35,9 @@
(or (contains? #{:category :city :state :country} (keyword special_type))
(= (keyword base_type) :BooleanField)))
(def ^:private field-distinct-values
(u/runtime-resolved-fn 'metabase.db.metadata-queries 'field-distinct-values))
(defn create-field-values
"Create `FieldValues` for a `Field`."
{:arglists '([field]
......@@ -46,7 +48,7 @@
(log/debug (format "Creating FieldValues for Field %d..." field-id))
(ins FieldValues
:field_id field-id
:values (metadata/field-distinct-values field)
:values (field-distinct-values field)
:human_readable_values human-readable-values))
(defn create-field-values-if-needed
......
......@@ -62,7 +62,7 @@
:aggregation ["count"]
:breakout [nil]
:limit nil}
:database @org-id}
:database @db-id}
:id $
:display "scalar"
:visualization_settings {:global {:title nil}}
......