From 7cb408a9b695bc31596bf856f4a3de77c8e6de17 Mon Sep 17 00:00:00 2001
From: Cam Saul <cammsaul@gmail.com>
Date: Mon, 29 Oct 2018 12:23:10 -0700
Subject: [PATCH] Add DB to QP Store; other cleanup [ci drivers]

---
 src/metabase/driver.clj                       | 17 +++--
 src/metabase/driver/bigquery.clj              | 20 ++---
 src/metabase/driver/generic_sql.clj           | 46 +++++++-----
 .../driver/generic_sql/query_processor.clj    | 73 +++++++++++-------
 src/metabase/driver/h2.clj                    |  7 +-
 src/metabase/driver/hive_like.clj             |  9 ++-
 src/metabase/driver/mongo.clj                 | 12 +--
 src/metabase/driver/presto.clj                | 10 +--
 src/metabase/query_processor.clj              |  2 +
 .../middleware/parameters/sql.clj             |  2 +-
 .../middleware/resolve_database.clj           | 19 +++++
 src/metabase/query_processor/store.clj        | 34 ++++++++-
 test/metabase/driver/generic_sql_test.clj     |  6 +-
 .../middleware/parameters/sql_test.clj        | 49 ++++++------
 test/metabase/query_processor_test.clj        |  1 +
 .../nested_queries_test.clj                   | 52 +++++--------
 .../sync/sync_metadata/comments_test.clj      | 10 +--
 test/metabase/test/data/bigquery.clj          | 61 ++++++++-------
 test/metabase/test/data/generic_sql.clj       | 75 +++++++++++--------
 test/metabase/test/data/interface.clj         | 47 ++++++------
 test/metabase/test/data/mongo.clj             | 44 ++++++-----
 test/metabase/test/data/presto.clj            | 28 ++++---
 22 files changed, 360 insertions(+), 264 deletions(-)
 create mode 100644 src/metabase/query_processor/middleware/resolve_database.clj

diff --git a/src/metabase/driver.clj b/src/metabase/driver.clj
index dfda3472990..6d1c2e3cafd 100644
--- a/src/metabase/driver.clj
+++ b/src/metabase/driver.clj
@@ -22,6 +22,7 @@
             [metabase.models
              [database :refer [Database]]
              [setting :refer [defsetting]]]
+            [metabase.query-processor.store :as qp.store]
             [metabase.sync.interface :as si]
             [metabase.util
              [date :as du]
@@ -397,13 +398,16 @@
  parses the date returned preserving its timezone"
   [native-query date-formatters]
   (fn [driver database]
+    {:pre [(map? database)]}
     (let [settings (when-let [report-tz (report-timezone-if-supported driver)]
                      {:settings {:report-timezone report-tz}})
           time-str (try
-                     (->> (merge settings {:database database, :native {:query native-query}})
-                          (execute-query driver)
-                          :rows
-                          ffirst)
+                     (qp.store/with-store
+                       (qp.store/store-database! database)
+                       (->> (merge settings {:database database, :native {:query native-query}})
+                            (execute-query driver)
+                            :rows
+                            ffirst))
                      (catch Exception e
                        (throw
                         (Exception.
@@ -414,8 +418,9 @@
         (catch Exception e
           (throw
            (Exception.
-            (tru "Unable to parse date string ''{0}'' for database engine ''{1}''"
-                    time-str (-> database :engine name)) e)))))))
+            (str
+             (tru "Unable to parse date string ''{0}'' for database engine ''{1}''"
+                  time-str (-> database :engine name))) e)))))))
 
 (defn class->base-type
   "Return the `Field.base_type` that corresponds to a given class returned by the DB.
diff --git a/src/metabase/driver/bigquery.clj b/src/metabase/driver/bigquery.clj
index 165b7ce36b4..993331dfd02 100644
--- a/src/metabase/driver/bigquery.clj
+++ b/src/metabase/driver/bigquery.clj
@@ -22,9 +22,7 @@
             [metabase.mbql
              [schema :as mbql.s]
              [util :as mbql.u]]
-            [metabase.models
-             [database :refer [Database]]
-             [table :as table]]
+            [metabase.models.table :as table]
             [metabase.query-processor
              [store :as qp.store]
              [util :as qputil]]
@@ -315,6 +313,8 @@
 ;; This record type is used for BigQuery table and field identifiers, since BigQuery has some stupid rules about how to
 ;; quote them (tables are like `dataset.table` and fields are like `dataset.table`.`field`)
 ;; This implements HoneySql's ToSql protocol, so we can just output this directly in most of our QP code below
+;;
+;; TODO - this is totally unnecessary now, we can just override `->honeysql` for `Field` and `Table` instead. FIXME!
 (defrecord ^:private BigQueryIdentifier [dataset-name ; optional; will use (dataset-name-for-current-query) otherwise
                                          table-name
                                          field-name]
@@ -438,12 +438,13 @@
 
 (defn- field->identifier
   "Generate appropriate identifier for a Field for SQL parameters. (NOTE: THIS IS ONLY USED FOR SQL PARAMETERS!)"
-  ;; TODO - Making 2 DB calls for each field to fetch its dataset is inefficient and makes me cry, but this method is
+  ;; TODO - Making a DB call for each field to fetch its Table is inefficient and makes me cry, but this method is
   ;; currently only used for SQL params so it's not a huge deal at this point
+  ;;
   ;; TODO - we should make sure these are in the QP store somewhere and then could at least batch the calls
   [{table-id :table_id, :as field}]
-  (let [{table-name :name, database-id :db_id} (db/select-one [table/Table :name :db_id], :id (u/get-id table-id))
-        details                                (db/select-one-field :details Database, :id (u/get-id database-id))]
+  (let [table-name (db/select-one-field :name table/Table :id (u/get-id table-id))
+        details    (:details (qp.store/database))]
     (map->BigQueryIdentifier {:dataset-name (:dataset-id details), :table-name table-name, :field-name (:name field)})))
 
 (defn- field-clause->field [field-clause]
@@ -524,7 +525,7 @@
     {source-table-id :source-table} :query
     :as                             outer-query}]
   {:pre [(integer? database-id)]}
-  (let [dataset-id         (:dataset-id (db/select-one-field :details Database :id (u/get-id database-id)))
+  (let [dataset-id         (-> (qp.store/database) :details :dataset-id)
         aliased-query      (pre-alias-aggregations outer-query)
         {table-name :name} (qp.store/table source-table-id)]
     (assert (seq dataset-id))
@@ -541,10 +542,9 @@
     (time/time-zone-for-id (.getID jvm-tz))
     time/utc))
 
-(defn- execute-query [{database-id                                            :database
-                       {sql :query, params :params, :keys [table-name mbql?]} :native
+(defn- execute-query [{{sql :query, params :params, :keys [table-name mbql?]} :native
                        :as                                                    outer-query}]
-  (let [database (db/select-one [Database :id :details], :id (u/get-id database-id))]
+  (let [database (qp.store/database)]
     (binding [*bigquery-timezone* (effective-query-timezone database)]
       (let [sql     (str "-- " (qputil/query->remark outer-query) "\n" (if (seq params)
                                                                          (unprepare/unprepare (cons sql params))
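
With the Database stashed in the QP store, `field->identifier` needs only a single Toucan call (for the Table name). A minimal sketch of the resulting data flow (illustrative; assumes `field` carries `:table_id` and `:name`):

    ;; The BigQuery dataset comes from the stored Database's details; only the
    ;; table name still requires a metadata DB hit.
    (let [table-name (db/select-one-field :name table/Table :id (:table_id field))
          dataset-id (-> (qp.store/database) :details :dataset-id)]
      (map->BigQueryIdentifier {:dataset-name dataset-id
                                :table-name   table-name
                                :field-name   (:name field)}))
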
diff --git a/src/metabase/driver/generic_sql.clj b/src/metabase/driver/generic_sql.clj
index 437086bbb7b..b24f39ee059 100644
--- a/src/metabase/driver/generic_sql.clj
+++ b/src/metabase/driver/generic_sql.clj
@@ -95,7 +95,7 @@
      building a SQL statement. Defaults to `:ansi`, but other valid options are `:mysql`, `:sqlserver`, `:oracle`, and
      `:h2` (added in `metabase.util.honeysql-extensions`; like `:ansi`, but uppercases the result).
 
-        (hsql/format ... :quoting (quote-style driver))")
+        (hsql/format ... :quoting (quote-style driver), :allow-dashed-names? true)")
 
   (set-timezone-sql ^String [this]
     "*OPTIONAL*. This should be a format string containing a SQL statement to be used to set the timezone for the
@@ -131,7 +131,7 @@
   {:pre [(map? database)]}
   (log/debug (u/format-color 'cyan "Creating new connection pool for database %d ..." id))
  (let [details-with-tunnel (ssh/include-ssh-tunnel details) ;; If the tunnel is disabled this is returned unchanged
-        spec (connection-details->spec (driver/engine->driver engine) details-with-tunnel)]
+        spec                (connection-details->spec (driver/engine->driver engine) details-with-tunnel)]
     (assoc (mdb/connection-pool (assoc spec
                                   :minimum-pool-size           1
                                   ;; prevent broken connections closed by dbs by testing them every 3 mins
@@ -238,7 +238,7 @@
 
 (def ^:private ^:dynamic *jdbc-options* {})
 
-(defn- query
+(defn query
   "Execute a HONEYSQL-FROM query against DATABASE, DRIVER, and optionally TABLE."
   ([driver database honeysql-form]
    (jdbc/query (db->jdbc-connection-spec database)
@@ -270,14 +270,16 @@
 
 ;;; ## Database introspection methods used by sync process
 
-(defmacro with-metadata
+;; Don't use this anymore! Use the new `jdbc/with-db-metadata` fn
+(defmacro ^:deprecated with-metadata
   "Execute BODY with `java.sql.DatabaseMetaData` for DATABASE."
   [[binding _ database] & body]
   `(with-open [^java.sql.Connection conn# (jdbc/get-connection (db->jdbc-connection-spec ~database))]
      (let [~binding (.getMetaData conn#)]
        ~@body)))
 
-(defmacro ^:private with-resultset-open
+;; Don't use this anymore! You can just use `with-metadata` and `jdbc/result-set-seq` instead!!!
+(defmacro ^:private ^:deprecated with-resultset-open
   "This is like `with-open` but with JDBC ResultSet objects. Will execute `body` with a `jdbc/result-set-seq` bound
  to the symbols provided in the binding form. The binding form is just like `let` or `with-open`, but yields a
   `ResultSet`. That `ResultSet` will be closed upon exit of `body`."
@@ -288,10 +290,15 @@
        (let ~(vec (interleave (map first binding-pairs) (map #(list `~jdbc/result-set-seq %) rs-syms)))
          ~@body))))
 
+(defn get-catalogs
+  "Returns a set of all of the catalogs found via `metadata`"
+  [^DatabaseMetaData metadata]
+  (set (map :table_cat (jdbc/result-set-seq (.getCatalogs metadata)))))
+
 (defn- get-tables
   "Fetch a JDBC Metadata ResultSet of tables in the DB, optionally limited to ones belonging to a given schema."
-  ^ResultSet [^DatabaseMetaData metadata, ^String schema-or-nil]
-  (with-resultset-open [rs-seq (.getTables metadata nil schema-or-nil "%" ; tablePattern "%" = match all tables
+  ^ResultSet [^DatabaseMetaData metadata, ^String schema-or-nil, ^String database-name-or-nil]
+  (with-resultset-open [rs-seq (.getTables metadata database-name-or-nil schema-or-nil "%" ; tablePattern "%" = match all tables
                                            (into-array String ["TABLE", "VIEW", "FOREIGN TABLE", "MATERIALIZED VIEW"]))]
     ;; Ensure we read all rows before exiting
     (doall rs-seq)))
@@ -303,12 +310,12 @@
 
    This is as much as 15x faster for Databases with lots of system tables than `post-filtered-active-tables` (4
    seconds vs 60)."
-  [driver, ^DatabaseMetaData metadata]
+  [driver, ^DatabaseMetaData metadata, & [database-name-or-nil]]
   (with-resultset-open [rs-seq (.getSchemas metadata)]
     (let [all-schemas (set (map :table_schem rs-seq))
           schemas     (set/difference all-schemas (excluded-schemas driver))]
       (set (for [schema schemas
-                 table  (get-tables metadata schema)]
+                 table  (get-tables metadata schema database-name-or-nil)]
              (let [remarks (:remarks table)]
                {:name        (:table_name table)
                 :schema      schema
@@ -318,9 +325,9 @@
 (defn post-filtered-active-tables
   "Alternative implementation of `ISQLDriver/active-tables` best suited for DBs with little or no support for schemas.
    Fetch *all* Tables, then filter out ones whose schema is in `excluded-schemas` Clojure-side."
-  [driver, ^DatabaseMetaData metadata]
+  [driver, ^DatabaseMetaData metadata, & [database-name-or-nil]]
   (set (for [table   (filter #(not (contains? (excluded-schemas driver) (:table_schem %)))
-                             (get-tables metadata nil))]
+                             (get-tables metadata nil nil))]
          (let [remarks (:remarks table)]
            {:name        (:table_name  table)
             :schema      (:table_schem table)
@@ -343,8 +350,10 @@
       (str "Invalid type: " special-type))
     special-type))
 
-(defn- describe-table-fields [^DatabaseMetaData metadata, driver, {schema :schema, table-name :name}]
-  (with-resultset-open [rs-seq (.getColumns metadata nil schema table-name nil)]
+(defn describe-table-fields
+  "Returns a set of column metadata for `schema` and `table-name` using `metadata`. "
+  [^DatabaseMetaData metadata, driver, {schema :schema, table-name :name}, & [database-name-or-nil]]
+  (with-resultset-open [rs-seq (.getColumns metadata database-name-or-nil schema table-name nil)]
     (set (for [{database-type :type_name, column-name :column_name, remarks :remarks} rs-seq]
            (merge {:name          column-name
                    :database-type database-type
@@ -354,7 +363,8 @@
                   (when-let [special-type (calculated-special-type driver column-name database-type)]
                     {:special-type special-type}))))))
 
-(defn- add-table-pks
+(defn add-table-pks
+  "Using `metadata` find any primary keys for `table` and assoc `:pk?` to true for those columns."
   [^DatabaseMetaData metadata, table]
   (with-resultset-open [rs-seq (.getPrimaryKeys metadata nil nil (:name table))]
     (let [pks (set (map :column_name rs-seq))]
@@ -380,9 +390,11 @@
          ;; find PKs and mark them
          (add-table-pks metadata))))
 
-(defn- describe-table-fks [driver database table]
+(defn describe-table-fks
+  "Default implementation of `describe-table-fks` for JDBC based drivers."
+  [driver database table & [database-name-or-nil]]
   (with-metadata [metadata driver database]
-    (with-resultset-open [rs-seq (.getImportedKeys metadata nil (:schema table) (:name table))]
+    (with-resultset-open [rs-seq (.getImportedKeys metadata database-name-or-nil (:schema table) (:name table))]
       (set (for [result rs-seq]
              {:fk-column-name   (:fkcolumn_name result)
               :dest-table       {:name   (:pktable_name result)
@@ -415,7 +427,7 @@
 (s/defn ^:private honeysql->prepared-stmt-subs
   "Convert X to a replacement snippet info map by passing it to HoneySQL's `format` function."
   [driver x]
-  (let [[snippet & args] (hsql/format x, :quoting (quote-style driver))]
+  (let [[snippet & args] (hsql/format x, :quoting (quote-style driver), :allow-dashed-names? true)]
     (make-stmt-subs snippet args)))
 
 (s/defmethod ->prepared-substitution [Object nil] :- PreparedStatementSubstitution
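
As the deprecation notes above suggest, new call sites should use `jdbc/with-db-metadata` instead of the `with-metadata` macro. A minimal sketch, where `db-spec` stands in for the spec returned by `db->jdbc-connection-spec`:

    ;; clojure.java.jdbc manages the Connection and hands us DatabaseMetaData.
    (jdbc/with-db-metadata [metadata db-spec]
      (get-catalogs metadata))
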
diff --git a/src/metabase/driver/generic_sql/query_processor.clj b/src/metabase/driver/generic_sql/query_processor.clj
index 3cb2236d200..4092e3ab980 100644
--- a/src/metabase/driver/generic_sql/query_processor.clj
+++ b/src/metabase/driver/generic_sql/query_processor.clj
@@ -13,7 +13,9 @@
             [metabase.mbql
              [schema :as mbql.s]
              [util :as mbql.u]]
-            [metabase.models.database :refer [Database]]
+            [metabase.models
+             [field :refer [Field]]
+             [table :refer [Table]]]
             [metabase.query-processor
              [interface :as i]
              [store :as qp.store]
@@ -25,8 +27,7 @@
              [i18n :refer [tru]]]
             [schema.core :as s])
   (:import [java.sql PreparedStatement ResultSet ResultSetMetaData SQLException]
-           [java.util Calendar Date TimeZone]
-           metabase.models.field.FieldInstance))
+           [java.util Calendar Date TimeZone]))
 
 ;; TODO - yet another `*query*` dynamic var. We should really consolidate them all so we only need a single one.
 (def ^:dynamic *query*
@@ -46,7 +47,7 @@
 
 (s/defn ^:private qualified-alias
   "Convert the given `FIELD` to a stringified alias, for use in a SQL `AS` clause."
-  [driver, field :- FieldInstance]
+  [driver, field :- (class Field)]
   (some->> field
            (sql/field->alias driver)
            hx/qualify-and-escape-dots))
@@ -92,14 +93,19 @@
   ;; original formula.
   (->honeysql driver (mbql.u/expression-with-name *query* expression-name)))
 
-(defmethod ->honeysql [Object FieldInstance]
+(defn cast-unix-timestamp-field-if-needed
+  "Wrap a `field-identifier` in appropriate HoneySQL expressions if it refers to a UNIX timestamp Field."
+  [driver field field-identifier]
+  (condp #(isa? %2 %1) (:special_type field)
+    :type/UNIXTimestampSeconds      (sql/unix-timestamp->timestamp driver field-identifier :seconds)
+    :type/UNIXTimestampMilliseconds (sql/unix-timestamp->timestamp driver field-identifier :milliseconds)
+    field-identifier))
+
+(defmethod ->honeysql [Object (class Field)]
   [driver field]
   (let [table            (qp.store/table (:table_id field))
         field-identifier (keyword (hx/qualify-and-escape-dots (:schema table) (:name table) (:name field)))]
-    (condp #(isa? %2 %1) (:special_type field)
-      :type/UNIXTimestampSeconds      (sql/unix-timestamp->timestamp driver field-identifier :seconds)
-      :type/UNIXTimestampMilliseconds (sql/unix-timestamp->timestamp driver field-identifier :milliseconds)
-      field-identifier)))
+    (cast-unix-timestamp-field-if-needed driver field field-identifier)))
 
 (defmethod ->honeysql [Object :field-id]
   [driver [_ field-id]]
@@ -110,14 +116,16 @@
   ;; because the dest field needs to be qualified like `categories__via_category_id.name` instead of the normal
   ;; `public.category.name` we will temporarily swap out the `categories` Table in the QP store for the duration of
   ;; converting this `fk->` clause to HoneySQL. We'll remove the `:schema` and swap out the `:name` with the alias so
-  ;; other `->honeysql` impls (e.g. the `FieldInstance` one) will do the correct thing automatically without having to
+  ;; other `->honeysql` impls (e.g. the `(class Field)` one) will do the correct thing automatically without having to
   ;; worry about the context in which they are being called
   (qp.store/with-pushed-store
     (when-let [{:keys [join-alias table-id]} (mbql.u/fk-clause->join-info *query* fk-clause)]
       (when table-id
         (qp.store/store-table! (assoc (qp.store/table table-id)
                                  :schema nil
-                                 :name   join-alias))))
+                                 :name   join-alias
+                                 ;; for drivers that need to know these things, like Snowflake
+                                 :alias? true))))
     (->honeysql driver dest-field-clause)))
 
 (defmethod ->honeysql [Object :field-literal]
@@ -314,22 +322,20 @@
 (defn- make-honeysql-join-clauses
   "Returns a seq of honeysql join clauses, joining to `table-or-query-expr`. `jt-or-jq` can be either a `JoinTable` or
   a `JoinQuery`"
-  [table-or-query-expr {:keys [join-alias fk-field-id pk-field-id]}]
-  (let [source-table-id                                  (mbql.u/query->source-table-id *query*)
-        {source-table-name :name, source-schema :schema} (qp.store/table source-table-id)
-        source-field                                     (qp.store/field fk-field-id)
-        pk-field                                         (qp.store/field pk-field-id)]
+  [driver table-or-query-expr {:keys [join-alias fk-field-id pk-field-id]}]
+  (let [source-field (qp.store/field fk-field-id)
+        pk-field     (qp.store/field pk-field-id)]
     [[table-or-query-expr (keyword join-alias)]
      [:=
-      (hx/qualify-and-escape-dots source-schema source-table-name (:name source-field))
+      (->honeysql driver source-field)
       (hx/qualify-and-escape-dots join-alias (:name pk-field))]]))
 
 (s/defn ^:private join-info->honeysql
   [driver, {:keys [query table-id], :as info} :- mbql.s/JoinInfo]
   (if query
-    (make-honeysql-join-clauses (build-honeysql-form driver query) info)
+    (make-honeysql-join-clauses driver (build-honeysql-form driver query) info)
     (let [table (qp.store/table table-id)]
-      (make-honeysql-join-clauses (hx/qualify-and-escape-dots (:schema table) (:name table)) info))))
+      (make-honeysql-join-clauses driver (->honeysql driver table) info))))
 
 (defn apply-join-tables
   "Apply expanded query `join-tables` clause to `honeysql-form`. Default implementation of `apply-join-tables` for SQL
@@ -367,12 +373,16 @@
 
 ;;; -------------------------------------------------- source-table --------------------------------------------------
 
+(defmethod ->honeysql [Object (class Table)]
+  [_ table]
+  (let [{table-name :name, schema :schema} table]
+    (hx/qualify-and-escape-dots schema table-name)))
+
 (defn apply-source-table
   "Apply `source-table` clause to `honeysql-form`. Default implementation of `apply-source-table` for SQL drivers.
   Override as needed."
-  [_ honeysql-form {source-table-id :source-table}]
-  (let [{table-name :name, schema :schema} (qp.store/table source-table-id)]
-    (h/from honeysql-form (hx/qualify-and-escape-dots schema table-name))))
+  [driver honeysql-form {source-table-id :source-table}]
+  (h/from honeysql-form (->honeysql driver (qp.store/table source-table-id))))
 
 
 ;;; +----------------------------------------------------------------------------------------------------------------+
@@ -435,7 +445,11 @@
   [driver honeysql-form {:keys [source-query], :as inner-query}]
   (qp.store/with-pushed-store
     (when-let [source-table-id (:source-table source-query)]
-      (qp.store/store-table! (assoc (qp.store/table source-table-id) :schema nil, :name (name source-query-alias))))
+      (qp.store/store-table! (assoc (qp.store/table source-table-id)
+                               :schema nil
+                               :name   (name source-query-alias)
+                               ;; some drivers like Snowflake need to know this so they don't include Database name
+                               :alias? true)))
     (apply-top-level-clauses driver honeysql-form (dissoc inner-query :source-query))))
 
 
@@ -587,7 +601,7 @@
 
 (defn- run-query
   "Run the query itself."
-  [{sql :query, params :params, remark :remark} timezone connection]
+  [driver {sql :query, params :params, remark :remark} timezone connection]
   (let [sql              (str "-- " remark "\n" (hx/unescape-dots sql))
         statement        (into [sql] params)
         [columns & rows] (cancellable-run-query connection sql params
@@ -648,14 +662,15 @@
     (log/debug (u/format-color 'green (tru "Setting timezone with statement: {0}" sql)))
     (jdbc/db-do-prepared connection [sql])))
 
-(defn- run-query-without-timezone [_ _ connection query]
-  (do-in-transaction connection (partial run-query query nil)))
+(defn- run-query-without-timezone [driver _ connection query]
+  (do-in-transaction connection (partial run-query driver query nil)))
 
 (defn- run-query-with-timezone [driver {:keys [^String report-timezone] :as settings} connection query]
   (try
     (do-in-transaction connection (fn [transaction-connection]
                                     (set-timezone! driver settings transaction-connection)
-                                    (run-query query
+                                    (run-query driver
+                                               query
                                                (some-> report-timezone TimeZone/getTimeZone)
                                                transaction-connection)))
     (catch SQLException e
@@ -670,11 +685,11 @@
 
 (defn execute-query
   "Process and run a native (raw SQL) QUERY."
-  [driver {settings :settings, database-id :database, query :native, :as outer-query}]
+  [driver {settings :settings, query :native, :as outer-query}]
   (let [query (assoc query :remark (qputil/query->remark outer-query))]
     (do-with-try-catch
       (fn []
-        (let [db-connection (sql/db->jdbc-connection-spec (Database (u/get-id database-id)))]
+        (let [db-connection (sql/db->jdbc-connection-spec (qp.store/database))]
           ((if (seq (:report-timezone settings))
              run-query-with-timezone
              run-query-without-timezone) driver settings db-connection query))))))
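
The `with-pushed-store` + `store-table!` pattern used above for `fk->` clauses and source queries generalizes as follows. A sketch (identifiers are illustrative): inside the pushed scope, re-storing a Table under an alias changes how every Field referencing it renders, without touching the enclosing store:

    (qp.store/with-pushed-store
      (qp.store/store-table! (assoc (qp.store/table table-id)
                               :schema nil
                               :name   "categories__via__category_id"
                               :alias? true))
      ;; any Field belonging to `table-id` now renders against the alias
      (->honeysql driver (qp.store/field field-id)))
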
diff --git a/src/metabase/driver/h2.clj b/src/metabase/driver/h2.clj
index c08570eece5..9f7f83e51a8 100644
--- a/src/metabase/driver/h2.clj
+++ b/src/metabase/driver/h2.clj
@@ -7,11 +7,10 @@
              [util :as u]]
             [metabase.db.spec :as dbspec]
             [metabase.driver.generic-sql :as sql]
-            [metabase.models.database :refer [Database]]
+            [metabase.query-processor.store :as qp.store]
             [metabase.util
              [honeysql-extensions :as hx]
-             [i18n :refer [tru]]]
-            [toucan.db :as db]))
+             [i18n :refer [tru]]]))
 
 (def ^:private ^:const column->base-type
   {:ARRAY                       :type/*
@@ -129,7 +128,7 @@
     ;; connection string. We don't allow SQL execution on H2 databases for the default admin account for security
     ;; reasons
     (when (= (keyword query-type) :native)
-      (let [{:keys [db]}   (db/select-one-field :details Database :id database-id)
+      (let [{:keys [db]}   (:details (qp.store/database))
             _              (assert db)
             [_ options]    (connection-string->file+options db)
             {:strs [USER]} options]
diff --git a/src/metabase/driver/hive_like.clj b/src/metabase/driver/hive_like.clj
index 0615aad3ada..a557af5c045 100644
--- a/src/metabase/driver/hive_like.clj
+++ b/src/metabase/driver/hive_like.clj
@@ -4,7 +4,9 @@
              [core :as hsql]
              [format :as hformat]]
             [metabase.driver.generic-sql.util.unprepare :as unprepare]
-            [metabase.models.field :refer [Field]]
+            [metabase.models
+             [field :refer [Field]]
+             [table :refer [Table]]]
             [metabase.util :as u]
             [metabase.util.honeysql-extensions :as hx]
             [toucan.db :as db])
@@ -104,10 +106,11 @@
   "Return the pieces that represent a path to FIELD, of the form `[table-name parent-fields-name* field-name]`.
    This function should be used by databases where schemas do not make much sense."
   [{field-name :name, table-id :table_id, parent-id :parent_id}]
-  ;; TODO - we are making too many DB calls here! Why aren't we using the QP Store?
+  ;; TODO - we are making too many DB calls here!
+  ;; (At least this is only used for SQL parameters, which is why we can't currently use the Store)
   (conj (vec (if-let [parent (Field parent-id)]
                (qualified-name-components parent)
-               (let [{table-name :name, schema :schema} (db/select-one ['Table :name :schema], :id table-id)]
+               (let [{table-name :name, schema :schema} (db/select-one [Table :name :schema], :id table-id)]
                  [table-name])))
         field-name))
 
diff --git a/src/metabase/driver/mongo.clj b/src/metabase/driver/mongo.clj
index 03b2cb28cff..7b6a9d2fa94 100644
--- a/src/metabase/driver/mongo.clj
+++ b/src/metabase/driver/mongo.clj
@@ -8,16 +8,16 @@
             [metabase.driver.mongo
              [query-processor :as qp]
              [util :refer [with-mongo-connection]]]
-            [metabase.models.database :refer [Database]]
-            [metabase.util.ssh :as ssh]
+            [metabase.query-processor.store :as qp.store]
+            [metabase.util
+             [i18n :refer [tru]]
+             [ssh :as ssh]]
             [monger
              [collection :as mc]
              [command :as cmd]
              [conversion :as conv]
              [db :as mdb]]
-            [metabase.util.i18n :refer [tru]]
-            [schema.core :as s]
-            [toucan.db :as db])
+            [schema.core :as s])
   (:import com.mongodb.DB))
 
 ;;; ## MongoDriver
@@ -51,7 +51,7 @@
 
 (defn- process-query-in-context [qp]
   (fn [{database-id :database, :as query}]
-    (with-mongo-connection [_ (db/select-one [Database :details], :id database-id)]
+    (with-mongo-connection [_ (qp.store/database)]
       (qp query))))
 
 
diff --git a/src/metabase/driver/presto.clj b/src/metabase/driver/presto.clj
index 2305f09abd1..5232bd592a9 100644
--- a/src/metabase/driver/presto.clj
+++ b/src/metabase/driver/presto.clj
@@ -18,14 +18,14 @@
             [metabase.driver.generic-sql :as sql]
             [metabase.driver.generic-sql.query-processor :as sqlqp]
             [metabase.driver.generic-sql.util.unprepare :as unprepare]
-            [metabase.models.database :refer [Database]]
-            [metabase.query-processor.util :as qputil]
+            [metabase.query-processor
+             [store :as qp.store]
+             [util :as qputil]]
             [metabase.util
              [date :as du]
              [honeysql-extensions :as hx]
              [i18n :refer [tru]]
-             [ssh :as ssh]]
-            [toucan.db :as db])
+             [ssh :as ssh]])
   (:import java.sql.Time
            java.util.Date))
 
@@ -241,7 +241,7 @@
   (let [sql                    (str "-- "
                                     (qputil/query->remark outer-query) "\n"
                                     (unprepare/unprepare (cons sql params) :quote-escape "'", :iso-8601-fn :from_iso8601_timestamp))
-        details                (merge (db/select-one-field :details Database :id (u/get-id database-id))
+        details                (merge (:details (qp.store/database))
                                       settings)
         {:keys [columns rows]} (execute-presto-query! details sql)
         columns                (for [[col name] (map vector columns (map :name columns))]
diff --git a/src/metabase/query_processor.clj b/src/metabase/query_processor.clj
index 135ade1fdba..4589eaed64c 100644
--- a/src/metabase/query_processor.clj
+++ b/src/metabase/query_processor.clj
@@ -37,6 +37,7 @@
              [parameters :as parameters]
              [permissions :as perms]
              [reconcile-breakout-and-order-by-bucketing :as reconcile-bucketing]
+             [resolve-database :as resolve-database]
              [resolve-driver :as resolve-driver]
              [resolve-fields :as resolve-fields]
              [resolve-joined-tables :as resolve-joined-tables]
@@ -134,6 +135,7 @@
       ;; TODO - I think we should do this much earlier
       resolve-driver/resolve-driver
       bind-timezone/bind-effective-timezone
+      resolve-database/resolve-database
       fetch-source-query/fetch-source-query
       store/initialize-store
       query-throttle/maybe-add-query-throttle
diff --git a/src/metabase/query_processor/middleware/parameters/sql.clj b/src/metabase/query_processor/middleware/parameters/sql.clj
index a58ef58f35b..3763d0b6b07 100644
--- a/src/metabase/query_processor/middleware/parameters/sql.clj
+++ b/src/metabase/query_processor/middleware/parameters/sql.clj
@@ -356,7 +356,7 @@
 (s/defn ^:private honeysql->replacement-snippet-info :- ParamSnippetInfo
   "Convert X to a replacement snippet info map by passing it to HoneySQL's `format` function."
   [x]
-  (let [[snippet & args] (hsql/format x, :quoting (sql/quote-style qp.i/*driver*))]
+  (let [[snippet & args] (hsql/format x, :quoting (sql/quote-style qp.i/*driver*), :allow-dashed-names? true)]
     {:replacement-snippet     snippet
      :prepared-statement-args args}))
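
The new `:allow-dashed-names? true` option stops HoneySQL from munging dashes to underscores before quoting. An illustrative REPL sketch with `:ansi` quoting (output assumed, not taken from the patch):

    (hsql/format :test-data.checkins :quoting :ansi, :allow-dashed-names? true)
    ;; => ["\"test-data\".\"checkins\""]
    ;; without the flag the dash would be rewritten: ["\"test_data\".\"checkins\""]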
 
diff --git a/src/metabase/query_processor/middleware/resolve_database.clj b/src/metabase/query_processor/middleware/resolve_database.clj
new file mode 100644
index 00000000000..ab439c1711d
--- /dev/null
+++ b/src/metabase/query_processor/middleware/resolve_database.clj
@@ -0,0 +1,19 @@
+(ns metabase.query-processor.middleware.resolve-database
+  (:require [metabase.util :as u]
+            [metabase.query-processor.store :as qp.store]
+            [metabase.models.database :as database :refer [Database]]
+            [metabase.util.i18n :refer [tru]]
+            [toucan.db :as db]))
+
+(defn- resolve-database* [{database-id :database, :as query}]
+  (u/prog1 query
+    (when-not (= database-id database/virtual-id)
+      (qp.store/store-database! (or (db/select-one (apply vector Database qp.store/database-columns-to-fetch)
+                                      :id (u/get-id database-id))
+                                    (throw (Exception. (str (tru "Database {0} does not exist." database-id)))))))))
+
+(defn resolve-database
+  "Middleware that resolves the Database referenced by the query under that `:database` key and stores it in the QP
+  Store."
+  [qp]
+  (comp qp resolve-database*))
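
Since `resolve-database*` uses `u/prog1` to return the query unchanged after stashing the Database, the middleware reduces to plain function composition. A sketch of the equivalence:

    ;; ((resolve-database qp) query)
    ;; ≡ (qp (resolve-database* query))
    ;; ≡ (qp query)            ; plus the store-database! side effect
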
diff --git a/src/metabase/query_processor/store.clj b/src/metabase/query_processor/store.clj
index 3f7a1d45f10..853014a6ef3 100644
--- a/src/metabase/query_processor/store.clj
+++ b/src/metabase/query_processor/store.clj
@@ -13,6 +13,7 @@
   but fetching all Fields in a single pass and storing them for reuse is dramatically more efficient than fetching
   those Fields potentially dozens of times in a single query execution."
   (:require [metabase.models
+             [database :refer [Database]]
              [field :refer [Field]]
              [table :refer [Table]]]
             [metabase.util.i18n :refer [tru]]
@@ -69,7 +70,21 @@
   [& body]
   `(do-with-pushed-store (fn [] ~@body)))
 
-;; TODO - DATABASE ??
+(def database-columns-to-fetch
+  "Columns you should fetch for the Database referenced by the query before stashing in the store."
+  [:id
+   :engine
+   :name
+   :details])
+
+(def ^:private DatabaseInstanceWithRequiredStoreKeys
+  (s/both
+   (class Database)
+   {:id      su/IntGreaterThanZero
+    :engine  s/Keyword
+    :name    su/NonBlankString
+    :details su/Map
+    s/Any    s/Any}))
 
 (def table-columns-to-fetch
   "Columns you should fetch for any Table you want to stash in the Store."
@@ -118,14 +133,20 @@
 
 ;;; ------------------------------------------ Saving objects in the Store -------------------------------------------
 
+(s/defn store-database!
+  "Store the Database referenced by this query for the duration of the current query execution. Throws an Exception if
+  database is invalid or doesn't have all the required keys."
+  [database :- DatabaseInstanceWithRequiredStoreKeys]
+  (swap! *store* assoc :database database))
+
 (s/defn store-table!
-  "Store a `table` in the QP Store for the duration of the current query execution. Throws an Exception if Table is
+  "Store a `table` in the QP Store for the duration of the current query execution. Throws an Exception if table is
   invalid or doesn't have all required keys."
   [table :- TableInstanceWithRequiredStoreKeys]
   (swap! *store* assoc-in [:tables (u/get-id table)] table))
 
 (s/defn store-field!
-  "Store a `field` in the QP Store for the duration of the current query execution. Throws an Exception if Field is
+  "Store a `field` in the QP Store for the duration of the current query execution. Throws an Exception if field is
   invalid or doesn't have all required keys."
   [field :- FieldInstanceWithRequiredStorekeys]
   (swap! *store* assoc-in [:fields (u/get-id field)] field))
@@ -138,6 +159,13 @@
 
 ;;; ---------------------------------------- Fetching objects from the Store -----------------------------------------
 
+(s/defn database :- DatabaseInstanceWithRequiredStoreKeys
+  "Fetch the Database referenced by the current query from the QP Store. Throws an Exception if valid item is not
+  returned."
+  []
+  (or (:database @*store*)
+      (throw (Exception. (str (tru "Error: Database is not present in the Query Processor Store."))))))
+
 (s/defn table :- TableInstanceWithRequiredStoreKeys
   "Fetch Table with `table-id` from the QP Store. Throws an Exception if valid item is not returned."
   [table-id :- su/IntGreaterThanZero]
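
Putting the new store pieces together, a minimal round trip (a sketch; the id `1` is illustrative and `db` is assumed to be `toucan.db`):

    (qp.store/with-store
      ;; fetch exactly the columns the store schema requires, then stash
      (qp.store/store-database!
       (db/select-one (apply vector Database database-columns-to-fetch) :id 1))
      ;; readable from anywhere inside the same with-store scope
      (:engine (qp.store/database)))
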
diff --git a/test/metabase/driver/generic_sql_test.clj b/test/metabase/driver/generic_sql_test.clj
index fc660a30ce6..2845071b992 100644
--- a/test/metabase/driver/generic_sql_test.clj
+++ b/test/metabase/driver/generic_sql_test.clj
@@ -84,9 +84,9 @@
    {:name "The Apple Pan",                :price 2, :category_id 11, :id 3}
    {:name "Wurstküche",                   :price 2, :category_id 29, :id 4}
    {:name "Brite Spot Family Restaurant", :price 2, :category_id 20, :id 5}]
-  (for [row (take 5 (sort-by :id (#'sql/table-rows-seq datasets/*driver*
-                                   (db/select-one 'Database :id (id))
-                                   (db/select-one 'Table :id (id :venues)))))]
+  (for [row (take 5 (sort-by :id (driver/table-rows-seq datasets/*driver*
+                                                        (db/select-one 'Database :id (id))
+                                                        (db/select-one 'Table :id (id :venues)))))]
     ;; different DBs use different precisions for these
     (-> (dissoc row :latitude :longitude)
         (update :price int)
diff --git a/test/metabase/query_processor/middleware/parameters/sql_test.clj b/test/metabase/query_processor/middleware/parameters/sql_test.clj
index fa008364b57..0524a6015b0 100644
--- a/test/metabase/query_processor/middleware/parameters/sql_test.clj
+++ b/test/metabase/query_processor/middleware/parameters/sql_test.clj
@@ -13,11 +13,11 @@
             [metabase.test
              [data :as data]
              [util :as tu]]
-            [metabase.test.data
-             [datasets :as datasets]
-             [generic-sql :as generic-sql]]
-            [metabase.util.date :as du]
-            [toucan.db :as db]))
+            [metabase.test.data.datasets :as datasets]
+            [metabase.util
+             [date :as du]
+             [schema :as su]]
+            [schema.core :as s]))
 
 ;;; ----------------------------------------------- basic parser tests -----------------------------------------------
 
@@ -554,19 +554,12 @@
 
 ;;; -------------------------------------------- "REAL" END-TO-END-TESTS ---------------------------------------------
 
-(defn- quote-name [identifier]
-  (generic-sql/quote-name datasets/*driver* identifier))
-
-(defn- checkins-identifier []
-  ;; HACK ! I don't have all day to write protocol methods to make this work the "right" way so for BigQuery and
-  ;; Presto we will just hackily return the correct identifier here
-  (case datasets/*engine*
-    :bigquery "`test_data.checkins`"
-    :presto   "\"default\".\"checkins\""
-    (let [{table-name :name, schema :schema} (db/select-one ['Table :name :schema], :id (data/id :checkins))]
-      (str (when (seq schema)
-             (str (quote-name schema) \.))
-           (quote-name table-name)))))
+(s/defn ^:private checkins-identifier :- su/NonBlankString
+  "Get the identifier used for `checkins` for the current driver by looking at what the driver uses when converting MBQL
+  to SQL. Different drivers qualify to different degrees (i.e. `table` vs `schema.table` vs `database.schema.table`)."
+  []
+  (let [sql (:query (qp/query->native (data/mbql-query checkins)))]
+    (second (re-find #"FROM\s([^\s()]+)" sql))))
 
 ;; as with the MBQL parameters tests Redshift and Crate fail for unknown reasons; disable their tests for now
 (def ^:private ^:const sql-parameters-engines
@@ -604,8 +597,8 @@
                                                      :dimension    [:field-id (data/id :checkins :date)]}}}
         :parameters []))))
 
-;; test that relative dates work correctly. It should be enough to try just one type of relative date here,
-;; since handling them gets delegated to the functions in `metabase.query-processor.parameters`, which is fully-tested :D
+;; test that relative dates work correctly. It should be enough to try just one type of relative date here, since
+;; handling them gets delegated to the functions in `metabase.query-processor.parameters`, which is fully-tested :D
 (datasets/expect-with-engines sql-parameters-engines
   [0]
   (first-row
@@ -647,12 +640,13 @@
   (tu/with-temporary-setting-values [report-timezone "America/Los_Angeles"]
     (first-row
       (process-native
-        :native     {:query         (cond
-                                      (= :bigquery datasets/*engine*)
+        :native     {:query         (case datasets/*engine*
+                                      :bigquery
                                       "SELECT {{date}} as date"
-                                      (= :oracle datasets/*engine*)
+
+                                      :oracle
                                       "SELECT cast({{date}} as date) from dual"
-                                      :else
+
                                       "SELECT cast({{date}} as date)")
                      :template-tags {"date" {:name "date" :display-name "Date" :type :date}}}
         :parameters [{:type :date/single :target [:variable [:template-tag "date"]] :value "2018-04-18"}]))))
@@ -660,6 +654,13 @@
 
 ;;; -------------------------------------------- SQL PARAMETERS 2.0 TESTS --------------------------------------------
 
+;; make sure we handle dashes inside names correctly!
+(expect
+  {:replacement-snippet     "\"test-data\".\"PUBLIC\".\"checkins\".\"date\"",
+   :prepared-statement-args nil}
+  (binding [qp.i/*driver* (driver/engine->driver :postgres)]
+    (#'sql/honeysql->replacement-snippet-info :test-data.PUBLIC.checkins.date)))
+
 ;; Some random end-to-end param expansion tests added as part of the SQL Parameters 2.0 rewrite
 
 (expect
diff --git a/test/metabase/query_processor_test.clj b/test/metabase/query_processor_test.clj
index d0528dabfcf..469eb3ddd34 100644
--- a/test/metabase/query_processor_test.clj
+++ b/test/metabase/query_processor_test.clj
@@ -273,6 +273,7 @@
     (aggregate-col :count)
     (aggregate-col :avg (venues-col :id))"
   {:arglists '([ag-col-kw] [ag-col-kw field])}
+  ;; TODO - cumulative count doesn't require a FIELD !!!!!!!!!
   ([ag-col-kw]
   (assert (= ag-col-kw :count))
    {:base_type    :type/Integer
diff --git a/test/metabase/query_processor_test/nested_queries_test.clj b/test/metabase/query_processor_test/nested_queries_test.clj
index 0729760e91c..a7f6705523c 100644
--- a/test/metabase/query_processor_test/nested_queries_test.clj
+++ b/test/metabase/query_processor_test/nested_queries_test.clj
@@ -23,6 +23,7 @@
              [util :as tu]]
             [metabase.test.data
              [dataset-definitions :as defs]
             [datasets :as datasets]
+             [generic-sql :as sql.test]
              [users :refer [create-users-if-needed! user->client]]]
             [toucan.db :as db]
@@ -62,26 +63,6 @@
                                    :limit        10}
                     :limit        5}}))))
 
-;; TODO - `identifier`, `quoted-identifier` might belong in some sort of shared util namespace
-(defn- identifier
-  "Return a properly formatted *UNQUOTED* identifier for a Table or Field.
-  (This handles DBs like H2 who require uppercase identifiers, or databases like Redshift do clever hacks
-   like prefixing table names with a unique schema for each test run because we're not
-   allowed to create new databases.)"
-  (^String [table-kw]
-   (let [{schema :schema, table-name :name} (db/select-one [Table :name :schema] :id (data/id table-kw))]
-     (name (hsql/qualify schema table-name))))
-  (^String [table-kw field-kw]
-   (db/select-one-field :name Field :id (data/id table-kw field-kw))))
-
-(defn- quote-identifier [identifier]
-  (first (hsql/format (keyword identifier)
-           :quoting (generic-sql/quote-style datasets/*driver*))))
-
-(def ^:private ^{:arglists '([table-kw] [table-kw field-kw])} ^String quoted-identifier
-  "Return a *QUOTED* identifier for a Table or Field. (This behaves just like `identifier`, but quotes the result)."
-  (comp quote-identifier identifier))
-
 ;; make sure we can do a basic query with a SQL source-query
 (datasets/expect-with-engines (non-timeseries-engines-with-feature :nested-queries)
   {:rows [[1 -165.374  4 3 "Red Medicine"                 10.0646]
@@ -89,10 +70,11 @@
           [3 -118.428 11 2 "The Apple Pan"                34.0406]
           [4 -118.465 29 2 "Wurstküche"                   33.9997]
           [5 -118.261 20 2 "Brite Spot Family Restaurant" 34.0778]]
-   :cols [{:name "id",          :base_type (data/expected-base-type->actual :type/Integer)}
+   ;; Oracle doesn't have Integer types, they always come back as DECIMAL
+   :cols [{:name "id",          :base_type (case datasets/*engine* :oracle :type/Decimal :type/Integer)}
           {:name "longitude",   :base_type :type/Float}
-          {:name "category_id", :base_type (data/expected-base-type->actual :type/Integer)}
-          {:name "price",       :base_type (data/expected-base-type->actual :type/Integer)}
+          {:name "category_id", :base_type (case datasets/*engine* :oracle :type/Decimal :type/Integer)}
+          {:name "price",       :base_type (case datasets/*engine* :oracle :type/Decimal :type/Integer)}
           {:name "name",        :base_type :type/Text}
           {:name "latitude",    :base_type :type/Float}]}
   (format-rows-by [int (partial u/round-to-decimals 4) int int str (partial u/round-to-decimals 4)]
@@ -100,14 +82,15 @@
       (qp/process-query
         {:database (data/id)
          :type     :query
-         :query    {:source-query {:native (format "SELECT %s, %s, %s, %s, %s, %s FROM %s"
-                                                   (quoted-identifier :venues :id)
-                                                   (quoted-identifier :venues :longitude)
-                                                   (quoted-identifier :venues :category_id)
-                                                   (quoted-identifier :venues :price)
-                                                   (quoted-identifier :venues :name)
-                                                   (quoted-identifier :venues :latitude)
-                                                   (quoted-identifier :venues))}
+         :query    {:source-query {:native (:query
+                                            (qp/query->native
+                                              (data/mbql-query venues
+                                                {:fields [[:field-id $id]
+                                                          [:field-id $longitude]
+                                                          [:field-id $category_id]
+                                                          [:field-id $price]
+                                                          [:field-id $name]
+                                                          [:field-id $latitude]]})))}
                     :order-by     [[:asc [:field-literal (data/format-name :id) :type/Integer]]]
                     :limit        5}}))))
 
@@ -197,13 +180,13 @@
 
 ;; make sure we can do a query with breakout and aggregation using a SQL source query
 (datasets/expect-with-engines (non-timeseries-engines-with-feature :nested-queries)
   breakout-results
   (rows+cols
     (format-rows-by [int int]
       (qp/process-query
         {:database (data/id)
          :type     :query
-         :query    {:source-query {:native (format "SELECT * FROM %s" (quoted-identifier :venues))}
+         :query    {:source-query {:native (:query (qp/query->native (data/mbql-query venues)))}
                     :aggregation  [:count]
                     :breakout     [[:field-literal (keyword (data/format-name :price)) :type/Integer]]}}))))
 
@@ -503,6 +486,9 @@
         (qp/process-query (query-with-source-card card)))
       results-metadata))
 
+(defn- identifier [table-kw field-kw]
+  (db/select-one-field :name Field :id (data/id table-kw field-kw)))
+
 ;; make sure using a time interval filter works
 (datasets/expect-with-engines (non-timeseries-engines-with-feature :nested-queries)
   :completed
diff --git a/test/metabase/sync/sync_metadata/comments_test.clj b/test/metabase/sync/sync_metadata/comments_test.clj
index 2954c56320e..b28c818afa4 100644
--- a/test/metabase/sync/sync_metadata/comments_test.clj
+++ b/test/metabase/sync/sync_metadata/comments_test.clj
@@ -61,7 +61,7 @@
     {:name (data/format-name "comment_after_sync"), :description "added comment"}}
   (data/with-temp-db [db comment-after-sync]
     ;; modify the source DB to add the comment and resync
-    (i/create-db! ds/*driver* (assoc-in comment-after-sync [:table-definitions 0 :field-definitions 0 :field-comment] "added comment") true)
+    (i/create-db! ds/*driver* (assoc-in comment-after-sync [:table-definitions 0 :field-definitions 0 :field-comment] "added comment"), {:skip-drop-db? true})
     (sync/sync-table! (Table (data/id "comment_after_sync")))
     (db->fields db)))
 
@@ -98,7 +98,7 @@
 (ds/expect-with-engines #{:h2 :postgres}
   #{{:name (data/format-name "table_with_comment_after_sync"), :description "added comment"}}
   (data/with-temp-db [db (basic-table "table_with_comment_after_sync" nil)]
-     ;; modify the source DB to add the comment and resync
-     (i/create-db! ds/*driver* (basic-table "table_with_comment_after_sync" "added comment") true)
-     (metabase.sync.sync-metadata.tables/sync-tables! db)
-     (db->tables db)))
+    ;; modify the source DB to add the comment and resync
+    (i/create-db! ds/*driver* (basic-table "table_with_comment_after_sync" "added comment") {:skip-drop-db? true})
+    (metabase.sync.sync-metadata.tables/sync-tables! db)
+    (db->tables db)))
diff --git a/test/metabase/test/data/bigquery.clj b/test/metabase/test/data/bigquery.clj
index 65833494aa3..23e0e38d253 100644
--- a/test/metabase/test/data/bigquery.clj
+++ b/test/metabase/test/data/bigquery.clj
@@ -200,35 +200,38 @@
 (def ^:private existing-datasets
   (atom #{}))
 
-(defn- create-db! [{:keys [database-name table-definitions]}]
-  {:pre [(seq database-name) (sequential? table-definitions)]}
-    ;; fetch existing datasets if we haven't done so yet
-  (when-not (seq @existing-datasets)
-    (reset! existing-datasets (set (existing-dataset-names)))
-    (println "These BigQuery datasets have already been loaded:\n" (u/pprint-to-str (sort @existing-datasets))))
-  ;; now check and see if we need to create the requested one
-  (let [database-name (normalize-name database-name)]
-    (when-not (contains? @existing-datasets database-name)
-      (try
-        (u/auto-retry 10
-          ;; if the dataset failed to load successfully last time around, destroy whatever was loaded so we start
-          ;; again from a blank slate
-          (u/ignore-exceptions
-            (destroy-dataset! database-name))
-          (create-dataset! database-name)
-          ;; do this in parallel because otherwise it can literally take an hour to load something like
-          ;; fifty_one_different_tables
-          (u/pdoseq [tabledef table-definitions]
-            (load-tabledef! database-name tabledef))
-          (swap! existing-datasets conj database-name)
-          (println (u/format-color 'green "[OK]")))
-        ;; if creating the dataset ultimately fails to complete, then delete it so it will hopefully work next time
-        ;; around
-        (catch Throwable e
-          (u/ignore-exceptions
-            (println (u/format-color 'red "Failed to load BigQuery dataset '%s'." database-name))
-            (destroy-dataset! database-name))
-          (throw e))))))
+(defn- create-db!
+  ([db-def]
+   (create-db! db-def nil))
+  ([{:keys [database-name table-definitions]} _]
+   {:pre [(seq database-name) (sequential? table-definitions)]}
+   ;; fetch existing datasets if we haven't done so yet
+   (when-not (seq @existing-datasets)
+     (reset! existing-datasets (set (existing-dataset-names)))
+     (println "These BigQuery datasets have already been loaded:\n" (u/pprint-to-str (sort @existing-datasets))))
+   ;; now check and see if we need to create the requested one
+   (let [database-name (normalize-name database-name)]
+     (when-not (contains? @existing-datasets database-name)
+       (try
+         (u/auto-retry 10
+           ;; if the dataset failed to load successfully last time around, destroy whatever was loaded so we start
+           ;; again from a blank slate
+           (u/ignore-exceptions
+             (destroy-dataset! database-name))
+           (create-dataset! database-name)
+           ;; do this in parallel because otherwise it can literally take an hour to load something like
+           ;; fifty_one_different_tables
+           (u/pdoseq [tabledef table-definitions]
+             (load-tabledef! database-name tabledef))
+           (swap! existing-datasets conj database-name)
+           (println (u/format-color 'green "[OK]")))
+         ;; if creating the dataset ultimately fails to complete, then delete it so it will hopefully work next time
+         ;; around
+         (catch Throwable e
+           (u/ignore-exceptions
+             (println (u/format-color 'red "Failed to load BigQuery dataset '%s'." database-name))
+             (destroy-dataset! database-name))
+           (throw e)))))))
 
 
 ;;; --------------------------------------------- IDriverTestExtensions ----------------------------------------------
diff --git a/test/metabase/test/data/generic_sql.clj b/test/metabase/test/data/generic_sql.clj
index 265055d8d45..40f27321504 100644
--- a/test/metabase/test/data/generic_sql.clj
+++ b/test/metabase/test/data/generic_sql.clj
@@ -68,6 +68,7 @@
   (pk-field-name ^String [this]
     "*Optional* Name of a PK field. Defaults to `\"id\"`.")
 
+  ;; TODO - WHAT ABOUT SCHEMA NAME???
   (qualified-name-components [this, ^String database-name]
                              [this, ^String database-name, ^String table-name]
                              [this, ^String database-name, ^String table-name, ^String field-name]
@@ -222,13 +223,19 @@
                                     (du/->Timestamp v du/utc)
                                     v))))))
 
+(defn add-ids
+  "Add an `:id` column to each row in `rows`, for databases that should have data inserted with the ID explicitly
+  specified."
+  [rows]
+  (for [[i row] (m/indexed rows)]
+    (assoc row :id (inc i))))
+
 (defn load-data-add-ids
   "Add IDs to each row, presumabily for doing a parallel insert. This arg should go before `load-data-chunked` or
   `load-data-one-at-a-time`."
   [insert!]
   (fn [rows]
-    (insert! (vec (for [[i row] (m/indexed rows)]
-                    (assoc row :id (inc i)))))))
+    (insert! (vec (add-ids rows)))))
 
 (defn load-data-chunked
   "Insert rows in chunks, which default to 200 rows each."
@@ -290,6 +297,7 @@
 (def load-data-all-at-once!            "Insert all rows at once."                             (make-load-data-fn))
 (def load-data-chunked!                "Insert rows in chunks of 200 at a time."              (make-load-data-fn load-data-chunked))
 (def load-data-one-at-a-time!          "Insert rows one at a time."                           (make-load-data-fn load-data-one-at-a-time))
+(def load-data-add-ids!                "Insert all rows at once; add IDs."                    (make-load-data-fn load-data-add-ids))
 (def load-data-chunked-parallel!       "Insert rows in chunks of 200 at a time, in parallel." (make-load-data-fn load-data-add-ids (partial load-data-chunked pmap)))
 (def load-data-one-at-a-time-parallel! "Insert rows one at a time, in parallel."              (make-load-data-fn load-data-add-ids (partial load-data-one-at-a-time pmap)))
 ;; ^ the parallel versions aren't necessarily faster than the sequential versions for all drivers, so make sure to do some profiling in order to pick the appropriate implementation
@@ -354,39 +362,42 @@
        (execute! driver context dbdef (s/replace statement #"⅋" ";"))))))
 
 (defn- create-db!
-  ([driver database-definition]
-    (create-db! driver database-definition false))
-  ([driver {:keys [table-definitions], :as dbdef} skip-drop-db?]
-    (when-not skip-drop-db?
-      ;; Exec SQL for creating the DB
-      (execute-sql! driver :server dbdef (str (drop-db-if-exists-sql driver dbdef) ";\n"
-                                              (create-db-sql driver dbdef))))
-    ;; Build combined statement for creating tables + FKs + comments
-    (let [statements (atom [])]
-      ;; Add the SQL for creating each Table
-      (doseq [tabledef table-definitions]
-        (swap! statements conj (drop-table-if-exists-sql driver dbdef tabledef)
-               (create-table-sql driver dbdef tabledef)))
-      ;; Add the SQL for adding FK constraints
-      (doseq [{:keys [field-definitions], :as tabledef} table-definitions]
-        (doseq [{:keys [fk], :as fielddef} field-definitions]
-          (when fk
-            (swap! statements conj (add-fk-sql driver dbdef tabledef fielddef)))))
-      ;; Add the SQL for adding table comments
-      (doseq [{:keys [table-comment], :as tabledef} table-definitions]
-        (when table-comment
-          (swap! statements conj (standalone-table-comment-sql driver dbdef tabledef))))
-      ;; Add the SQL for adding column comments
-      (doseq [{:keys [field-definitions], :as tabledef} table-definitions]
-        (doseq [{:keys [field-comment], :as fielddef} field-definitions]
-          (when field-comment
-            (swap! statements conj (standalone-column-comment-sql driver dbdef tabledef fielddef)))))
-      ;; exec the combined statement
-      (execute-sql! driver :db dbdef (s/join ";\n" (map hx/unescape-dots @statements))))
+  "Default implementation of `create-db!` for SQL drivers."
+  ([driver db-def]
+   (create-db! driver db-def nil))
+  ([driver {:keys [table-definitions], :as dbdef} {:keys [skip-drop-db?]
+                                                   :or   {skip-drop-db? false}}]
+   (when-not skip-drop-db?
+     ;; Exec SQL for creating the DB
+     (execute-sql! driver :server dbdef (str (drop-db-if-exists-sql driver dbdef) ";\n"
+                                             (create-db-sql driver dbdef))))
+   ;; Build combined statement for creating tables + FKs + comments
+   (let [statements (atom [])]
+     ;; Add the SQL for creating each Table
+     (doseq [tabledef table-definitions]
+       (swap! statements conj (drop-table-if-exists-sql driver dbdef tabledef)
+              (create-table-sql driver dbdef tabledef)))
+
+     ;; Add the SQL for adding FK constraints
+     (doseq [{:keys [field-definitions], :as tabledef} table-definitions]
+       (doseq [{:keys [fk], :as fielddef} field-definitions]
+         (when fk
+           (swap! statements conj (add-fk-sql driver dbdef tabledef fielddef)))))
+     ;; Add the SQL for adding table comments
+     (doseq [{:keys [table-comment], :as tabledef} table-definitions]
+       (when table-comment
+         (swap! statements conj (standalone-table-comment-sql driver dbdef tabledef))))
+     ;; Add the SQL for adding column comments
+     (doseq [{:keys [field-definitions], :as tabledef} table-definitions]
+       (doseq [{:keys [field-comment], :as fielddef} field-definitions]
+         (when field-comment
+           (swap! statements conj (standalone-column-comment-sql driver dbdef tabledef fielddef)))))
+     ;; exec the combined statement
+     (execute-sql! driver :db dbdef (s/join ";\n" (map hx/unescape-dots @statements))))
    ;; Now load the data for each Table
    (doseq [tabledef table-definitions]
      (du/profile (format "load-data for %s %s %s" (name driver) (:database-name dbdef) (:table-name tabledef))
-        (load-data! driver dbdef tabledef)))))
+       (load-data! driver dbdef tabledef)))))
 
 
 (def IDriverTestExtensionsMixin
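Net effect of the reworked signature: callers now pass an options map instead of a positional boolean. A hypothetical invocation, assuming `driver` and `dbdef` are bound:

    (create-db! driver dbdef {:skip-drop-db? true}) ; keep the existing DB, just rebuild tables + data
    (create-db! driver dbdef)                       ; defaults to {:skip-drop-db? false}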
diff --git a/test/metabase/test/data/interface.clj b/test/metabase/test/data/interface.clj
index 32d6a0960d9..7d538d13269 100644
--- a/test/metabase/test/data/interface.clj
+++ b/test/metabase/test/data/interface.clj
@@ -110,17 +110,21 @@
     "Return the connection details map that should be used to connect to this database (i.e. a Metabase `Database`
      details map). CONTEXT is one of:
 
-     *  `:server` - Return details for making the connection in a way that isn't DB-specific (e.g., for
-                    creating/destroying databases)
-     *  `:db`     - Return details for connecting specifically to the DB.")
+ *  `:server` - Return details for making the connection in a way that isn't DB-specific (e.g., for
+                creating/destroying databases)
+ *  `:db`     - Return details for connecting specifically to the DB.")
 
-  (create-db! [this, ^DatabaseDefinition database-definition]
-              [this, ^DatabaseDefinition database-definition, ^Boolean skip-drop-db?]
+  (create-db!
+    [this, ^DatabaseDefinition database-definition]
+    [this, ^DatabaseDefinition database-definition, {:keys [skip-drop-db?]}]
     "Create a new database from DATABASE-DEFINITION, including adding tables, fields, and foreign key constraints,
-     and add the appropriate data. This method should drop existing databases with the same name if applicable,
-     unless the skip-drop-db? arg is true. This is to workaround a scenario where the postgres driver terminates
-     the connection before dropping the DB and causes some tests to fail.
-     (This refers to creating the actual *DBMS* database itself, *not* a Metabase `Database` object.)")
+     and add the appropriate data. This method should drop existing databases with the same name if applicable, unless
+     the `skip-drop-db?` option is true. This is to work around a scenario where the postgres driver terminates the
+     connection before dropping the DB and causes some tests to fail.
+     (This refers to creating the actual *DBMS* database itself, *not* a Metabase `Database` object.)
+
+ Takes an optional `options` map as a third param. Currently supported options include `skip-drop-db?`. If
+ unspecified, `skip-drop-db?` should default to `false`.")
 
   ;; TODO - this would be more useful if DATABASE-DEFINITION was a parameter
   (default-schema ^String [this]
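To make the CONTEXT distinction concrete, a hedged sketch with made-up Postgres-style details (purely illustrative shapes; real drivers return whatever their connection spec needs):

    (database->connection-details driver :server dbdef) ; e.g. {:host "localhost", :port 5432}
    (database->connection-details driver :db dbdef)     ; e.g. {:host "localhost", :port 5432, :db "test-data"}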
@@ -129,7 +133,7 @@
   (expected-base-type->actual [this base-type]
     "*OPTIONAL*. Return the base type type that is actually used to store `Fields` of BASE-TYPE.
      The default implementation of this method is an identity fn. This is provided so DBs that don't support a given
-     BASE-TYPE used in the test data can specifiy what type we should expect in the results instead.  For example,
+     BASE-TYPE used in the test data can specify what type we should expect in the results instead. For example,
      Oracle has no `INTEGER` data types, so `:type/Integer` test values are instead stored as `NUMBER`, which we map
      to `:type/Decimal`.")
 
@@ -166,17 +170,19 @@
 (defn create-table-definition
   "Convenience for creating a `TableDefinition`."
   ^TableDefinition [^String table-name, field-definition-maps rows]
-  (s/validate TableDefinition (map->TableDefinition {:table-name        table-name
-                                                     :rows              rows
-                                                     :field-definitions (mapv create-field-definition field-definition-maps)})))
+  (s/validate TableDefinition (map->TableDefinition
+                               {:table-name        table-name
+                                :rows              rows
+                                :field-definitions (mapv create-field-definition field-definition-maps)})))
 
 (defn create-database-definition
   "Convenience for creating a new `DatabaseDefinition`."
   {:style/indent 1}
   ^DatabaseDefinition [^String database-name & table-name+field-definition-maps+rows]
-  (s/validate DatabaseDefinition (map->DatabaseDefinition {:database-name     database-name
-                                                           :table-definitions (mapv (partial apply create-table-definition)
-                                                                                    table-name+field-definition-maps+rows)})))
+  (s/validate DatabaseDefinition (map->DatabaseDefinition
+                                  {:database-name     database-name
+                                   :table-definitions (mapv (partial apply create-table-definition)
+                                                            table-name+field-definition-maps+rows)})))
 
 (def ^:private ^:const edn-definitions-dir "./test/metabase/test/data/dataset_definitions/")
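A hedged usage sketch of these two constructors, with invented table and field names; each vararg to `create-database-definition` is a [table-name field-definition-maps rows] triple that gets applied to `create-table-definition`:

    (create-database-definition "bird-flocks"
      ["bird"
       [{:field-name "name", :base-type :type/Text}]
       [["Rasta the Toucan"] ["Lucky the Pigeon"]]])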
 
@@ -184,12 +190,9 @@
   (edn/read-string (slurp (str edn-definitions-dir dbname ".edn"))))
 
 (defn update-table-def
-  "Function useful for modifying a table definition before it's
-  applied. Will invoke `UPDATE-TABLE-DEF-FN` on the vector of column
-  definitions and `UPDATE-ROWS-FN` with the vector of rows in the
-  database definition. `TABLE-DEF` is the database
-  definition (typically used directly in a `def-database-definition`
-  invocation)."
+  "Function useful for modifying a table definition before it's applied. Will invoke `UPDATE-TABLE-DEF-FN` on the vector
+  of column definitions and `UPDATE-ROWS-FN` with the vector of rows in the database definition. `TABLE-DEF` is the
+  database definition (typically used directly in a `def-database-definition` invocation)."
   [table-name-to-update update-table-def-fn update-rows-fn table-def]
   (vec
    (for [[table-name table-def rows :as orig-table-def] table-def]
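A hypothetical call, assuming `table-defs` is bound to such a vector of [table-name field-definition-maps rows] entries (e.g. one slurped from the EDN definitions above); it adds a `rating` column to the "venues" table and appends a value for it to every row:

    (update-table-def "venues"
                      #(conj % {:field-name "rating", :base-type :type/Integer}) ; UPDATE-TABLE-DEF-FN
                      (fn [rows] (mapv #(conj % 3) rows))                        ; UPDATE-ROWS-FN
                      table-defs)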
diff --git a/test/metabase/test/data/mongo.clj b/test/metabase/test/data/mongo.clj
index 1317bd9b629..fba2247223d 100644
--- a/test/metabase/test/data/mongo.clj
+++ b/test/metabase/test/data/mongo.clj
@@ -18,26 +18,30 @@
   (with-open [mongo-connection (mg/connect (database->connection-details dbdef))]
     (mg/drop-db mongo-connection (i/escaped-name dbdef))))
 
-(defn- create-db! [{:keys [table-definitions], :as dbdef}]
-  (destroy-db! dbdef)
-  (with-mongo-connection [mongo-db (database->connection-details dbdef)]
-    (doseq [{:keys [field-definitions table-name rows]} table-definitions]
-      (let [field-names (for [field-definition field-definitions]
-                          (keyword (:field-name field-definition)))]
-        ;; Use map-indexed so we can get an ID for each row (index + 1)
-        (doseq [[i row] (map-indexed (partial vector) rows)]
-          (let [row (for [v row]
-                      ;; Conver all the java.sql.Timestamps to java.util.Date, because the Mongo driver insists on being obnoxious and going from
-                      ;; using Timestamps in 2.x to Dates in 3.x
-                      (if (instance? java.sql.Timestamp v)
-                        (java.util.Date. (.getTime ^java.sql.Timestamp v))
-                        v))]
-            (try
-              ;; Insert each row
-              (mc/insert mongo-db (name table-name) (assoc (zipmap field-names row)
-                                                      :_id (inc i)))
-              ;; If row already exists then nothing to do
-              (catch com.mongodb.MongoException _))))))))
+(defn- create-db!
+  ([db-def]
+   (create-db! db-def nil))
+  ([{:keys [table-definitions], :as dbdef} {:keys [skip-drop-db?], :or {skip-drop-db? false}}]
+   (when-not skip-drop-db?
+     (destroy-db! dbdef))
+   (with-mongo-connection [mongo-db (database->connection-details dbdef)]
+     (doseq [{:keys [field-definitions table-name rows]} table-definitions]
+       (let [field-names (for [field-definition field-definitions]
+                           (keyword (:field-name field-definition)))]
+         ;; Use map-indexed so we can get an ID for each row (index + 1)
+         (doseq [[i row] (map-indexed vector rows)]
+           (let [row (for [v row]
+                       ;; Convert all the java.sql.Timestamps to java.util.Date, because the Mongo driver insists on being obnoxious and going from
+                       ;; using Timestamps in 2.x to Dates in 3.x
+                       (if (instance? java.sql.Timestamp v)
+                         (java.util.Date. (.getTime ^java.sql.Timestamp v))
+                         v))]
+             (try
+               ;; Insert each row
+               (mc/insert mongo-db (name table-name) (assoc (zipmap field-names row)
+                                                       :_id (inc i)))
+               ;; If row already exists then nothing to do
+               (catch com.mongodb.MongoException _)))))))))
 
 
 (u/strict-extend MongoDriver
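The Timestamp coercion above, isolated as a sketch (plain JDK interop; no Mongo connection needed):

    (let [ts (java.sql.Timestamp. (System/currentTimeMillis))]
      (java.util.Date. (.getTime ts)))
    ;; => a java.util.Date carrying the same epoch millis, which the 3.x Mongo driver accepts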
diff --git a/test/metabase/test/data/presto.clj b/test/metabase/test/data/presto.clj
index 21f4b6550dd..2c0f0279461 100644
--- a/test/metabase/test/data/presto.clj
+++ b/test/metabase/test/data/presto.clj
@@ -71,18 +71,22 @@
       query
       (unprepare/unprepare (cons query params) :quote-escape "'", :iso-8601-fn :from_iso8601_timestamp))))
 
-(defn- create-db! [{:keys [table-definitions] :as dbdef}]
-  (let [details (database->connection-details :db dbdef)]
-    (doseq [tabledef table-definitions
-            :let [rows       (:rows tabledef)
-                  ;; generate an ID for each row because we don't have auto increments
-                  keyed-rows (map-indexed (fn [i row] (conj row (inc i))) rows)
-                  ;; make 100 rows batches since we have to inline everything
-                  batches    (partition 100 100 nil keyed-rows)]]
-      (#'presto/execute-presto-query! details (drop-table-if-exists-sql dbdef tabledef))
-      (#'presto/execute-presto-query! details (create-table-sql dbdef tabledef))
-      (doseq [batch batches]
-        (#'presto/execute-presto-query! details (insert-sql dbdef tabledef batch))))))
+(defn- create-db!
+  ([db-def]
+   (create-db! db-def nil))
+  ([{:keys [table-definitions] :as dbdef} {:keys [skip-drop-db?], :or {skip-drop-db? false}}]
+   (let [details (database->connection-details :db dbdef)]
+     (doseq [tabledef table-definitions
+             :let     [rows       (:rows tabledef)
+                       ;; generate an ID for each row because we don't have auto-increments
+                       keyed-rows (map-indexed (fn [i row] (conj row (inc i))) rows)
+                       ;; make batches of 100 rows since we have to inline everything
+                       batches    (partition 100 100 nil keyed-rows)]]
+       (when-not skip-drop-db?
+         (#'presto/execute-presto-query! details (drop-table-if-exists-sql dbdef tabledef)))
+       (#'presto/execute-presto-query! details (create-table-sql dbdef tabledef))
+       (doseq [batch batches]
+         (#'presto/execute-presto-query! details (insert-sql dbdef tabledef batch)))))))
 
 ;;; IDriverTestExtensions implementation
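For reference, how the ID generation and 100-row batching above behave, sketched with plain Clojure:

    (map-indexed (fn [i row] (conj row (inc i))) [["Red Medicine"] ["Stout Burgers"]])
    ;; => (["Red Medicine" 1] ["Stout Burgers" 2])

    (count (partition 100 100 nil (range 250)))
    ;; => 3 (two full batches of 100 plus a trailing batch of 50; the nil pad arg keeps the remainder)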
 
-- 
GitLab