Skip to content
Snippets Groups Projects
Unverified Commit 76fb3484 authored by Case Nelson's avatar Case Nelson Committed by GitHub
Browse files

Add ddl persistence support for mysql (#23443)

* Add ddl persistence support for mysql

Since mysql does not have the concept of a (non-select) statement timeout,
it is necessary to use core.async to monitor the create table as select
statement manually. While mariadb does support a general statement
timeout, this will work for both vendors. There are some race conditions
here but in the context of persistence, this is ok. Mainly, the timeout
could hit right as the create table finishes. If that happens, the
statement will not be killed (because it was closed when finished), the
table will be created, but the function will throw a timeout exception.
In the context of persistence, this is ok since persisted-info will
remain inactive and the next refresh will drop the table and try again.

The other aspect to consider is that mysql doesn't rollback ddl changes.
So we have to manually keep track of what was done when testing the db
and undo the steps on failure. This isn't perfect, as the undo itself
could throw an exception, thus leaving the schema or test table in place,
that should only happen if a role has a create table but not a delete
table grant. In any event, if a our schema or test table sticks around,
they could be removed manually by a dba, or ignored without harm.

* Handle integrating mysql, and make sure exceptions are logged

* Add tests for execute-with-timeout

* Fix linters

* Switch to async/thread because reflection hints are lost by a/go and it's better not to do io in go-blocks

* Fix test
parent ae2b0106
No related branches found
No related tags found
No related merge requests found
(ns metabase.driver.ddl.mysql
(:require [clojure.core.async :as a]
[clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[clojure.tools.logging :as log]
[java-time :as t]
[metabase.driver.ddl.interface :as ddl.i]
[metabase.driver.ddl.sql :as ddl.sql]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
[metabase.public-settings :as public-settings]
[metabase.query-processor :as qp]
[metabase.util.i18n :refer [trs]])
(:import java.sql.SQLNonTransientConnectionException))
(defn- exec-async [conn-chan db-spec sql+params]
(a/thread
(jdbc/with-db-connection [conn db-spec]
(try
(let [pid (:pid (first (ddl.sql/jdbc-query conn ["select connection_id() pid"])))]
(a/put! conn-chan pid)
(ddl.sql/jdbc-query conn sql+params))
(catch SQLNonTransientConnectionException _e
;; Our connection may be killed due to timeout, `kill` will throw an appropriate exception
nil)
(catch Exception e
(log/warn e)
(throw e))))
true))
(defn- kill [conn pid]
(let [results (ddl.sql/jdbc-query conn ["show processlist"])
result? (some (fn [r]
(and (= (:id r) pid)
(str/starts-with? (or (:info r) "") "-- Metabase")))
results)]
(when result?
;; Can't use a prepared parameter with these statements
(ddl.sql/execute! conn [(str "kill " pid)])
(throw (Exception. (trs "Killed mysql process id {0} due to timeout." pid))))))
(defn- execute-with-timeout!
"Spins up another channel to execute the statement.
If `timeout-ms` passes, send a kill statement to stop execution and throw exception
Otherwise return results returned by channel."
[conn db-spec timeout-ms sql+params]
(let [conn-chan (a/chan)
exec-chan (exec-async conn-chan db-spec sql+params)
pid (a/<!! conn-chan)
timeout-chan (a/timeout timeout-ms)
[v port] (a/alts!! [timeout-chan exec-chan])]
(cond
(= port timeout-chan) (kill conn pid)
(= port exec-chan) v)))
(defmethod ddl.i/refresh! :mysql [_driver database definition dataset-query]
(let [{:keys [query params]} (qp/compile dataset-query)
db-spec (sql-jdbc.conn/db->pooled-connection-spec database)]
(jdbc/with-db-connection [conn db-spec]
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database (:table-name definition))])
;; It is possible that this fails and rollback would not restore the table.
;; That is ok, the persisted-info will be marked inactive and the next refresh will try again.
(execute-with-timeout! conn
db-spec
(.toMillis (t/minutes 10))
(into [(ddl.sql/create-table-sql database definition query)] params))
{:state :success})))
(defmethod ddl.i/unpersist! :mysql
[_driver database persisted-info]
(jdbc/with-db-connection [conn (sql-jdbc.conn/db->pooled-connection-spec database)]
(try
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database (:table_name persisted-info))])
(catch Exception e
(log/warn e)
(throw e)))))
(defmethod ddl.i/check-can-persist :mysql
[database]
(let [schema-name (ddl.i/schema-name database (public-settings/site-uuid))
table-name (format "persistence_check_%s" (rand-int 10000))
db-spec (sql-jdbc.conn/db->pooled-connection-spec database)
steps [[:persist.check/create-schema
(fn check-schema [conn]
(let [existing-schemas (->> ["select schema_name from information_schema.schemata"]
(ddl.sql/jdbc-query conn)
(map :schema_name)
(into #{}))]
(or (contains? existing-schemas schema-name)
(ddl.sql/execute! conn [(ddl.sql/create-schema-sql database)]))))
(fn undo-check-schema [conn]
(ddl.sql/execute! conn [(ddl.sql/drop-schema-sql database)]))]
[:persist.check/create-table
(fn create-table [conn]
(execute-with-timeout! conn
db-spec
(.toMillis (t/minutes 10))
[(ddl.sql/create-table-sql
database
{:table-name table-name
:field-definitions [{:field-name "field"
:base-type :type/Text}]}
"values (1)")]))
(fn undo-create-table [conn]
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database table-name)]))]
[:persist.check/read-table
(fn read-table [conn]
(ddl.sql/jdbc-query conn [(format "select * from %s.%s"
schema-name table-name)]))
(constantly nil)]
[:persist.check/delete-table
(fn delete-table [conn]
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database table-name)]))
;; This will never be called, if the last step fails it does not need to be undone
(constantly nil)]]]
;; Unlike postgres, mysql ddl clauses will not rollback in a transaction.
;; So we keep track of undo-steps to manually rollback previous, completed steps.
(jdbc/with-db-connection [conn db-spec]
(loop [[[step stepfn undofn] & remaining] steps
undo-steps []]
(let [result (try (stepfn conn)
(log/info (trs "Step {0} was successful for db {1}"
step (:name database)))
::valid
(catch Exception e
(log/warn (trs "Error in `{0}` while checking for model persistence permissions." step))
(log/warn e)
(try
(doseq [[undo-step undofn] (reverse undo-steps)]
(log/warn (trs "Undoing step `{0}` for db {1}" undo-step (:name database)))
(undofn conn))
(catch Exception _e
(log/warn (trs "Unable to rollback database check for model persistence"))))
step))]
(cond (and (= result ::valid) remaining)
(recur remaining (conj undo-steps [step undofn]))
(= result ::valid)
[true :persist.check/valid]
:else
[false step]))))))
......@@ -4,46 +4,12 @@
[honeysql.core :as hsql]
[java-time :as t]
[metabase.driver.ddl.interface :as ddl.i]
[metabase.driver.ddl.sql :as ddl.sql]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
[metabase.driver.sql.util :as sql.u]
[metabase.public-settings :as public-settings]
[metabase.query-processor :as qp]
[metabase.util.i18n :refer [trs]]))
(defn- quote-fn [driver]
(fn quote [ident entity]
(sql.u/quote-name driver ident (ddl.i/format-name driver entity))))
(defn- add-remark [sql-str]
(str "-- Metabase\n"
sql-str))
(defn- execute! [conn [sql & params]]
(jdbc/execute! conn (into [(add-remark sql)] params)))
(defn- jdbc-query [conn [sql & params]]
(jdbc/query conn (into [(add-remark sql)] params)))
(defn- create-schema-sql
"SQL string to create a schema suitable for postgres"
[{driver :engine :as database}]
(let [q (quote-fn driver)]
(format "create schema %s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid))))))
(defn- create-table-sql [{driver :engine :as database} definition query]
(let [q (quote-fn driver)]
(format "create table %s.%s as %s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid)))
(q :table (:table-name definition))
query)))
(defn- drop-table-sql [{driver :engine :as database} table-name]
(let [q (quote-fn driver)]
(format "drop table if exists %s.%s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid)))
(q :table table-name))))
(defn- set-statement-timeout!
"Must be called within a transaction.
Sets the current transaction `statement_timeout` to the minimum
......@@ -54,7 +20,7 @@
(let [existing-timeout (->> (hsql/format {:select [:setting]
:from [:pg_settings]
:where [:= :name "statement_timeout"]})
(jdbc-query tx)
(ddl.sql/jdbc-query tx)
first
:setting
parse-long)
......@@ -63,25 +29,22 @@
ten-minutes
(min ten-minutes existing-timeout))]
;; Can't use a prepared parameter with these statements
(execute! tx [(format "SET LOCAL statement_timeout TO '%s'" (str new-timeout))])))
(ddl.sql/execute! tx [(format "SET LOCAL statement_timeout TO '%s'" (str new-timeout))])))
(defmethod ddl.i/refresh! :postgres [_driver database definition dataset-query]
(try
(let [{:keys [query params]} (qp/compile dataset-query)]
(jdbc/with-db-connection [conn (sql-jdbc.conn/db->pooled-connection-spec database)]
(jdbc/with-db-transaction [tx conn]
(set-statement-timeout! tx)
(execute! tx [(drop-table-sql database (:table-name definition))])
(execute! tx (into [(create-table-sql database definition query)] params)))
{:state :success}))
(catch Exception e
{:state :error :error (ex-message e)})))
(let [{:keys [query params]} (qp/compile dataset-query)]
(jdbc/with-db-connection [conn (sql-jdbc.conn/db->pooled-connection-spec database)]
(jdbc/with-db-transaction [tx conn]
(set-statement-timeout! tx)
(ddl.sql/execute! tx [(ddl.sql/drop-table-sql database (:table-name definition))])
(ddl.sql/execute! tx (into [(ddl.sql/create-table-sql database definition query)] params)))
{:state :success})))
(defmethod ddl.i/unpersist! :postgres
[_driver database persisted-info]
(jdbc/with-db-connection [conn (sql-jdbc.conn/db->pooled-connection-spec database)]
(try
(execute! conn [(drop-table-sql database (:table_name persisted-info))])
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database (:table_name persisted-info))])
(catch Exception e
(log/warn e)
(throw e)))))
......@@ -93,25 +56,25 @@
steps [[:persist.check/create-schema
(fn check-schema [conn]
(let [existing-schemas (->> ["select schema_name from information_schema.schemata"]
(jdbc-query conn)
(ddl.sql/jdbc-query conn)
(map :schema_name)
(into #{}))]
(or (contains? existing-schemas schema-name)
(execute! conn [(create-schema-sql database)]))))]
(ddl.sql/execute! conn [(ddl.sql/create-schema-sql database)]))))]
[:persist.check/create-table
(fn create-table [conn]
(execute! conn [(create-table-sql database
(ddl.sql/execute! conn [(ddl.sql/create-table-sql database
{:table-name table-name
:field-definitions [{:field-name "field"
:base-type :type/Text}]}
"values (1)")]))]
[:persist.check/read-table
(fn read-table [conn]
(jdbc-query conn [(format "select * from %s.%s"
(ddl.sql/jdbc-query conn [(format "select * from %s.%s"
schema-name table-name)]))]
[:persist.check/delete-table
(fn delete-table [conn]
(execute! conn [(drop-table-sql database table-name)]))]]]
(ddl.sql/execute! conn [(ddl.sql/drop-table-sql database table-name)]))]]]
(jdbc/with-db-connection [conn (sql-jdbc.conn/db->pooled-connection-spec database)]
(jdbc/with-db-transaction
[tx conn]
......
(ns metabase.driver.ddl.sql
(:require [clojure.java.jdbc :as jdbc]
[metabase.driver.ddl.interface :as ddl.i]
[metabase.driver.sql.util :as sql.u]
[metabase.public-settings :as public-settings]))
(defn- quote-fn [driver]
(fn quote [ident entity]
(sql.u/quote-name driver ident (ddl.i/format-name driver entity))))
(defn- add-remark [sql-str]
(str "-- Metabase\n"
sql-str))
(defn execute!
"Executes sql and params with a standard remark prepended to the statement."
[conn [sql & params]]
(jdbc/execute! conn (into [(add-remark sql)] params)))
(defn jdbc-query
"Queries sql and params with a standard remark prepended to the statement."
[conn [sql & params]]
(jdbc/query conn (into [(add-remark sql)] params)))
(defn create-schema-sql
"SQL string to create a schema suitable"
[{driver :engine :as database}]
(let [q (quote-fn driver)]
(format "create schema %s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid))))))
(defn drop-schema-sql
"SQL string to drop a schema suitable"
[{driver :engine :as database}]
(let [q (quote-fn driver)]
(format "drop schema if exists %s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid))))))
(defn create-table-sql
"Formats a create table statement within our own cache schema"
[{driver :engine :as database} definition query]
(let [q (quote-fn driver)]
(format "create table %s.%s as %s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid)))
(q :table (:table-name definition))
query)))
(defn drop-table-sql
"Formats a drop table statement within our own cache schema"
[{driver :engine :as database} table-name]
(let [q (quote-fn driver)]
(format "drop table if exists %s.%s"
(q :table (ddl.i/schema-name database (public-settings/site-uuid)))
(q :table table-name))))
......@@ -11,6 +11,7 @@
[metabase.db.spec :as mdb.spec]
[metabase.driver :as driver]
[metabase.driver.common :as driver.common]
[metabase.driver.ddl.mysql :as ddl.mysql]
[metabase.driver.sql-jdbc.common :as sql-jdbc.common]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
......@@ -27,6 +28,8 @@
[metabase.util.i18n :refer [deferred-tru trs]])
(:import [java.sql DatabaseMetaData ResultSet ResultSetMetaData Types]
[java.time LocalDateTime OffsetDateTime OffsetTime ZonedDateTime]))
(comment
ddl.mysql/keep-me)
(driver/register! :mysql, :parent :sql-jdbc)
......@@ -37,6 +40,12 @@
(defmethod driver/database-supports? [:mysql :nested-field-columns] [_ _ _] true)
(defmethod driver/database-supports? [:mysql :persist-models] [_driver _feat _db] true)
(defmethod driver/database-supports? [:mysql :persist-models-enabled]
[_driver _feat db]
(-> db :options :persist-models-enabled))
(defmethod driver/supports? [:mysql :regex] [_ _] false)
(defmethod driver/supports? [:mysql :percentile-aggregations] [_ _] false)
......
......@@ -7,6 +7,7 @@
[metabase.config :as config]
[metabase.db.metadata-queries :as metadata-queries]
[metabase.driver :as driver]
[metabase.driver.ddl.mysql :as ddl.mysql]
[metabase.driver.mysql :as mysql]
[metabase.driver.sql-jdbc.connection :as sql-jdbc.conn]
[metabase.driver.sql-jdbc.sync :as sql-jdbc.sync]
......@@ -475,3 +476,13 @@
"ORDER BY JSON_EXTRACT(`json`.`json_bit`, ?) ASC")
(:query compile-res)))
(is (= '("$.\"1234\"" "$.\"1234\"" "$.\"1234\"") (:params compile-res))))))))))
(deftest ddl.execute-with-timeout-test
(mt/test-driver :mysql
(mt/dataset json
(let [db-spec (sql-jdbc.conn/db->pooled-connection-spec (mt/db))]
(is (thrown-with-msg?
Exception
#"Killed mysql process id \d+ due to timeout."
(#'ddl.mysql/execute-with-timeout! db-spec db-spec 10 ["select sleep(5)"])))
(is (= true (#'ddl.mysql/execute-with-timeout! db-spec db-spec 5000 ["select sleep(0.1) as val"])))))))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment