Skip to content
Snippets Groups Projects
Commit 508e416a authored by Cam Saul's avatar Cam Saul
Browse files

much simpler driver implementations

parent 98261c4a
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@
;; It would be more efficient if we could let the QP could macroexpand normally for predefined queries like these
(defn- field-query [field query]
(->> (driver/process-and-run
(->> (driver/driver-process-query
{:type :query
:database ((u/deref-> field :table :db) :id)
:query (assoc query
......
(ns metabase.driver
(:require [clojure.tools.logging :as log]
(:require clojure.java.classpath
[clojure.tools.logging :as log]
[clojure.tools.namespace.find :as ns-find]
[cheshire.core :as cheshire]
[medley.core :refer :all]
[metabase.db :refer [exists? ins sel upd]]
(metabase.driver [result :as result])
(metabase.models [database :refer [Database]]
[query-execution :refer [QueryExecution]])
[metabase.util :as util]))
[metabase.util :as u]))
(declare -dataset-query query-fail query-complete save-query-execution)
(def available-drivers
;; TODO - look this up at runtime
(def ^:const available-drivers
"DB drivers that are available (pairs of `[namespace user-facing-name]`)."
[["h2" "H2"]
["postgres" "PostgreSQL"]])
;; TODO lazily requiring this way is a bit wonky.
;; We should rewrite this to load all sub-namespaces on first load like `metabase.task` does
(defn db-dispatch-fn
"Returns a dispatch fn for multi-methods that keys off of a database's `:engine`.
(defprotocol IDriver
;; Connection
(can-connect? [this database]
"Check whether we can connect to DATABASE and perform a simple query.
(To check whether we can connect to a database given only its details, use `can-connect-with-details?` instead).
The correct driver implementation is loaded dynamically to avoid having to require the files elsewhere in the codebase.
IMPL-NAMESPACE is the namespace we should load relative to the driver, e.g.
(can-connect? (sel :one Database :id 1))")
(defmulti my-multimethod (db-dispatch-fn \"metadata\"))
(can-connect-with-details? [this details-map]
"Check whether we can connect to a database and performa a simple query.
Returns true if we can, otherwise returns false or throws an Exception.
Would load `metabase.driver.postgres.metadata` for a `Database` whose `:engine` was `:postgres`."
[impl-namespace]
(let [memoized-dispatch (memoize (fn [engine] ; memoize this so we don't need to call require every single dispatch
(require (symbol (str "metabase.driver." (name engine) "." impl-namespace)))
(keyword engine)))]
(fn [{:keys [engine]}]
{:pre [engine]}
(memoized-dispatch engine))))
(can-connect-with-details? {:engine :postgres, :dbname \"book\", ...})")
;; Syncing
(sync-database! [this database]
"Sync DATABASE and all its Tables and Fields.")
(defmulti process-and-run
"Process a query of type `query` (implemented by various DB drivers)."
(let [database-id->database (memoize
(fn [database-id]
(sel :one [Database :engine] :id database-id))) ; actually we just need :engine for dispatch
dispatch-fn (db-dispatch-fn "query-processor")]
(fn [{database-id :database}]
(dispatch-fn (database-id->database database-id)))))
(sync-table! [this table]
"Sync TABLE and all its Fields.")
;; Query Processing
(process-query [this query]
"Process a native or structured query."))
(defn- execute-query
"Process and run a query and return results."
[{:keys [type] :as query}]
(case (keyword type)
:native (process-and-run query)
:query (process-and-run query)
:result (result/process-and-run query)))
;; ## Driver Lookup
(defmulti connection-details
"Return a map of connection details (in format usable by korma or equivalent) for DATABASE."
(db-dispatch-fn "connection"))
(def ^{:arglists '([engine])} engine->driver
(memoize
(fn [engine]
(let [ns-symb (symbol (format "metabase.driver.%s" (name engine)))]
(require ns-symb)
(var-get (ns-resolve ns-symb 'driver))))))
(defmulti connection
"Return a [korma or equivalent] connection to DATABASE."
(db-dispatch-fn "connection"))
;; Can the type of a DB change?
(def ^{:arglists '([database-id])} database-id->driver
(memoize
(fn [database-id]
(engine->driver (sel :one :field [Database :engine] :id database-id)))))
(defmulti can-connect?
"Check whether we can connect to DATABASE and perform a simple query.
(To check whether we can connect to a database given only its details, use `can-connect-with-details?` instead).
(can-connect? (sel :one Database :id 1))"
(db-dispatch-fn "connection"))
;; ## Implementation-Agnostic Driver API
(defmulti can-connect-with-details?
"Check whether we can connect to a database and performa a simple query.
Returns true if we can, otherwise returns false or throws an Exception.
(defn driver-can-connect? [database]
(can-connect? ^IDriver (engine->driver (:engine database)) database))
(can-connect-with-details? {:engine :postgres, :dbname \"book\", ...})"
(db-dispatch-fn "connection"))
(defn driver-can-connect-with-details [details])
(defmulti sync-database
"Update the metadata for ALL `Tables` within a given DATABASE, creating new `Tables` if they don't already exist.
(This is executed in parallel.)"
(db-dispatch-fn "sync"))
(defn driver-sync-database! [database]
(sync-database! ^IDriver (engine->driver (:engine database)) database))
(defmulti sync-table
"Update the metadata for a SINGLE `Table`, creating it if it doesn't already exist.
(This is executed in parallel.)"
(db-dispatch-fn "sync"))
(defn driver-sync-table! [table]
(sync-table! ^IDriver (database-id->driver (:db_id table)) table))
(declare -dataset-query query-fail query-complete save-query-execution)
(defn driver-process-query [query]
(process-query ^IDriver (database-id->driver (:database query)) query))
;; ## DEPRECATED API -- Pending Removal
(defn connection [database]
nil)
;; ## Query Execution Stuff
(defn- execute-query
"Process and run a query and return results."
[{:keys [type] :as query}]
(case (keyword type)
:native (driver-process-query query)
:query (driver-process-query query)
:result (result/process-and-run query)))
(defn dataset-query
"Process and run a json based dataset query and return results.
......@@ -117,8 +122,8 @@
:version (get saved_query :version 0)
:status :starting
:error ""
:started_at (util/new-sql-timestamp)
:finished_at (util/new-sql-timestamp)
:started_at (u/new-sql-timestamp)
:finished_at (u/new-sql-timestamp)
:running_time 0
:result_rows 0
:result_file ""
......@@ -159,7 +164,7 @@
[query-execution error-message]
(let [updates {:status :failed
:error error-message
:finished_at (util/new-sql-timestamp)
:finished_at (u/new-sql-timestamp)
:running_time (- (System/currentTimeMillis) (:start_time_millis query-execution))}]
;; record our query execution and format response
(-> query-execution
......@@ -177,9 +182,9 @@
"Save QueryExecution state and construct a completed (successful) query response"
[query-execution query-result cache-result]
;; record our query execution and format response
(-> (util/assoc* query-execution
(-> (u/assoc* query-execution
:status :completed
:finished_at (util/new-sql-timestamp)
:finished_at (u/new-sql-timestamp)
:running_time (- (System/currentTimeMillis) (:start_time_millis <>))
:result_rows (get query-result :row_count 0)
:result_data (if cache-result
......
(ns metabase.driver.generic-sql
(:require [clojure.tools.logging :as log]
[metabase.driver :as driver]
[metabase.driver.sync :as driver-sync]
(metabase.driver.generic-sql [connection :as connection]
[sync :as sync]
[query-processor :as qp]
[util :refer :all])))
(defmacro deftype+
"Same as `deftype` but define an extra constructor fn that takes params as kwargs."
[name fields convenience-fn-name & body]
`(do (deftype ~name ~fields
~@body)
(defn ~convenience-fn-name [& {:keys ~fields}]
(new ~name ~@fields))))
(deftype+ SqlDriver [column->base-type
connection-details->korma-connection
database->connection-details
sql-string-length-fn]
make-sql-driver
driver/IDriver
;; Connection
(can-connect? [_ database]
(try (connection/test-connection (-> database
database->connection-details
connection-details->korma-connection))
(catch Throwable e
(log/error "Failed to connect to database:" (.getMessage e))
false)))
(can-connect-with-details? [_ details]
(connection/test-connection (connection-details->korma-connection details)))
;; Syncing
(sync-database! [_ database]
(with-jdbc-metadata [md database]
(driver-sync/sync-database! (sync/->GenericSqlSyncDriverDatasource column->base-type sql-string-length-fn md) database)))
(sync-table! [_ table]
(let [database @(:db table)]
(with-jdbc-metadata [md database]
(driver-sync/sync-table! (sync/->GenericSqlSyncDriverDatasource column->base-type sql-string-length-fn md) table))))
;; Query Processing
(process-query [_ query]
(qp/process-and-run query)))
......@@ -13,13 +13,3 @@
first
vals
first)))
(defn can-connect?
"Check whether we can connect to a DATABASE and perform a very simple SQL query.
(can-connect? (sel :one Database ....)) -> true"
[database]
(try (test-connection (driver/connection database))
(catch Throwable e
(log/error "Failed to connect to database:" (.getMessage e))
false)))
......@@ -15,7 +15,7 @@
;; # NEW IMPL
(deftype GenericSqlSyncDriver [column->base-type sql-string-length-fn ^java.sql.DatabaseMetaData metadata]
(deftype GenericSqlSyncDriverDatasource [column->base-type sql-string-length-fn ^java.sql.DatabaseMetaData metadata]
sync/ISyncDriverDataSource
(active-table-names [_ database]
(->> (.getTables metadata nil nil nil (into-array String ["TABLE"]))
......@@ -76,12 +76,3 @@
(aggregate (count :*) :count)
(where {(keyword (:name field)) [like "http%://_%.__%"]})) first :count)]
(float (/ url-count total-non-null-count)))))))
(defn sync-database! [column->base-type sql-string-length-fn database]
(with-jdbc-metadata [md database]
(sync/sync-database! (GenericSqlSyncDriver. column->base-type sql-string-length-fn md) database)))
(defn sync-table! [column->base-type sql-string-length-fn table]
(let [database @(:db table)]
(with-jdbc-metadata [md database]
(sync/sync-table! (GenericSqlSyncDriver. column->base-type sql-string-length-fn md) table))))
(ns metabase.driver.h2.sync
"Implementation of `sync-tables` for H2."
(:require [metabase.driver.generic-sql.sync :as generic]
[metabase.driver :refer [sync-database sync-table]]))
(ns metabase.driver.h2
(:require [clojure.set :as set]
[korma.db :as kdb]
[metabase.driver :as driver]
[metabase.driver.generic-sql :as generic-sql]))
;; ## CONNECTION
(defn- connection-details->korma-connection [details-map]
(korma.db/h2 (set/rename-keys details-map {:conn_str :db})))
(defn- database->connection-details [database]
(:details database))
;; ## SYNCING
(def ^:const column->base-type
"Map of H2 Column types -> Field base types. (Add more mappings here as needed)"
......@@ -68,11 +80,12 @@
:YEAR :IntegerField
(keyword "DOUBLE PRECISION") :FloatField})
(def ^:const ^:private sql-string-length-fn
:LENGTH)
(defmethod sync-database :h2 [database]
(generic/sync-database! column->base-type sql-string-length-fn database))
;; ## DRIVER
(defmethod sync-table :h2 [table]
(generic/sync-table! column->base-type sql-string-length-fn table))
(def ^:const driver
(generic-sql/make-sql-driver
:column->base-type column->base-type
:connection-details->korma-connection connection-details->korma-connection
:database->connection-details database->connection-details
:sql-string-length-fn :LENGTH
:timezone->set-timezone-sql nil))
(ns metabase.driver.h2.connection
(:require [clojure.set :as set]
korma.db
[metabase.driver :refer [can-connect? can-connect-with-details? connection connection-details]]
[metabase.driver.generic-sql.connection :as generic]))
(defmethod connection-details :h2 [{:keys [details]}]
(set/rename-keys details {:conn_str :db}))
(defmethod connection :h2 [database]
(korma.db/h2 (connection-details database)))
(defmethod can-connect? :h2 [database]
(generic/can-connect? database))
(defmethod can-connect-with-details? :h2 [details-map]
(let [connection (korma.db/h2 (set/rename-keys details-map {:conn_str :db}))]
(generic/test-connection connection)))
(ns metabase.driver.h2.query-processor
(:require [metabase.driver.generic-sql.query-processor :as generic]
[metabase.driver :refer [process-and-run]]))
(defmethod process-and-run :h2 [query]
(generic/process-and-run query))
(ns metabase.driver.postgres.sync
"Implementation of `sync-tables` for Postgres."
(:require [metabase.driver :refer [sync-database sync-table]]
[metabase.driver.generic-sql.sync :as generic]))
(ns metabase.driver.postgres
(:require [clojure.set :refer [rename-keys]]
[clojure.string :as s]
[korma.db :as kdb]
[swiss.arrows :refer :all]
[metabase.config :as config]
[metabase.driver :as driver]
[metabase.driver.generic-sql :as generic-sql]))
;; ## SYNCING
(def ^:const column->base-type
"Map of Postgres column types -> Field base types.
......@@ -64,11 +70,41 @@
(keyword "timestamp with timezone") :DateTimeField
(keyword "timestamp without timezone") :DateTimeField})
(def ^:const ^:private sql-string-length-fn
:CHAR_LENGTH)
(defmethod sync-database :postgres [database]
(generic/sync-database! column->base-type sql-string-length-fn database))
;; ## CONNECTION
(defn- connection-details->korma-connection [details-map]
(kdb/postgres (rename-keys details-map {:dbname :db})))
(defn- database->connection-details [database]
(let [details (-<>> database :details :conn_str ; get conn str like "password=corvus user=corvus ..."
(s/split <> #" ") ; split into k=v pairs
(map (fn [pair] ; convert to {:k v} pairs
(let [[k v] (s/split pair #"=")]
{(keyword k) v})))
(reduce conj {})) ; combine into single dict
{:keys [host dbname port host]} details]
(-> details
(assoc :host host ; e.g. "localhost"
:make-pool? false
:db-type :postgres ; HACK hardcoded to postgres for time being until API has a way to choose DB type !
:port (Integer/parseInt port)) ; convert :port to an Integer
(cond-> (config/config-bool :mb-postgres-ssl) (assoc :ssl true :sslfactory "org.postgresql.ssl.NonValidatingFactory"))
(rename-keys {:dbname :db}))))
;; ## QP
(defn- timezone->set-timezone-sql [timezone]
(format "SET LOCAL timezone TO '%s';" timezone))
;; ## DRIVER
(defmethod sync-table :postgres [table]
(generic/sync-table! column->base-type sql-string-length-fn table))
(def ^:const driver
(generic-sql/make-sql-driver
:column->base-type column->base-type
:connection-details->korma-connection connection-details->korma-connection
:database->connection-details database->connection-details
:sql-string-length-fn :CHAR_LENGTH
:timezone->set-timezone-sql timezone->set-timezone-sql))
(ns metabase.driver.postgres.connection
(:require [clojure.set :refer [rename-keys]]
[clojure.string :as s]
[korma.db :as kdb]
[swiss.arrows :refer :all]
[metabase.config :as config]
[metabase.driver :refer [can-connect? can-connect-with-details? connection connection-details]]
[metabase.driver.generic-sql.connection :as generic]))
(defmethod connection-details :postgres [database]
(let [details (-<>> database :details :conn_str ; get conn str like "password=corvus user=corvus ..."
(s/split <> #" ") ; split into k=v pairs
(map (fn [pair] ; convert to {:k v} pairs
(let [[k v] (s/split pair #"=")]
{(keyword k) v})))
(reduce conj {})) ; combine into single dict
{:keys [host dbname port host]} details]
(-> details
(assoc :host host ; e.g. "localhost"
:make-pool? false
:db-type :postgres ; HACK hardcoded to postgres for time being until API has a way to choose DB type !
:port (Integer/parseInt port)) ; convert :port to an Integer
(cond-> (config/config-bool :mb-postgres-ssl) (assoc :ssl true :sslfactory "org.postgresql.ssl.NonValidatingFactory"))
(rename-keys {:dbname :db}))))
(defmethod connection :postgres [database]
(-> (connection-details database)
(dissoc :db-type)
kdb/postgres))
(defmethod can-connect? :postgres [database]
(generic/can-connect? database))
(defmethod can-connect-with-details? :postgres [details-map]
(let [connection (kdb/postgres (rename-keys details-map {:dbname :db}))]
(generic/test-connection connection)))
(ns metabase.driver.postgres.query-processor
(:require (metabase.driver.generic-sql [native :as native]
[query-processor :as generic])
[metabase.driver :refer [process-and-run]]))
(defmethod process-and-run :postgres [query]
(binding [native/*timezone->set-timezone-sql* (fn [timezone]
(format "SET LOCAL timezone TO '%s';" timezone))]
(generic/process-and-run query)))
......@@ -6,12 +6,10 @@
(defn- qp-query [table query-dict]
(binding [context/*table* table
context/*database* @(:db table)]
(->> (driver/process-and-run {:database (:db_id table)
(driver/driver-process-query {:database (:db_id table)
:type "query"
:query (assoc query-dict
:source_table (:id table))})
:data
:rows)))
:source_table (:id table))})))
(defn table-row-count
"Fetch the row count of TABLE via the query processor."
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment